You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pp...@apache.org on 2022/08/18 06:57:11 UTC

[ignite] branch master updated: IGNITE-17456 Fixed initialization of already compressed segments (#10183)

This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 026ac0d76bc IGNITE-17456 Fixed initialization of already compressed segments (#10183)
026ac0d76bc is described below

commit 026ac0d76bc16dd48851434c952500eb566a781b
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Thu Aug 18 09:57:00 2022 +0300

    IGNITE-17456 Fixed initialization of already compressed segments (#10183)
---
 .../persistence/wal/FileWriteAheadLogManager.java  |  27 +-
 .../wal/aware/SegmentCompressStorage.java          |   7 +-
 .../IgniteCacheDatabaseSharedManagerSelfTest.java  |   8 +
 .../db/wal/WalCompactionAfterRestartTest.java      | 157 -----------
 .../db/wal/WalCompactionNotificationsTest.java     | 304 +++++++++++++++++++++
 .../wal/FileWriteAheadLogManagerSelfTest.java      |   8 +
 .../ignite/testsuites/IgnitePdsTestSuite5.java     |   3 +
 7 files changed, 347 insertions(+), 167 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 01aebe4c07f..b4cad823d75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -2181,13 +2181,24 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * Checks if there are already compressed segments and assigns counters if needed.
          */
         private void initAlreadyCompressedSegments() {
-            FileDescriptor[] alreadyCompressed = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER));
+            long firstArchivedIdx = -1;
+            long lastCompactedIdx = -1;
 
-            if (alreadyCompressed.length > 0)
-                segmentAware.onSegmentCompressed(alreadyCompressed[alreadyCompressed.length - 1].idx());
+            for (FileDescriptor segment : walArchiveFiles()) {
+                if (segment.isCompressed()) {
+                    lastCompactedIdx = segment.idx();
 
-            for (FileDescriptor fd : alreadyCompressed)
-                metrics.onWalSegmentCompressed(fd.file().length());
+                    metrics.onWalSegmentCompressed(segment.file().length());
+                }
+                else if (firstArchivedIdx == -1)
+                    firstArchivedIdx = segment.idx();
+            }
+
+            // We have to set a starting index for the compressor.
+            if (lastCompactedIdx >= 0)
+                segmentAware.onSegmentCompressed(lastCompactedIdx);
+            else if (firstArchivedIdx >= 0)
+                segmentAware.onSegmentCompressed(firstArchivedIdx - 1);
         }
 
         /**
@@ -2266,6 +2277,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             if (reserved)
                 return segmentToCompress;
             else {
+                if (log.isDebugEnabled())
+                    log.debug("Skipping segment compression [idx=" + segmentToCompress + ']');
+
                 segmentAware.onSegmentCompressed(segmentToCompress);
 
                 return -1;
@@ -2320,6 +2334,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
                         segmentAware.onSegmentCompressed(segIdx);
 
+                        if (log.isDebugEnabled())
+                            log.debug("Segment compressed notification [idx=" + segIdx + ']');
+
                         if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && !cctx.kernalContext().recoveryMode())
                             evt.record(new WalSegmentCompactedEvent(cctx.localNode(), segIdx, zip.getAbsoluteFile()));
                     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
index f71dd79963f..2272c0ec123 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
@@ -69,9 +69,6 @@ class SegmentCompressStorage {
      * @param compressedIdx Index of compressed segment.
      */
     synchronized void onSegmentCompressed(long compressedIdx) {
-        if (log.isInfoEnabled())
-            log.info("Segment compressed notification [idx=" + compressedIdx + ']');
-
         if (compressedIdx > lastMaxCompressedIdx)
             lastMaxCompressedIdx = compressedIdx;
 
@@ -139,8 +136,8 @@ class SegmentCompressStorage {
      */
     synchronized void onSegmentArchived(long lastAbsArchivedIdx) {
         while (lastEnqueuedToCompressIdx < lastAbsArchivedIdx && compactionEnabled) {
-            if (log.isInfoEnabled())
-                log.info("Enqueuing segment for compression [idx=" + (lastEnqueuedToCompressIdx + 1) + ']');
+            if (log.isDebugEnabled())
+                log.debug("Enqueuing segment for compression [idx=" + (lastEnqueuedToCompressIdx + 1) + ']');
 
             segmentsToCompress.add(++lastEnqueuedToCompressIdx);
         }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManagerSelfTest.java
index 321907a2478..95295b56351 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManagerSelfTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
+import static java.lang.System.clearProperty;
 import static java.lang.System.setProperty;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE;
 import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_SEGMENT_SIZE;
@@ -38,6 +39,13 @@ import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
  * Class for testing {@link IgniteCacheDatabaseSharedManager}.
  */
 public class IgniteCacheDatabaseSharedManagerSelfTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        clearProperty(IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE);
+    }
+
     /**
      * Checking the correctness of validation {@link DataStorageConfiguration#getMinWalArchiveSize()}.
      *
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java
deleted file mode 100644
index 6e65e1a40b9..00000000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*      http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.ignite.internal.processors.cache.persistence.db.wal;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.events.WalSegmentCompactedEvent;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
-
-/** */
-@Ignore("https://issues.apache.org/jira/browse/IGNITE-13723")
-public class WalCompactionAfterRestartTest extends GridCommonAbstractTest {
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(name);
-
-        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
-            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
-                .setPersistenceEnabled(true)
-                .setMaxSize(200L * 1024 * 1024))
-            .setWalMode(WALMode.LOG_ONLY)
-            .setWalSegmentSize(512 * 1024)
-            .setWalCompactionEnabled(true)
-            .setMaxWalArchiveSize(2 * 512 * 1024)
-        );
-
-        CacheConfiguration ccfg = new CacheConfiguration();
-
-        ccfg.setName(DEFAULT_CACHE_NAME);
-        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
-        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        ccfg.setAffinity(new RendezvousAffinityFunction(false, 16));
-        ccfg.setBackups(0);
-
-        cfg.setCacheConfiguration(ccfg);
-        cfg.setConsistentId(name);
-
-        cfg.setIncludeEventTypes(EVT_WAL_SEGMENT_COMPACTED);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        stopAllGrids();
-
-        cleanPersistenceDir();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-
-        cleanPersistenceDir();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void test() throws Exception {
-        IgniteEx ig = startGrid(0);
-
-        ig.cluster().active(true);
-
-        doCachePuts(ig, 10_000);
-
-        ig.cluster().active(false);
-
-        stopGrid(0);
-
-        IgniteEx ig0 = startGrid(0);
-
-        ig0.cluster().active(true);
-
-        List<IgniteBiTuple<Long, Long>> discrepancies = Collections.synchronizedList(new ArrayList<>());
-
-        ig0.events().localListen(e -> {
-            long evtSegIdx = ((WalSegmentCompactedEvent)e).getAbsWalSegmentIdx();
-            long lastCompactedIdx = ig0.context().cache().context().wal().lastCompactedSegment();
-
-            if (lastCompactedIdx < 0 || lastCompactedIdx > evtSegIdx)
-                discrepancies.add(F.t(evtSegIdx, lastCompactedIdx));
-
-            return true;
-        }, EVT_WAL_SEGMENT_COMPACTED);
-
-        doCachePuts(ig0, 5_000);
-
-        stopGrid(0);
-
-        if (!discrepancies.isEmpty()) {
-            fail("Discrepancies (EVT_WAL_SEGMENT_COMPACTED index vs. lastCompactedSegment):" + System.lineSeparator() +
-                discrepancies.stream()
-                    .map(t -> String.format("%d <-> %d", t.get1(), t.get2()))
-                    .collect(Collectors.joining(System.lineSeparator())));
-        }
-    }
-
-    /** */
-    private void doCachePuts(IgniteEx ig, long millis) throws IgniteCheckedException {
-        IgniteCache<Integer, byte[]> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME);
-
-        AtomicBoolean stop = new AtomicBoolean();
-
-        IgniteInternalFuture<Long> putFut = GridTestUtils.runMultiThreadedAsync(() -> {
-            ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
-            while (!stop.get())
-                cache.put(rnd.nextInt(), "Ignite".getBytes());
-        }, 4, "cache-filler");
-
-        U.sleep(millis);
-
-        stop.set(true);
-
-        putFut.get();
-    }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionNotificationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionNotificationsTest.java
new file mode 100644
index 00000000000..3ec1198325f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionNotificationsTest.java
@@ -0,0 +1,304 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.WalSegmentCompactedEvent;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.logging.log4j.Level;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
+
+/**
+ * WAL compaction notifications test.
+ */
+@RunWith(Parameterized.class)
+public class WalCompactionNotificationsTest extends GridCommonAbstractTest {
+    /** WAL segment size. */
+    private static final int SEGMENT_SIZE = 512 * 1024;
+
+    /** Maximum WAL segments count. */
+    private static final int MAX_WAL_SEGMENTS = 200;
+
+    /** Listening logger. */
+    private final ListeningTestLogger logger = new ListeningTestLogger(log);
+
+    /** WAL compaction event listener. */
+    private final EventListener evtLsnr = new EventListener();
+
+    /** WAL archive size. */
+    @Parameterized.Parameter
+    public long archiveSize;
+
+    /** Test run parameters. */
+    @Parameterized.Parameters(name = "archiveSize={0}")
+    public static Collection<Object[]> parameters() {
+        return Arrays.asList(new Object[][] {
+            { DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE },
+            { SEGMENT_SIZE },
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+        return super.getConfiguration(name)
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                        .setMaxSize(200 * U.MB))
+                .setWalSegmentSize(SEGMENT_SIZE)
+                .setMaxWalArchiveSize(archiveSize)
+                .setWalCompactionEnabled(true)
+            ).setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+                .setAffinity(new RendezvousAffinityFunction(false, 16))
+            ).setGridLogger(logger)
+            .setConsistentId(name)
+            .setIncludeEventTypes(EVT_WAL_SEGMENT_COMPACTED)
+            .setLocalEventListeners(Collections.singletonMap(evtLsnr, new int[] {EVT_WAL_SEGMENT_COMPACTED}));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+
+        resetLog4j(Level.DEBUG, false, FileWriteAheadLogManager.class.getPackage().getName());
+
+        logger.registerListener(evtLsnr);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * @throws Throwable If failed.
+     */
+    @Test
+    public void testNotifications() throws Throwable {
+        checkNodeRestart(50);
+        checkNodeRestart(50);
+    }
+
+    /**
+     * @param segmentsCnt The number of WAL segments to generate.
+     * @throws Exception If failed.
+     */
+    private void checkNodeRestart(int segmentsCnt) throws Exception {
+        IgniteEx ig = startGrid(0);
+        ig.cluster().state(ClusterState.ACTIVE);
+
+        generateWal(ig, segmentsCnt);
+        ig.cluster().state(ClusterState.INACTIVE);
+
+        evtLsnr.validateEvents();
+
+        stopGrid(0);
+    }
+
+    /**
+     * @param ig Ignite instance.
+     * @param segmentsCnt The number of WAL segments to generate.
+     */
+    private void generateWal(IgniteEx ig, int segmentsCnt) {
+        if (segmentsCnt <= 0)
+            return;
+
+        IgniteCache<Integer, String> cache = ig.cache(DEFAULT_CACHE_NAME);
+        IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal();
+        long totalIdx = walMgr.currentSegment() + segmentsCnt;
+
+        while (evtLsnr.errRef.get() == null && !Thread.currentThread().isInterrupted() && walMgr.currentSegment() < totalIdx)
+            cache.put(ThreadLocalRandom.current().nextInt(), "Ignite");
+    }
+
+    /** Log event type. */
+    enum LogEventType {
+        /** */
+        ENQUEUE("Enqueuing segment for compression"),
+
+        /** */
+        SKIP("Skipping segment compression"),
+
+        /** */
+        COMPRESS("Segment compressed notification");
+
+        /** */
+        private final Pattern ptrn;
+
+        /** */
+        LogEventType(String msg) {
+            ptrn = Pattern.compile(msg + " \\[idx=(?<idx>-?\\d{1,10})]");
+        }
+    }
+
+    /** WAL compaction event listener. */
+    private class EventListener implements IgnitePredicate<Event>, Consumer<String> {
+        /** Error. */
+        private final AtomicReference<Throwable> errRef = new AtomicReference<>();
+
+        /** Events history. */
+        private final AtomicIntegerArray evtHistory = new AtomicIntegerArray(MAX_WAL_SEGMENTS);
+
+        /** Log notifications history. */
+        private final EnumMap<LogEventType, AtomicIntegerArray> logEvtHist = new EnumMap<>(LogEventType.class);
+
+        /** Last compacted segment index. */
+        private volatile long lastCompactedSegment;
+
+        /** Constructor. */
+        public EventListener() {
+            for (LogEventType type : LogEventType.values())
+                logEvtHist.put(type, new AtomicIntegerArray(MAX_WAL_SEGMENTS));
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("ErrorNotRethrown")
+        @Override public boolean apply(Event evt) {
+            try {
+                if (!(evt instanceof WalSegmentCompactedEvent))
+                    fail("Unexpected event type: " + evt.getClass().getName());
+
+                WalSegmentCompactedEvent compactEvt = (WalSegmentCompactedEvent)evt;
+                IgniteWriteAheadLogManager walMgr = grid(0).context().cache().context().wal();
+
+                long lastCompactedSegment = walMgr.lastCompactedSegment();
+                assertTrue("Negative index: " + lastCompactedSegment, lastCompactedSegment >= 0);
+
+                if (this.lastCompactedSegment > lastCompactedSegment) {
+                    fail("Unordered last compact segment value " +
+                        "[prev=" + this.lastCompactedSegment + ", curr=" + walMgr.lastCompactedSegment() + ']');
+                }
+
+                this.lastCompactedSegment = lastCompactedSegment;
+
+                int walIdx = (int)compactEvt.getAbsWalSegmentIdx();
+
+                assertEquals("Duplicate event [idx=" + walIdx + ']', 0, evtHistory.get(walIdx));
+
+                evtHistory.set(walIdx, 1);
+
+                return true;
+            }
+            catch (AssertionError | RuntimeException e) {
+                errRef.compareAndSet(null, e);
+
+                return false;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("ErrorNotRethrown")
+        @Override public void accept(String str) {
+            try {
+                for (LogEventType type : LogEventType.values()) {
+                    Matcher matcher = type.ptrn.matcher(str);
+
+                    if (!matcher.find())
+                        continue;
+
+                    int idx = Integer.parseInt(matcher.group("idx"));
+
+                    assertTrue("Negative index value in log [idx=" + idx + ", msg=" + str + ']', idx >= 0);
+
+                    AtomicIntegerArray hist = logEvtHist.get(type);
+
+                    if (type == LogEventType.ENQUEUE && logEvtHist.get(LogEventType.SKIP).get(idx) == 1) {
+                        logEvtHist.get(LogEventType.SKIP).set(idx, 0);
+
+                        break;
+                    }
+
+                    assertEquals("Duplicate index in log [idx=" + idx + ", msg=" + str + ']', 0, hist.get(idx));
+
+                    hist.set(idx, 1);
+
+                    break;
+                }
+            }
+            catch (AssertionError | RuntimeException e) {
+                errRef.compareAndSet(null, e);
+            }
+        }
+
+        /**
+         * @throws AssertionError If failed.
+         */
+        public void validateEvents() {
+            Throwable err = errRef.get();
+
+            if (err instanceof AssertionError)
+                throw (AssertionError)err;
+
+            if (err instanceof RuntimeException)
+                throw (RuntimeException)err;
+
+            if (err != null)
+                fail("Unexpected exception [class=" + err.getClass().getName() + ", msg=" + err.getMessage() + "].");
+
+            int lastCompactedIdx = (int)grid(0).context().cache().context().wal().lastCompactedSegment();
+
+            AtomicIntegerArray enqHist = logEvtHist.get(LogEventType.ENQUEUE);
+            AtomicIntegerArray cmprsHist = logEvtHist.get(LogEventType.COMPRESS);
+            AtomicIntegerArray skipHist = logEvtHist.get(LogEventType.SKIP);
+
+            for (int i = 0; i < lastCompactedIdx; i++) {
+                assertTrue("Missing event " +
+                    "[idx=" + i +
+                    ", evt=" + evtHistory.get(i) +
+                    ", cmprs=" + cmprsHist.get(i) + ']', evtHistory.get(i) == cmprsHist.get(i));
+                assertTrue("Log compression start missing [idx=" + i + ']', enqHist.get(i) == 1);
+                assertTrue("Log compression end missing [idx=" + i + ']', cmprsHist.get(i) == 1 || skipHist.get(i) == 1);
+            }
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManagerSelfTest.java
index d226154dddc..2ebb3524a92 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManagerSelfTest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
+import static java.lang.System.clearProperty;
 import static java.lang.System.setProperty;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE;
 import static org.apache.ignite.configuration.DataStorageConfiguration.HALF_MAX_WAL_ARCHIVE_SIZE;
@@ -52,6 +53,13 @@ public class FileWriteAheadLogManagerSelfTest extends GridCommonAbstractTest {
         assertEquals(50, minWalArchiveSize(cfg.setMinWalArchiveSize(HALF_MAX_WAL_ARCHIVE_SIZE)));
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        clearProperty(IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE);
+    }
+
     /**
      * Testing of {@link FileWriteAheadLogManager#minWalArchiveSize(DataStorageConfiguration)}.
      */
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
index d1f332e4468..c79381ad237 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.RestorePartitionStateTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManagerSelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheObjectBinaryProcessorOnDiscoveryTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDiscoDataHandlingInNewClusterTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionNotificationsTest;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.BPlusTreePageMemoryImplTest;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.BPlusTreeReuseListPageMemoryImplTest;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.FillFactorMetricTest;
@@ -121,6 +122,8 @@ public class IgnitePdsTestSuite5 {
         GridTestUtils.addTestIfNeeded(suite, FileWriteAheadLogManagerSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteCacheDatabaseSharedManagerSelfTest.class, ignoredTests);
 
+        GridTestUtils.addTestIfNeeded(suite, WalCompactionNotificationsTest.class, ignoredTests);
+
         return suite;
     }
 }