You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/04/30 10:15:28 UTC

[ignite] 12/17: GG-17480 Implement WAL page snapshot records compression - Fixes #6116.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch gg-18539
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit b1a355a2b4df6082da4d93a11d9253eacd9af036
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Mon Apr 29 13:21:49 2019 +0300

    GG-17480 Implement WAL page snapshot records compression - Fixes #6116.
    
    Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
    
    (cherry picked from commit 2ad6c85)
---
 modules/compress/pom.xml                           |  15 ++
 .../compress/CompressionProcessorImpl.java         | 100 +++++++---
 ...ionWithRealCpDisabledAndWalCompressionTest.java |  37 ++++
 .../wal/WalCompactionAndPageCompressionTest.java   |  35 ++++
 .../WalRecoveryWithPageCompressionAndTdeTest.java  |  51 ++++++
 .../db/wal/WalRecoveryWithPageCompressionTest.java |  36 ++++
 .../AbstractPageCompressionIntegrationTest.java    | 201 +++++++++++++++++++++
 .../DiskPageCompressionIntegrationTest.java        | 171 +-----------------
 .../WalPageCompressionIntegrationTest.java         | 101 +++++++++++
 .../testsuites/IgnitePdsCompressionTestSuite.java  |  12 ++
 .../ignite/configuration/CacheConfiguration.java   |   5 +-
 .../configuration/DataStorageConfiguration.java    |  53 ++++++
 .../ignite/configuration/DiskPageCompression.java  |   5 +
 .../internal/pagemem/wal/record/PageSnapshot.java  |  76 ++++++--
 .../processors/cache/CacheCompressionManager.java  |  10 +-
 .../GridCacheDatabaseSharedManager.java            |   9 +
 .../cache/persistence/file/FilePageStore.java      |   4 +
 .../cache/persistence/pagemem/PageMemoryImpl.java  |  11 +-
 .../cache/persistence/tree/io/PageIO.java          |  38 +++-
 .../persistence/wal/FileWriteAheadLogManager.java  |  43 +++++
 .../wal/serializer/RecordDataSerializer.java       |   3 +-
 .../wal/serializer/RecordDataV1Serializer.java     |  17 +-
 .../wal/serializer/RecordDataV2Serializer.java     |  17 +-
 .../wal/serializer/RecordV1Serializer.java         |   2 +-
 .../wal/serializer/RecordV2Serializer.java         |   3 +-
 .../processors/compress/CompressionProcessor.java  |  11 +-
 .../platform/utils/PlatformConfigurationUtils.java |  15 ++
 .../processors/cache/persistence/DummyPageIO.java  |   4 +
 ...CheckpointSimulationWithRealCpDisabledTest.java |  25 ++-
 .../persistence/db/wal/WalCompactionTest.java      |  26 ++-
 .../wal/memtracker/PageMemoryTracker.java          |   3 +-
 .../persistence/db/wal/IgniteWalRecoveryTest.java  |  71 ++++++--
 .../Config/full-config.xml                         |   3 +-
 .../IgniteConfigurationSerializerTest.cs           |   3 +
 .../Apache.Ignite.Core/Apache.Ignite.Core.csproj   |   1 +
 .../Configuration/DataStorageConfiguration.cs      |  22 ++-
 .../Configuration/DiskPageCompression.cs           |  50 +++++
 .../IgniteConfigurationSection.xsd                 |  20 ++
 38 files changed, 1049 insertions(+), 260 deletions(-)

diff --git a/modules/compress/pom.xml b/modules/compress/pom.xml
index a7f77de..bdcd924 100644
--- a/modules/compress/pom.xml
+++ b/modules/compress/pom.xml
@@ -42,6 +42,21 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-indexing</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-indexing</artifactId>
+            <scope>test</scope>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>com.github.jnr</groupId>
             <artifactId>jnr-posix</artifactId>
             <version>${jnr.posix.version}</version>
diff --git a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
index a89f67a..6d1b95f 100644
--- a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
+++ b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
@@ -27,8 +27,10 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.DiskPageCompression;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.pagemem.PageUtils;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.xerial.snappy.Snappy;
 
@@ -63,6 +65,11 @@ public class CompressionProcessorImpl extends CompressionProcessor {
     }
 
     /** {@inheritDoc} */
+    @Override public void checkPageCompressionSupported() throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public void checkPageCompressionSupported(Path storagePath, int pageSize) throws IgniteCheckedException {
         if (!U.isLinux())
             throw new IgniteCheckedException("Currently page compression is supported only for Linux.");
@@ -92,46 +99,74 @@ public class CompressionProcessorImpl extends CompressionProcessor {
         DiskPageCompression compression,
         int compressLevel
     ) throws IgniteCheckedException {
-        assert compression != null;
-        assert U.isPow2(pageSize): pageSize;
+        assert compression != null && compression != DiskPageCompression.DISABLED : compression;
         assert U.isPow2(blockSize): blockSize;
-        assert page.position() == 0 && page.limit() == pageSize;
+        assert page.position() == 0 && page.limit() >= pageSize;
 
-        PageIO io = PageIO.getPageIO(page);
+        int oldPageLimit = page.limit();
 
-        if (!(io instanceof CompactablePageIO))
-            return page;
+        try {
+            // Page size will be less than page limit when TDE is enabled. To make compaction and compression work
+            // correctly we need to set limit to real page size.
+            page.limit(pageSize);
 
-        ByteBuffer compactPage = compactBuf.get();
+            ByteBuffer compactPage = doCompactPage(page, pageSize);
 
-        // Drop the garbage from the page.
-        ((CompactablePageIO)io).compactPage(page, compactPage, pageSize);
-        page.clear();
+            int compactSize = compactPage.limit();
 
-        int compactSize = compactPage.limit();
+            assert compactSize <= pageSize : compactSize;
 
-        assert compactSize <= pageSize: compactSize;
+            // No further compression is needed, or we are configured just to skip garbage.
+            if (compactSize < blockSize || compression == SKIP_GARBAGE)
+                return setCompactionInfo(compactPage, compactSize);
+
+            ByteBuffer compressedPage = doCompressPage(compression, compactPage, compactSize, compressLevel);
+
+            assert compressedPage.position() == 0;
+            int compressedSize = compressedPage.limit();
+
+            int freeCompactBlocks = (pageSize - compactSize) / blockSize;
+            int freeCompressedBlocks = (pageSize - compressedSize) / blockSize;
+
+            if (freeCompactBlocks >= freeCompressedBlocks) {
+                if (freeCompactBlocks == 0)
+                    return page; // No blocks will be released.
+
+                return setCompactionInfo(compactPage, compactSize);
+            }
 
-        // If no need to compress further or configured just to skip garbage.
-        if (compactSize < blockSize || compression == SKIP_GARBAGE)
-            return setCompactionInfo(compactPage, compactSize);
+            return setCompressionInfo(compressedPage, compression, compressedSize, compactSize);
+        }
+        finally {
+            page.limit(oldPageLimit);
+        }
+    }
 
-        ByteBuffer compressedPage = doCompressPage(compression, compactPage, compactSize, compressLevel);
+    /**
+     * @param page Page buffer.
+     * @param pageSize Page size.
+     * @return Compacted page buffer.
+     */
+    private ByteBuffer doCompactPage(ByteBuffer page, int pageSize) throws IgniteCheckedException {
+        PageIO io = PageIO.getPageIO(page);
 
-        assert compressedPage.position() == 0;
-        int compressedSize = compressedPage.limit();
+        ByteBuffer compactPage = compactBuf.get();
 
-        int freeCompactBlocks = (pageSize - compactSize) / blockSize;
-        int freeCompressedBlocks = (pageSize - compressedSize) / blockSize;
+        if (io instanceof CompactablePageIO) {
+            // Drop the garbage from the page.
+            ((CompactablePageIO)io).compactPage(page, compactPage, pageSize);
+        }
+        else {
+            // Direct buffer is required as output of this method.
+            if (page.isDirect())
+                return page;
 
-        if (freeCompactBlocks >= freeCompressedBlocks) {
-            if (freeCompactBlocks == 0)
-                return page; // No blocks will be released.
+            PageUtils.putBytes(GridUnsafe.bufferAddress(compactPage), 0, page.array());
 
-            return setCompactionInfo(compactPage, compactSize);
+            compactPage.limit(pageSize);
         }
 
-        return setCompressionInfo(compressedPage, compression, compressedSize, compactSize);
+        return compactPage;
     }
 
     /**
@@ -260,7 +295,7 @@ public class CompressionProcessorImpl extends CompressionProcessor {
      * @return Level.
      */
     private static byte getCompressionType(DiskPageCompression compression) {
-        if (compression == null)
+        if (compression == DiskPageCompression.DISABLED)
             return UNCOMPRESSED_PAGE;
 
         switch (compression) {
@@ -281,7 +316,7 @@ public class CompressionProcessorImpl extends CompressionProcessor {
 
     /** {@inheritDoc} */
     @Override public void decompressPage(ByteBuffer page, int pageSize) throws IgniteCheckedException {
-        assert page.capacity() == pageSize;
+        assert page.capacity() >= pageSize : "capacity=" + page.capacity() + ", pageSize=" + pageSize;
 
         byte compressType = PageIO.getCompressionType(page);
 
@@ -338,11 +373,16 @@ public class CompressionProcessorImpl extends CompressionProcessor {
             assert page.limit() == compactSize;
         }
 
-        CompactablePageIO io = PageIO.getPageIO(page);
+        PageIO io = PageIO.getPageIO(page);
 
-        io.restorePage(page, pageSize);
+        if (io instanceof CompactablePageIO)
+            ((CompactablePageIO)io).restorePage(page, pageSize);
+        else {
+            assert compactSize == pageSize
+                : "Wrong compacted page size [compactSize=" + compactSize + ", pageSize=" + pageSize + ']';
+        }
 
-        setCompressionInfo(page, null, 0, 0);
+        setCompressionInfo(page, DiskPageCompression.DISABLED, 0, 0);
     }
 
     /** */
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest.java
new file mode 100644
index 0000000..48416b4
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest.java
@@ -0,0 +1,37 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCheckpointSimulationWithRealCpDisabledTest;
+
+/**
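+ * Runs {@link IgnitePdsCheckpointSimulationWithRealCpDisabledTest} with WAL page compression set to SNAPPY.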
+ */
+public class IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest
+    extends IgnitePdsCheckpointSimulationWithRealCpDisabledTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.getDataStorageConfiguration().setWalPageCompression(DiskPageCompression.SNAPPY);
+
+        return cfg;
+    }
+}
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAndPageCompressionTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAndPageCompressionTest.java
new file mode 100644
index 0000000..01e2040
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAndPageCompressionTest.java
@@ -0,0 +1,35 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ * Runs {@link WalCompactionTest} with WAL page compression set to SNAPPY.
+ */
+public class WalCompactionAndPageCompressionTest extends WalCompactionTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.getDataStorageConfiguration().setWalPageCompression(DiskPageCompression.SNAPPY);
+
+        return cfg;
+    }
+}
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryWithPageCompressionAndTdeTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryWithPageCompressionAndTdeTest.java
new file mode 100644
index 0000000..07c1312
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryWithPageCompressionAndTdeTest.java
@@ -0,0 +1,51 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.encryption.AbstractEncryptionTest;
+import org.apache.ignite.spi.encryption.keystore.KeystoreEncryptionSpi;
+
+/**
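+ * Runs {@link WalRecoveryWithPageCompressionTest} with encryption enabled for every configured cache.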
+ */
+public class WalRecoveryWithPageCompressionAndTdeTest extends WalRecoveryWithPageCompressionTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        for (CacheConfiguration<?, ?> ccfg : cfg.getCacheConfiguration())
+            ccfg.setEncryptionEnabled(true);
+
+        KeystoreEncryptionSpi encSpi = new KeystoreEncryptionSpi();
+
+        encSpi.setKeyStorePath(AbstractEncryptionTest.KEYSTORE_PATH);
+        encSpi.setKeyStorePassword(AbstractEncryptionTest.KEYSTORE_PASSWORD.toCharArray());
+
+        cfg.setEncryptionSpi(encSpi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testWalRenameDirSimple() throws Exception {
+        // Skip this test when TDE is enabled, since it involves an internal cache group id change without a
+        // corresponding change of the encryption keys.
+    }
+}
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryWithPageCompressionTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryWithPageCompressionTest.java
new file mode 100644
index 0000000..aaf4b25
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryWithPageCompressionTest.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFAULT_DISK_PAGE_COMPRESSION;
+
+/**
+ * WAL recovery test with WAL page compression enabled and PDS disk page compression disabled.
+ */
+@WithSystemProperty(key = IGNITE_DEFAULT_DISK_PAGE_COMPRESSION, value = "DISABLED")
+public class WalRecoveryWithPageCompressionTest extends IgniteWalRecoveryTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        walPageCompression = DiskPageCompression.ZSTD;
+    }
+}
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/AbstractPageCompressionIntegrationTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/AbstractPageCompressionIntegrationTest.java
new file mode 100644
index 0000000..31d838d
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/AbstractPageCompressionIntegrationTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.configuration.DiskPageCompression.LZ4;
+import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE;
+import static org.apache.ignite.configuration.DiskPageCompression.SNAPPY;
+import static org.apache.ignite.configuration.DiskPageCompression.ZSTD;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_DEFAULT_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MAX_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MIN_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MAX_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MIN_LEVEL;
+
+/**
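+ * Common page compression test cases; subclasses supply the scenario via {@link #doTestPageCompression()}.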
+ */
+public abstract class AbstractPageCompressionIntegrationTest extends GridCommonAbstractTest {
+    /** */
+    protected DiskPageCompression compression;
+
+    /** */
+    protected Integer compressionLevel;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        compression = DiskPageCompression.DISABLED;
+        compressionLevel = null;
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPageCompression_Zstd_Max() throws Exception {
+        compression = ZSTD;
+        compressionLevel = ZSTD_MAX_LEVEL;
+
+        doTestPageCompression();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPageCompression_Zstd_Default() throws Exception {
+        compression = ZSTD;
+        compressionLevel = null;
+
+        doTestPageCompression();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPageCompression_Zstd_Min() throws Exception {
+        compression = ZSTD;
+        compressionLevel = ZSTD_MIN_LEVEL;
+
+        doTestPageCompression();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPageCompression_Lz4_Max() throws Exception {
+        compression = LZ4;
+        compressionLevel = LZ4_MAX_LEVEL;
+
+        doTestPageCompression();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPageCompression_Lz4_Default() throws Exception {
+        compression = LZ4;
+        compressionLevel = null;
+
+        doTestPageCompression();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPageCompression_Lz4_Min() throws Exception {
+        assertEquals(LZ4_MIN_LEVEL, LZ4_DEFAULT_LEVEL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPageCompression_SkipGarbage() throws Exception {
+        compression = SKIP_GARBAGE;
+
+        doTestPageCompression();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPageCompression_Snappy() throws Exception {
+        compression = SNAPPY;
+
+        doTestPageCompression();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    protected abstract void doTestPageCompression() throws Exception;
+
+    /**
+     *
+     */
+    static class TestVal implements Serializable {
+        /** */
+        static final long serialVersionUID = 1L;
+
+        /** */
+        @QuerySqlField
+        String str;
+
+        /** */
+        int i;
+
+        /** */
+        @QuerySqlField
+        long x;
+
+        /** */
+        @QuerySqlField
+        UUID id;
+
+        TestVal(int i) {
+            this.str =  i + "bla bla bla!";
+            this.i = -i;
+            this.x = 0xffaabbccdd773311L + i;
+            this.id = new UUID(i,-i);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            TestVal testVal = (TestVal)o;
+
+            if (i != testVal.i) return false;
+            if (x != testVal.x) return false;
+            if (str != null ? !str.equals(testVal.str) : testVal.str != null) return false;
+            return id != null ? id.equals(testVal.id) : testVal.id == null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int result = str != null ? str.hashCode() : 0;
+            result = 31 * result + i;
+            result = 31 * result + (int)(x ^ (x >>> 32));
+            result = 31 * result + (id != null ? id.hashCode() : 0);
+            return result;
+        }
+    }
+}
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java
index 0158828..865ab22 100644
--- a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java
@@ -18,21 +18,17 @@ package org.apache.ignite.internal.processors.compress;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.file.OpenOption;
 import java.util.Objects;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.DiskPageCompression;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.pagemem.store.PageStore;
@@ -46,47 +42,19 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
 import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Test;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE;
-import static org.apache.ignite.configuration.DiskPageCompression.LZ4;
-import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE;
-import static org.apache.ignite.configuration.DiskPageCompression.SNAPPY;
 import static org.apache.ignite.configuration.DiskPageCompression.ZSTD;
-import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_DEFAULT_LEVEL;
-import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MAX_LEVEL;
-import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MIN_LEVEL;
-import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MAX_LEVEL;
-import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MIN_LEVEL;
 
 /**
  *
  */
-public class DiskPageCompressionIntegrationTest extends GridCommonAbstractTest {
-    /** */
-    private DiskPageCompression compression;
-
-    /** */
-    private Integer compressionLevel;
-
+public class DiskPageCompressionIntegrationTest extends AbstractPageCompressionIntegrationTest {
     /** */
     private FileIOFactory factory;
 
     /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        compression = null;
-        compressionLevel = null;
-        cleanPersistenceDir();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids(true);
-    }
-
-    /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteName) throws Exception {
         DataRegionConfiguration drCfg = new DataRegionConfiguration()
             .setPersistenceEnabled(true);
@@ -112,90 +80,8 @@ public class DiskPageCompressionIntegrationTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    @Test
-    public void testPageCompression_Zstd_Max() throws Exception {
-        compression = ZSTD;
-        compressionLevel = ZSTD_MAX_LEVEL;
-
-        doTestPageCompression();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testPageCompression_Zstd_Default() throws Exception {
-        compression = ZSTD;
-        compressionLevel = null;
-
-        doTestPageCompression();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testPageCompression_Zstd_Min() throws Exception {
-        compression = ZSTD;
-        compressionLevel = ZSTD_MIN_LEVEL;
-
-        doTestPageCompression();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testPageCompression_Lz4_Max() throws Exception {
-        compression = LZ4;
-        compressionLevel = LZ4_MAX_LEVEL;
-
-        doTestPageCompression();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testPageCompression_Lz4_Default() throws Exception {
-        compression = LZ4;
-        compressionLevel = null;
-
-        doTestPageCompression();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testPageCompression_Lz4_Min() throws Exception {
-        assertEquals(LZ4_MIN_LEVEL, LZ4_DEFAULT_LEVEL);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testPageCompression_SkipGarbage() throws Exception {
-        compression = SKIP_GARBAGE;
-
-        doTestPageCompression();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testPageCompression_Snappy() throws Exception {
-        compression = SNAPPY;
-
-        doTestPageCompression();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void doTestPageCompression() throws Exception {
+    @Override
+    protected void doTestPageCompression() throws Exception {
         IgniteEx ignite = startGrid(0);
 
         ignite.cluster().active(true);
@@ -342,57 +228,6 @@ public class DiskPageCompressionIntegrationTest extends GridCommonAbstractTest {
 
     /**
      */
-    static class TestVal implements Serializable {
-        /** */
-        static final long serialVersionUID = 1L;
-
-        /** */
-        @QuerySqlField
-        String str;
-
-        /** */
-        int i;
-
-        /** */
-        @QuerySqlField
-        long x;
-
-        /** */
-        @QuerySqlField
-        UUID id;
-
-        TestVal(int i) {
-            this.str =  i + "bla bla bla!";
-            this.i = -i;
-            this.x = 0xffaabbccdd773311L + i;
-            this.id = new UUID(i,-i);
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            TestVal testVal = (TestVal)o;
-
-            if (i != testVal.i) return false;
-            if (x != testVal.x) return false;
-            if (str != null ? !str.equals(testVal.str) : testVal.str != null) return false;
-            return id != null ? id.equals(testVal.id) : testVal.id == null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int result = str != null ? str.hashCode() : 0;
-            result = 31 * result + i;
-            result = 31 * result + (int)(x ^ (x >>> 32));
-            result = 31 * result + (id != null ? id.hashCode() : 0);
-            return result;
-        }
-    }
-
-    /**
-     */
     static class PunchFileIO extends FileIODecorator {
         /** */
         private ConcurrentMap<Long, Integer> holes = new ConcurrentHashMap<>();
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/WalPageCompressionIntegrationTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/WalPageCompressionIntegrationTest.java
new file mode 100644
index 0000000..c16f9e4
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/WalPageCompressionIntegrationTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+
+/**
+ * Checks that compressed WAL page snapshot records take less WAL space than uncompressed ones.
+ */
+public class WalPageCompressionIntegrationTest extends AbstractPageCompressionIntegrationTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteName) throws Exception {
+        DataStorageConfiguration dsCfg = new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
+            .setWalPageCompression(compression)
+            .setWalPageCompressionLevel(compressionLevel);
+
+        return super.getConfiguration(igniteName)
+            .setDataStorageConfiguration(dsCfg)
+            // Set new IP finder for each node to start independent clusters.
+            .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(new TcpDiscoveryVmIpFinder(true)));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Override
+    protected void doTestPageCompression() throws Exception {
+        // Ignite instance with compressed WAL page records.
+        IgniteEx ignite0 = startGrid(0);
+
+        compression = DiskPageCompression.DISABLED;
+        compressionLevel = null;
+
+        // Reference ignite instance with uncompressed WAL page records.
+        IgniteEx ignite1 = startGrid(1);
+
+        ignite0.cluster().active(true);
+        ignite1.cluster().active(true);
+
+        String cacheName = "test";
+
+        CacheConfiguration<Integer, TestVal> ccfg = new CacheConfiguration<Integer, TestVal>()
+            .setName(cacheName)
+            .setBackups(0)
+            .setAtomicityMode(ATOMIC)
+            .setIndexedTypes(Integer.class, TestVal.class);
+
+        IgniteCache<Integer,TestVal> cache0 = ignite0.getOrCreateCache(ccfg);
+        IgniteCache<Integer,TestVal> cache1 = ignite1.getOrCreateCache(ccfg);
+
+        int cnt = 20_000;
+
+        for (int i = 0; i < cnt; i++) {
+            assertTrue(cache0.putIfAbsent(i, new TestVal(i)));
+            assertTrue(cache1.putIfAbsent(i, new TestVal(i)));
+        }
+
+        for (int i = 0; i < cnt; i += 2) {
+            assertEquals(new TestVal(i), cache0.getAndRemove(i));
+            assertEquals(new TestVal(i), cache1.getAndRemove(i));
+        }
+
+        // Write any WAL record to get current WAL pointers.
+        FileWALPointer ptr0 = (FileWALPointer)ignite0.context().cache().context().wal().log(new CheckpointRecord(null));
+        FileWALPointer ptr1 = (FileWALPointer)ignite1.context().cache().context().wal().log(new CheckpointRecord(null));
+
+        log.info("Compressed WAL pointer: " + ptr0);
+        log.info("Uncompressed WAL pointer: " + ptr1);
+
+        assertTrue("Compressed WAL must be smaller than uncompressed [ptr0=" + ptr0 + ", ptr1=" + ptr1 + ']',
+            ptr0.compareTo(ptr1) < 0);
+    }
+}
diff --git a/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java b/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java
index b38abc0..efbdd14 100644
--- a/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java
+++ b/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java
@@ -18,11 +18,16 @@ package org.apache.ignite.testsuites;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionAndPageCompressionTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryWithPageCompressionAndTdeTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryWithPageCompressionTest;
 import org.apache.ignite.internal.processors.compress.CompressionConfigurationTest;
 import org.apache.ignite.internal.processors.compress.CompressionProcessorTest;
 import org.apache.ignite.internal.processors.compress.DiskPageCompressionIntegrationAsyncTest;
 import org.apache.ignite.internal.processors.compress.DiskPageCompressionIntegrationTest;
 import org.apache.ignite.internal.processors.compress.FileSystemUtilsTest;
+import org.apache.ignite.internal.processors.compress.WalPageCompressionIntegrationTest;
 import org.apache.ignite.testframework.junits.DynamicSuite;
 import org.junit.runner.RunWith;
 
@@ -45,6 +50,13 @@ public class IgnitePdsCompressionTestSuite {
         suite.add(DiskPageCompressionIntegrationTest.class);
         suite.add(DiskPageCompressionIntegrationAsyncTest.class);
 
+        // WAL page records compression.
+        suite.add(WalPageCompressionIntegrationTest.class);
+        suite.add(WalRecoveryWithPageCompressionTest.class);
+        suite.add(WalRecoveryWithPageCompressionAndTdeTest.class);
+        suite.add(IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest.class);
+        suite.add(WalCompactionAndPageCompressionTest.class);
+
         enableCompressionByDefault();
         IgnitePdsTestSuite.addRealPageStoreTests(suite, null);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 800b41c..1df064e 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -190,6 +190,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Default SQL on-heap cache size. */
     public static final int DFLT_SQL_ONHEAP_CACHE_MAX_SIZE = 0;
 
+    /** Default disk page compression algorithm. */
+    public static final DiskPageCompression DFLT_DISK_PAGE_COMPRESSION = DiskPageCompression.DISABLED;
+
     /** Cache name. */
     private String name;
 
@@ -2318,7 +2321,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
      * @see #getDiskPageCompressionLevel
      */
     public DiskPageCompression getDiskPageCompression() {
-        return diskPageCompression;
+        return diskPageCompression == null ? DFLT_DISK_PAGE_COMPRESSION : diskPageCompression;
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
index c2cb8bc..d10bda1 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
@@ -166,6 +166,9 @@ public class DataStorageConfiguration implements Serializable {
     /** Default wal compaction level. */
     public static final int DFLT_WAL_COMPACTION_LEVEL = Deflater.BEST_SPEED;
 
+    /** Default compression algorithm for WAL page snapshot records. */
+    public static final DiskPageCompression DFLT_WAL_PAGE_COMPRESSION = DiskPageCompression.DISABLED;
+
     /** Initial size of a memory chunk reserved for system cache. */
     private long sysRegionInitSize = DFLT_SYS_REG_INIT_SIZE;
 
@@ -289,6 +292,12 @@ public class DataStorageConfiguration implements Serializable {
     /** Timeout for checkpoint read lock acquisition. */
     private Long checkpointReadLockTimeout;
 
+    /** Compression algorithm for WAL page snapshot records. */
+    private DiskPageCompression walPageCompression = DFLT_WAL_PAGE_COMPRESSION;
+
+    /** Compression level for WAL page snapshot records. */
+    private Integer walPageCompressionLevel;
+
     /**
      * Initial size of a data region reserved for system cache.
      *
@@ -1020,6 +1029,50 @@ public class DataStorageConfiguration implements Serializable {
         return this;
     }
 
+    /**
+     * Gets compression algorithm for WAL page snapshot records.
+     *
+     * @return Page compression algorithm.
+     */
+    public DiskPageCompression getWalPageCompression() {
+        return walPageCompression == null ? DFLT_WAL_PAGE_COMPRESSION : walPageCompression;
+    }
+
+    /**
+     * Sets compression algorithm for WAL page snapshot records.
+     *
+     * @param walPageCompression Page compression algorithm.
+     * @return {@code this} for chaining.
+     */
+    public DataStorageConfiguration setWalPageCompression(DiskPageCompression walPageCompression) {
+        this.walPageCompression = walPageCompression;
+
+        return this;
+    }
+
+    /**
+     * Gets {@link #getWalPageCompression algorithm} specific WAL page compression level.
+     *
+     * @return WAL page snapshots compression level or {@code null} for default.
+     */
+    public Integer getWalPageCompressionLevel() {
+        return walPageCompressionLevel;
+    }
+
+    /**
+     * Sets {@link #setWalPageCompression algorithm} specific WAL page compression level.
+     *
+     * @param walPageCompressionLevel WAL page snapshots compression level or {@code null} to use default.
+     *      {@link DiskPageCompression#ZSTD Zstd}: from {@code -131072} to {@code 22} (default {@code 3}).
+     *      {@link DiskPageCompression#LZ4 LZ4}: from {@code 0} to {@code 17} (default {@code 0}).
+     * @return {@code this} for chaining.
+     */
+    public DataStorageConfiguration setWalPageCompressionLevel(Integer walPageCompressionLevel) {
+        this.walPageCompressionLevel = walPageCompressionLevel;
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DataStorageConfiguration.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java b/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java
index 5deda2f..29186f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java
@@ -23,8 +23,13 @@ import org.jetbrains.annotations.Nullable;
  *
  * @see CacheConfiguration#setDiskPageCompression
  * @see CacheConfiguration#setDiskPageCompressionLevel
+ * @see DataStorageConfiguration#setWalPageCompression
+ * @see DataStorageConfiguration#setWalPageCompressionLevel
  */
 public enum DiskPageCompression {
+    /** Compression disabled. */
+    DISABLED,
+
     /** Retain only useful data from half-filled pages, but do not apply any compression. */
     SKIP_GARBAGE,
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
index 6ae2c92..feae9d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
@@ -28,16 +28,16 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 /**
  *
  */
-public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{
+public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware {
     /** */
     @GridToStringExclude
-    private byte[] pageData;
+    private ByteBuffer pageData;
 
     /** */
     private FullPageId fullPageId;
 
     /**
-     * PageSIze without encryption overhead.
+     * Page size without encryption overhead.
      */
     private int realPageSize;
 
@@ -48,13 +48,16 @@ public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{
      */
     public PageSnapshot(FullPageId fullId, byte[] arr, int realPageSize) {
         this.fullPageId = fullId;
-        this.pageData = arr;
+        this.pageData = ByteBuffer.wrap(arr).order(ByteOrder.nativeOrder());
         this.realPageSize = realPageSize;
     }
 
     /**
+     * This constructor doesn't actually create a page snapshot (copy), it creates a wrapper over page memory region.
+     * A created record should not be used after WAL manager writes it to log, since page content can be modified.
+     *
      * @param fullPageId Full page ID.
-     * @param ptr Pointer to copy from.
+     * @param ptr Pointer to wrap.
      * @param pageSize Page size.
      * @param realPageSize Page size without encryption overhead.
      */
@@ -62,9 +65,7 @@ public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{
         this.fullPageId = fullPageId;
         this.realPageSize = realPageSize;
 
-        pageData = new byte[pageSize];
-
-        GridUnsafe.copyMemory(null, ptr, pageData, GridUnsafe.BYTE_ARR_OFF, pageSize);
+        pageData = GridUnsafe.wrapPointer(ptr, pageSize);
     }
 
     /** {@inheritDoc} */
@@ -76,6 +77,31 @@ public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{
      * @return Snapshot of page data.
      */
     public byte[] pageData() {
+        if (!pageData.isDirect())
+            return pageData.array();
+
+        // In case of a direct buffer, copy the buffer content to a new array.
+        byte[] arr = new byte[pageData.limit()];
+
+        GridUnsafe.copyMemory(null, GridUnsafe.bufferAddress(pageData), arr, GridUnsafe.BYTE_ARR_OFF,
+            pageData.limit());
+
+        return arr;
+    }
+
+    /**
+     * @return Size of page data.
+     */
+    public int pageDataSize() {
+        return pageData.limit();
+    }
+
+    /**
+     * @return Page data byte buffer.
+     */
+    public ByteBuffer pageDataBuffer() {
+        pageData.rewind();
+
         return pageData;
     }
 
@@ -87,10 +113,26 @@ public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{
     }
 
     /** {@inheritDoc} */
+    @Override public int groupId() {
+        return fullPageId.groupId();
+    }
+
+    /**
+     * @return Page size without encryption overhead.
+     */
+    public int realPageSize() {
+        return realPageSize;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
-        ByteBuffer buf = ByteBuffer.allocateDirect(pageData.length);
-        buf.order(ByteOrder.nativeOrder());
-        buf.put(pageData);
+        ByteBuffer buf = pageData;
+
+        if (!pageData.isDirect()) {
+            buf = ByteBuffer.allocateDirect(pageDataSize());
+            buf.order(ByteOrder.nativeOrder());
+            buf.put(pageData);
+        }
 
         long addr = GridUnsafe.bufferAddress(buf);
 
@@ -101,16 +143,12 @@ public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{
                 + super.toString() + "]]";
         }
         catch (IgniteCheckedException ignored) {
-            return "Error during call'toString' of PageSnapshot [fullPageId=" + fullPageId() +
-                ", pageData = " + Arrays.toString(pageData) + ", super=" + super.toString() + "]";
+            return "Error during call 'toString' of PageSnapshot [fullPageId=" + fullPageId() +
+                ", pageData = " + Arrays.toString(pageData()) + ", super=" + super.toString() + "]";
         }
         finally {
-            GridUnsafe.cleanDirectBuffer(buf);
+            if (!pageData.isDirect())
+                GridUnsafe.cleanDirectBuffer(buf);
         }
     }
-
-    /** {@inheritDoc} */
-    @Override public int groupId() {
-        return fullPageId.groupId();
-    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java
index a050cd0..cac0064 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java
@@ -44,13 +44,19 @@ public class CacheCompressionManager extends GridCacheManagerAdapter {
 
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
+        if (cctx.kernalContext().clientNode()) {
+            diskPageCompression = DiskPageCompression.DISABLED;
+
+            return;
+        }
+
         compressProc = cctx.kernalContext().compress();
 
         CacheConfiguration cfg = cctx.config();
 
         diskPageCompression = cctx.kernalContext().config().isClientMode() ? null : cfg.getDiskPageCompression();
 
-        if (diskPageCompression != null) {
+        if (diskPageCompression != DiskPageCompression.DISABLED) {
             if (!cctx.dataRegion().config().isPersistenceEnabled())
                 throw new IgniteCheckedException("Disk page compression makes sense only with enabled persistence.");
 
@@ -81,7 +87,7 @@ public class CacheCompressionManager extends GridCacheManagerAdapter {
      * @throws IgniteCheckedException If failed.
      */
     public ByteBuffer compressPage(ByteBuffer page, PageStore store) throws IgniteCheckedException {
-        if (diskPageCompression == null)
+        if (diskPageCompression == DiskPageCompression.DISABLED)
             return page;
 
         int blockSize = store.getBlockSize();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index d73b3da..a10877d 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -151,6 +151,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
+import org.apache.ignite.internal.processors.compress.CompressionProcessor;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.stat.IoStatisticsHolderNoOp;
@@ -2529,6 +2530,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             try {
                 PageUtils.putBytes(pageAddr, 0, pageSnapshotRecord.pageData());
+
+                if (PageIO.getCompressionType(pageAddr) != CompressionProcessor.UNCOMPRESSED_PAGE) {
+                    int realPageSize = pageMem.realPageSize(pageSnapshotRecord.groupId());
+
+                    assert pageSnapshotRecord.pageDataSize() < realPageSize : pageSnapshotRecord.pageDataSize();
+
+                    cctx.kernalContext().compress().decompressPage(pageMem.pageBuffer(pageAddr), realPageSize);
+                }
             }
             finally {
                 pageMem.writeUnlock(grpId, pageId, page, null, true, true);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index ed7dd2f..23adeac 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -346,6 +346,10 @@ public class FilePageStore implements PageStore {
             if (inited) {
                 long newSize = Math.max(pageSize, fileIO.size() - headerSize());
 
+                // With compressed pages the tail of the last page can be missing: round up to a page boundary.
+                if (newSize % pageSize != 0)
+                    newSize += pageSize - newSize % pageSize;
+
                 long delta = newSize - allocated.getAndSet(newSize);
 
                 assert delta % pageSize == 0 : delta;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index ed2d344..da787fb 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageParti
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
+import org.apache.ignite.internal.processors.compress.CompressionProcessor;
 import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
 import org.apache.ignite.internal.stat.IoStatisticsHolder;
 import org.apache.ignite.internal.stat.IoStatisticsHolderNoOp;
@@ -967,7 +968,7 @@ public class PageMemoryImpl implements PageMemoryEx {
 
                             if (snapshot.fullPageId().equals(fullId)) {
                                 if (tmpAddr == null) {
-                                    assert snapshot.pageData().length <= pageSize() : snapshot.pageData().length;
+                                    assert snapshot.pageDataSize() <= pageSize() : snapshot.pageDataSize();
 
                                     tmpAddr = GridUnsafe.allocateMemory(pageSize());
                                 }
@@ -976,6 +977,14 @@ public class PageMemoryImpl implements PageMemoryEx {
                                     curPage = wrapPointer(tmpAddr, pageSize());
 
                                 PageUtils.putBytes(tmpAddr, 0, snapshot.pageData());
+
+                                if (PageIO.getCompressionType(tmpAddr) != CompressionProcessor.UNCOMPRESSED_PAGE) {
+                                    int realPageSize = realPageSize(snapshot.groupId());
+
+                                    assert snapshot.pageDataSize() < realPageSize : snapshot.pageDataSize();
+
+                                    ctx.kernalContext().compress().decompressPage(curPage, realPageSize);
+                                }
                             }
 
                             break;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
index 0e66b77..ab660d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
@@ -405,6 +405,14 @@ public abstract class PageIO {
     }
 
     /**
+     * @param pageAddr Page address.
+     * @return Compression type.
+     */
+    public static byte getCompressionType(long pageAddr) {
+        return PageUtils.getByte(pageAddr, COMPRESSION_TYPE_OFF);
+    }
+
+    /**
      * @param page Page buffer.
      * @param compressedSize Compressed size.
      */
@@ -421,6 +429,14 @@ public abstract class PageIO {
     }
 
     /**
+     * @param pageAddr Page address.
+     * @return Compressed size.
+     */
+    public static short getCompressedSize(long pageAddr) {
+        return PageUtils.getShort(pageAddr, COMPRESSED_SIZE_OFF);
+    }
+
+    /**
      * @param page Page buffer.
      * @param compactedSize Compacted size.
      */
@@ -438,6 +454,14 @@ public abstract class PageIO {
 
     /**
      * @param pageAddr Page address.
+     * @return Compacted size.
+     */
+    public static short getCompactedSize(long pageAddr) {
+        return PageUtils.getShort(pageAddr, COMPACTED_SIZE_OFF);
+    }
+
+    /**
+     * @param pageAddr Page address.
      * @return Checksum.
      */
     public static int getCrc(long pageAddr) {
@@ -839,9 +863,8 @@ public abstract class PageIO {
         assert pageSize <= out.remaining();
         assert pageSize == page.remaining();
 
-        page.mark();
-        out.put(page).flip();
-        page.reset();
+        PageHandler.copyMemory(page, 0, out, 0, pageSize);
+        out.limit(pageSize);
     }
 
     /**
@@ -857,7 +880,14 @@ public abstract class PageIO {
             .a(",\n\t").a(PageIdUtils.toDetailString(getPageId(addr)))
             .a("\n],\n");
 
-        io.printPage(addr, pageSize, sb);
+        if (getCompressionType(addr) != 0) {
+            sb.a("CompressedPage[\n\tcompressionType=").a(getCompressionType(addr))
+                .a(",\n\tcompressedSize=").a(getCompressedSize(addr))
+                .a(",\n\tcompactedSize=").a(getCompactedSize(addr))
+                .a("\n]");
+        }
+        else
+            io.printPage(addr, pageSize, sb);
 
         return sb.toString();
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 0e45c26..7bf2e10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -55,6 +55,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DiskPageCompression;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.events.WalSegmentArchivedEvent;
@@ -103,9 +104,11 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.Re
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
+import org.apache.ignite.internal.processors.compress.CompressionProcessor;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.io.GridFileUtils;
@@ -357,6 +360,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** Switch segment record offset. */
     private final AtomicLongArray switchSegmentRecordOffset;
 
+    /** Page snapshot records compression algorithm. */
+    private DiskPageCompression pageCompression;
+
+    /** Page snapshot records compression level. */
+    private int pageCompressionLevel;
+
     /**
      * @param ctx Kernal context.
      */
@@ -471,6 +480,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 segmentRouter,
                 ioFactory
             );
+
+            pageCompression = dsCfg.getWalPageCompression();
+
+            if (pageCompression != DiskPageCompression.DISABLED) {
+                if (serializerVer < 2) {
+                    throw new IgniteCheckedException("WAL page snapshots compression not supported for serializerVer=" +
+                        serializerVer);
+                }
+
+                cctx.kernalContext().compress().checkPageCompressionSupported();
+
+                pageCompressionLevel = dsCfg.getWalPageCompressionLevel() != null ?
+                    CompressionProcessor.checkCompressionLevelBounds(dsCfg.getWalPageCompressionLevel(), pageCompression) :
+                    CompressionProcessor.getDefaultCompressionLevel(pageCompression);
+            }
         }
     }
 
@@ -769,6 +793,25 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         if (currWrHandle == null || (isDisable != null && isDisable.check()))
             return null;
 
+        // Do page snapshots compression if configured.
+        if (pageCompression != DiskPageCompression.DISABLED && rec instanceof PageSnapshot) {
+            PageSnapshot pageSnapshot = (PageSnapshot)rec;
+
+            int pageSize = pageSnapshot.realPageSize();
+
+            ByteBuffer pageData = pageSnapshot.pageDataBuffer();
+
+            ByteBuffer compressedPage = cctx.kernalContext().compress().compressPage(pageData, pageSize, 1,
+                pageCompression, pageCompressionLevel);
+
+            if (compressedPage != pageData) {
+                assert compressedPage.isDirect() : "Is direct buffer: " + compressedPage.isDirect();
+
+                rec = new PageSnapshot(pageSnapshot.fullPageId(), GridUnsafe.bufferAddress(compressedPage),
+                    compressedPage.limit(), pageSize);
+            }
+        }
+
         // Need to calculate record size first.
         rec.size(serializer.size(rec));
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java
index e94be9f..e1d7c9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java
@@ -41,11 +41,12 @@ public interface RecordDataSerializer {
      *
      * @param type Record type.
      * @param in Buffer to read.
+     * @param size Record size (0 if unknown).
      * @return WAL record.
      * @throws IOException In case of I/O problems.
      * @throws IgniteCheckedException If it's unable to read record.
      */
-    WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException;
+    WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in, int size) throws IOException, IgniteCheckedException;
 
     /**
      * Writes record data to buffer {@code buf}.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index c5ede10..dd6393f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -127,10 +127,10 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
     protected final GridCacheSharedContext cctx;
 
     /** Size of page used for PageMemory regions. */
-    private final int pageSize;
+    protected final int pageSize;
 
     /** Size of page without encryption overhead. */
-    private final int realPageSize;
+    protected final int realPageSize;
 
     /** Cache object processor to reading {@link DataEntry DataEntries}. */
     protected final IgniteCacheObjectProcessor co;
@@ -184,7 +184,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
     }
 
     /** {@inheritDoc} */
-    @Override public WALRecord readRecord(RecordType type, ByteBufferBackedDataInput in)
+    @Override public WALRecord readRecord(RecordType type, ByteBufferBackedDataInput in, int size)
         throws IOException, IgniteCheckedException {
         if (type == ENCRYPTED_RECORD) {
             if (encSpi == null) {
@@ -201,10 +201,10 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
             if (clData.get1() == null)
                 return new EncryptedRecord(clData.get2(), clData.get3());
 
-            return readPlainRecord(clData.get3(), clData.get1(), true);
+            return readPlainRecord(clData.get3(), clData.get1(), true, clData.get1().buffer().capacity());
         }
 
-        return readPlainRecord(type, in, false);
+        return readPlainRecord(type, in, false, size);
     }
 
     /** {@inheritDoc} */
@@ -346,7 +346,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
 
                 PageSnapshot pageRec = (PageSnapshot)record;
 
-                return pageRec.pageData().length + 12;
+                return pageRec.pageDataSize() + 12;
 
             case CHECKPOINT_RECORD:
                 CheckpointRecord cpRec = (CheckpointRecord)record;
@@ -532,12 +532,13 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
      * @param type Record type.
      * @param in Input
      * @param encrypted Record was encrypted.
+     * @param recordSize Record size.
      * @return Deserialized record.
      * @throws IOException If failed.
      * @throws IgniteCheckedException If failed.
      */
     WALRecord readPlainRecord(RecordType type, ByteBufferBackedDataInput in,
-        boolean encrypted) throws IOException, IgniteCheckedException {
+        boolean encrypted, int recordSize) throws IOException, IgniteCheckedException {
         WALRecord res;
 
         switch (type) {
@@ -1160,7 +1161,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
 
                 buf.putInt(snap.fullPageId().groupId());
                 buf.putLong(snap.fullPageId().pageId());
-                buf.put(snap.pageData());
+                buf.put(snap.pageDataBuffer());
 
                 break;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
index 8533e73..52714c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.wal.record.CacheState;
 import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
 import org.apache.ignite.internal.pagemem.wal.record.ConsistentCutRecord;
@@ -36,6 +37,7 @@ import org.apache.ignite.internal.pagemem.wal.record.LazyMvccDataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.MvccDataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
 import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord;
 import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
@@ -121,9 +123,20 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
     @Override WALRecord readPlainRecord(
         RecordType type,
         ByteBufferBackedDataInput in,
-        boolean encrypted
+        boolean encrypted,
+        int recordSize
     ) throws IOException, IgniteCheckedException {
         switch (type) {
+            case PAGE_RECORD:
+                int cacheId = in.readInt();
+                long pageId = in.readLong();
+
+                byte[] arr = new byte[recordSize - 4 /* cacheId */ - 8 /* pageId */];
+
+                in.readFully(arr);
+
+                return new PageSnapshot(new FullPageId(pageId, cacheId), arr, encrypted ? realPageSize : pageSize);
+
             case CHECKPOINT_RECORD:
                 long msb = in.readLong();
                 long lsb = in.readLong();
@@ -200,7 +213,7 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
                 return new ConsistentCutRecord();
 
             default:
-                return super.readPlainRecord(type, in, encrypted);
+                return super.readPlainRecord(type, in, encrypted, recordSize);
         }
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
index 288be5a..ec0ddc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
@@ -139,7 +139,7 @@ public class RecordV1Serializer implements RecordSerializer {
             if (recType == null)
                 throw new IOException("Unknown record type: " + recType);
 
-            final WALRecord rec = dataSerializer.readRecord(recType, in);
+            final WALRecord rec = dataSerializer.readRecord(recType, in, 0);
 
             rec.position(ptr);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
index 872e47d..5a6db67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
@@ -159,7 +159,8 @@ public class RecordV2Serializer implements RecordSerializer {
                 return new MarshalledRecord(recType, ptr, buf);
             }
             else {
-                WALRecord rec = dataSerializer.readRecord(recType, in);
+                WALRecord rec = dataSerializer.readRecord(recType, in, ptr.length() - REC_TYPE_SIZE -
+                    FILE_WAL_POINTER_SIZE - CRC_SIZE);
 
                 rec.position(ptr);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java
index 4f2227e..6c14d6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java
@@ -50,7 +50,7 @@ public class CompressionProcessor extends GridProcessorAdapter {
     public static final int ZSTD_DEFAULT_LEVEL = 3;
 
     /** */
-    protected static final byte UNCOMPRESSED_PAGE = 0;
+    public static final byte UNCOMPRESSED_PAGE = 0;
 
     /** */
     protected static final byte COMPACTED_PAGE = 1;
@@ -133,6 +133,15 @@ public class CompressionProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Checks whether page compression is supported.
+     *
+     * @throws IgniteCheckedException If compression is not supported.
+     */
+    public void checkPageCompressionSupported() throws IgniteCheckedException {
+        fail();
+    }
+
+    /**
      * @param storagePath Storage path.
      * @param pageSize Page size.
      * @throws IgniteCheckedException If compression is not supported.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index 167571d..2f74ac9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -60,6 +60,7 @@ import org.apache.ignite.configuration.ClientConnectorConfiguration;
 import org.apache.ignite.configuration.DataPageEvictionMode;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DiskPageCompression;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.MemoryConfiguration;
 import org.apache.ignite.configuration.MemoryPolicyConfiguration;
@@ -1922,6 +1923,11 @@ public class PlatformConfigurationUtils {
         if (in.readBoolean())
             res.setCheckpointReadLockTimeout(in.readLong());
 
+        res.setWalPageCompression(DiskPageCompression.fromOrdinal(in.readInt()));
+
+        if (in.readBoolean())
+            res.setWalPageCompressionLevel(in.readInt());
+
         int cnt = in.readInt();
 
         if (cnt > 0) {
@@ -2058,6 +2064,15 @@ public class PlatformConfigurationUtils {
             else
                 w.writeBoolean(false);
 
+            w.writeInt(cfg.getWalPageCompression().ordinal());
+
+            if (cfg.getWalPageCompressionLevel() != null) {
+                w.writeBoolean(true);
+                w.writeInt(cfg.getWalPageCompressionLevel());
+            }
+            else
+                w.writeBoolean(false);
+
             if (cfg.getDataRegionConfigurations() != null) {
                 w.writeInt(cfg.getDataRegionConfigurations().length);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java
index f0f1d8a..05ba134 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.persistence;
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.util.GridStringBuilder;
 
@@ -27,6 +28,9 @@ import org.apache.ignite.internal.util.GridStringBuilder;
  */
 public class DummyPageIO extends PageIO implements CompactablePageIO {
     /** */
+    public static final IOVersions<DummyPageIO> VERSIONS = new IOVersions<>(new DummyPageIO());
+
+    /** */
     public DummyPageIO() {
         super(2 * Short.MAX_VALUE, 1);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
index c44cbd0..5921c51 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.pagemem.FullPageId;
@@ -211,7 +212,7 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
 
         mem = shared.database().dataRegion(null).pageMemory();
 
-        verifyReads(res.get1(), mem, res.get2(), shared.wal());
+        verifyReads(ig.context(), res.get1(), mem, res.get2(), shared.wal());
     }
 
     /**
@@ -679,7 +680,7 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
             long pageAddr = mem.writeLock(fullId.groupId(), fullId.pageId(), page);
 
             try {
-                DataPageIO.VERSIONS.latest().initNewPage(pageAddr, fullId.pageId(), mem.realPageSize(fullId.groupId()));
+                DummyPageIO.VERSIONS.latest().initNewPage(pageAddr, fullId.pageId(), mem.realPageSize(fullId.groupId()));
 
                 ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
@@ -700,6 +701,7 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
      * @param mem Memory.
      */
     private void verifyReads(
+        GridKernalContext ctx,
         Map<FullPageId, Integer> res,
         PageMemory mem,
         WALPointer start,
@@ -707,6 +709,8 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
     ) throws Exception {
         Map<FullPageId, byte[]> replay = new HashMap<>();
 
+        ByteBuffer buf = ByteBuffer.allocateDirect(mem.pageSize()).order(ByteOrder.nativeOrder());
+
         try (PartitionMetaStateRecordExcludeIterator it = new PartitionMetaStateRecordExcludeIterator(wal.replay(start))) {
             IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
 
@@ -729,7 +733,22 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
                 else if (rec instanceof PageSnapshot) {
                     PageSnapshot page = (PageSnapshot)rec;
 
-                    replay.put(page.fullPageId(), page.pageData());
+                    int realPageSize = mem.realPageSize(page.groupId());
+
+                    byte[] pageData = page.pageData();
+
+                    if (page.pageDataSize() < realPageSize) {
+                        buf.clear();
+                        buf.put(pageData).flip();
+
+                        ctx.compress().decompressPage(buf, realPageSize);
+
+                        pageData = new byte[buf.limit()];
+
+                        buf.get(pageData);
+                    }
+
+                    replay.put(page.fullPageId(), pageData);
                 }
             }
         }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
index 729f9ac..427e80c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
@@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Comparator;
 import org.apache.ignite.Ignite;
@@ -37,9 +38,11 @@ import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
 import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
 import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
+import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
+import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
@@ -172,9 +175,11 @@ public class WalCompactionTest extends GridCommonAbstractTest {
             cache.put(i, val);
         }
 
+        byte[] dummyPage = dummyPage(pageSize);
+
         // Spam WAL to move all data records to compressible WAL zone.
         for (int i = 0; i < WAL_SEGMENT_SIZE / pageSize * 2; i++) {
-            ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[pageSize],
+            ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), dummyPage,
                 pageSize));
         }
 
@@ -479,9 +484,11 @@ public class WalCompactionTest extends GridCommonAbstractTest {
             cache.put(i, val);
         }
 
+        byte[] dummyPage = dummyPage(pageSize);
+
         // Spam WAL to move all data records to compressible WAL zone.
         for (int i = 0; i < WAL_SEGMENT_SIZE / pageSize * 2; i++) {
-            ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[pageSize],
+            ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), dummyPage,
                 pageSize));
         }
 
@@ -564,4 +571,19 @@ public class WalCompactionTest extends GridCommonAbstractTest {
 
         assertFalse(fail);
     }
+
+    /**
+     * @param pageSize Page size.
+     */
+    private static byte[] dummyPage(int pageSize) {
+        ByteBuffer pageBuf = ByteBuffer.allocateDirect(pageSize);
+
+        DummyPageIO.VERSIONS.latest().initNewPage(GridUnsafe.bufferAddress(pageBuf), -1, pageSize);
+
+        byte[] pageData = new byte[pageSize];
+
+        pageBuf.get(pageData);
+
+        return pageData;
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java
index 560d707..b013d24 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java
@@ -38,7 +38,6 @@ import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
 import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.pagemem.PageMemory;
-import org.apache.ignite.internal.pagemem.PageUtils;
 import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
@@ -424,7 +423,7 @@ public class PageMemoryTracker implements IgnitePlugin {
             page.lock();
 
             try {
-                PageUtils.putBytes(page.address(), 0, snapshot.pageData());
+                GridUnsafe.copyMemory(GridUnsafe.bufferAddress(snapshot.pageDataBuffer()), page.address(), pageSize);
 
                 page.changeHistory().clear();
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
index 18df698..e1de9fd 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
@@ -54,6 +54,7 @@ import org.apache.ignite.configuration.BinaryConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DiskPageCompression;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.DiscoverySpiTestListener;
@@ -72,6 +73,7 @@ import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
 import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -82,6 +84,8 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
 import org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.stat.IoStatisticsHolderNoOp;
@@ -162,7 +166,9 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
 
     /** */
     private long checkpointFrequency = DFLT_CHECKPOINT_FREQ;
-    ;
+
+    /** WAL page snapshot records compression method. */
+    protected DiskPageCompression walPageCompression;
 
     /** {@inheritDoc} */
     @Override protected boolean isMultiJvm() {
@@ -213,6 +219,8 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
 
         dbCfg.setWalSegments(walSegments);
 
+        dbCfg.setWalPageCompression(walPageCompression);
+
         dbCfg.setCheckpointFrequency(checkpointFrequency);
 
         cfg.setDataStorageConfiguration(dbCfg);
@@ -1357,6 +1365,8 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
 
         ByteBuffer buf = ByteBuffer.allocateDirect(pageSize);
 
+        buf.order(ByteOrder.nativeOrder());
+
         // Now check that deltas can be correctly applied.
         try (WALIterator it = sharedCtx.wal().replay(ptr)) {
             while (it.hasNext()) {
@@ -1367,7 +1377,28 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
                 if (rec instanceof PageSnapshot) {
                     PageSnapshot page = (PageSnapshot)rec;
 
-                    rolledPages.put(page.fullPageId(), page.pageData());
+                    CacheGroupContext gctx = sharedCtx.cache().cacheGroup(page.groupId());
+
+                    int realPageSize = gctx == null ? pageSize
+                        : gctx.dataRegion().pageMemory().realPageSize(page.groupId());
+
+                    byte[] pageData = page.pageData();
+
+                    if (pageData.length < realPageSize) {
+                        buf.clear();
+                        buf.put(pageData);
+                        buf.flip();
+
+                        sharedCtx.kernalContext().compress().decompressPage(buf, realPageSize);
+
+                        pageData = new byte[realPageSize];
+
+                        buf.position(0);
+
+                        buf.get(pageData);
+                    }
+
+                    rolledPages.put(page.fullPageId(), pageData);
                 }
                 else if (rec instanceof PageDeltaRecord) {
                     PageDeltaRecord delta = (PageDeltaRecord)rec;
@@ -1384,17 +1415,13 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
 
                     assertNotNull("Missing page snapshot [page=" + fullId + ", delta=" + delta + ']', pageData);
 
-                    buf.order(ByteOrder.nativeOrder());
-
-                    buf.position(0);
+                    buf.clear();
                     buf.put(pageData);
-                    buf.position(0);
+                    buf.flip();
 
                     delta.applyDelta(sharedCtx.database().dataRegion(null).pageMemory(),
                         GridUnsafe.bufferAddress(buf));
 
-                    buf.position(0);
-
                     buf.get(pageData);
                 }
             }
@@ -1404,6 +1431,8 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
 
         PageMemoryEx pageMem = (PageMemoryEx)db.dataRegion(null).pageMemory();
 
+        ByteBuffer bufWal = ByteBuffer.allocateDirect(pageSize);
+
         for (Map.Entry<FullPageId, byte[]> entry : rolledPages.entrySet()) {
             FullPageId fullId = entry.getKey();
 
@@ -1419,12 +1448,30 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
                     try {
                         byte[] data = entry.getValue();
 
-                        for (int i = 0; i < data.length; i++) {
-                            if (fullId.pageId() == TrackingPageIO.VERSIONS.latest().trackingPageFor(fullId.pageId(), db.pageSize()))
-                                continue; // Skip tracking pages.
+                        if (fullId.pageId() == TrackingPageIO.VERSIONS.latest().trackingPageFor(fullId.pageId(), db.pageSize()))
+                            continue; // Skip tracking pages.
 
-                            assertEquals("page=" + fullId + ", pos=" + i, PageUtils.getByte(bufPtr, i), data[i]);
+                        // Compacting/restoring a page can leave some trash in unused space, so we need to compare
+                        // compacted pages if compaction is used.
+                        if (walPageCompression != null && PageIO.getPageIO(bufPtr) instanceof CompactablePageIO) {
+                            CompactablePageIO pageIO = PageIO.getPageIO(bufPtr);
+
+                            buf.clear();
+                            bufWal.clear();
+
+                            int realPageSize = data.length;
+
+                            pageIO.compactPage(GridUnsafe.wrapPointer(bufPtr, realPageSize), buf, realPageSize);
+                            pageIO.compactPage(ByteBuffer.wrap(data), bufWal, realPageSize);
+
+                            bufPtr = GridUnsafe.bufferAddress(buf);
+                            data = new byte[bufWal.limit()];
+                            bufWal.rewind();
+                            bufWal.get(data);
                         }
+
+                        for (int i = 0; i < data.length; i++)
+                            assertEquals("page=" + fullId + ", pos=" + i, PageUtils.getByte(bufPtr, i), data[i]);
                     }
                     finally {
                         pageMem.writeUnlock(fullId.groupId(), fullId.pageId(), page, null, false, true);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
index 6c6d69e..bf6715f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
@@ -149,7 +149,8 @@
                               walThreadLocalBufferSize="11"
                               walArchivePath="abc" walFlushFrequency="00:00:12" walFsyncDelayNanos="13" walHistorySize="14"
                               walMode="Background" walRecordIteratorBufferSize="15" walSegments="16" walSegmentSize="17"
-                              walPath="wal-store" writeThrottlingEnabled="true" walAutoArchiveAfterInactivity="00:00:18">
+                              walPath="wal-store" writeThrottlingEnabled="true" walAutoArchiveAfterInactivity="00:00:18"
+                              walPageCompression="Zstd">
         <dataRegionConfigurations>
             <dataRegionConfiguration emptyPagesPoolSize="1" evictionThreshold="2" initialSize="3" metricsEnabled="true"
                                      maxSize="4" name="reg2" pageEvictionMode="RandomLru" metricsRateTimeInterval="00:00:01"
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
index 0135e80..4961954 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
@@ -323,6 +323,7 @@ namespace Apache.Ignite.Core.Tests
             Assert.AreEqual("wal-store", ds.WalPath);
             Assert.AreEqual(TimeSpan.FromSeconds(18), ds.WalAutoArchiveAfterInactivity);
             Assert.IsTrue(ds.WriteThrottlingEnabled);
+            Assert.AreEqual(DiskPageCompression.Zstd, ds.WalPageCompression);
 
             var dr = ds.DataRegionConfigurations.Single();
             Assert.AreEqual(1, dr.EmptyPagesPoolSize);
@@ -986,6 +987,8 @@ namespace Apache.Ignite.Core.Tests
                     ConcurrencyLevel = 1,
                     PageSize = 5 * 1024,
                     WalAutoArchiveAfterInactivity = TimeSpan.FromSeconds(19),
+                    WalPageCompression = DiskPageCompression.Lz4,
+                    WalPageCompressionLevel = 10,
                     DefaultDataRegionConfiguration = new DataRegionConfiguration
                     {
                         Name = "reg1",
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index ff96dc4..b99f2a8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -63,6 +63,7 @@
     <Compile Include="Common\IgniteProductVersion.cs" />
     <Compile Include="Configuration\CheckpointWriteOrder.cs" />
     <Compile Include="Configuration\DataPageEvictionMode.cs" />
+    <Compile Include="Configuration\DiskPageCompression.cs" />
     <Compile Include="Configuration\DataRegionConfiguration.cs" />
     <Compile Include="Configuration\DataStorageConfiguration.cs" />
     <Compile Include="Cache\Configuration\MemoryPolicyConfiguration.cs" />
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs
index d5cfe85..2ca4108 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs
@@ -169,6 +169,11 @@ namespace Apache.Ignite.Core.Configuration
         public const long DefaultMaxWalArchiveSize = 1024 * 1024 * 1024;
 
         /// <summary>
+        /// Default value for <see cref="WalPageCompression"/>.
+        /// </summary>
+        public const DiskPageCompression DefaultWalPageCompression = DiskPageCompression.Disabled;
+
+        /// <summary>
         /// Initializes a new instance of the <see cref="DataStorageConfiguration"/> class.
         /// </summary>
         public DataStorageConfiguration()
@@ -196,6 +201,7 @@ namespace Apache.Ignite.Core.Configuration
             PageSize = DefaultPageSize;
             WalAutoArchiveAfterInactivity = DefaultWalAutoArchiveAfterInactivity;
             MaxWalArchiveSize = DefaultMaxWalArchiveSize;
+            WalPageCompression = DefaultWalPageCompression;
         }
 
         /// <summary>
@@ -236,6 +242,8 @@ namespace Apache.Ignite.Core.Configuration
             ConcurrencyLevel = reader.ReadInt();
             WalAutoArchiveAfterInactivity = reader.ReadLongAsTimespan();
             CheckpointReadLockTimeout = reader.ReadTimeSpanNullable();
+            WalPageCompression = (DiskPageCompression)reader.ReadInt();
+            WalPageCompressionLevel = reader.ReadIntNullable();
 
             var count = reader.ReadInt();
 
@@ -290,6 +298,8 @@ namespace Apache.Ignite.Core.Configuration
             writer.WriteInt(ConcurrencyLevel);
             writer.WriteTimeSpanAsLong(WalAutoArchiveAfterInactivity);
             writer.WriteTimeSpanAsLongNullable(CheckpointReadLockTimeout);
+            writer.WriteInt((int)WalPageCompression);
+            writer.WriteIntNullable(WalPageCompressionLevel);
 
             if (DataRegionConfigurations != null)
             {
@@ -498,6 +508,16 @@ namespace Apache.Ignite.Core.Configuration
         public TimeSpan? CheckpointReadLockTimeout { get; set; }
 
         /// <summary>
+        /// Gets or sets the compression algorithm for WAL page snapshot records.
+        /// </summary>
+        public DiskPageCompression WalPageCompression { get; set; }
+
+        /// <summary>
+        /// Gets or sets the compression level for WAL page snapshot records.
+        /// </summary>
+        public int? WalPageCompressionLevel { get; set; }
+
+        /// <summary>
         /// Gets or sets the data region configurations.
         /// </summary>
         [SuppressMessage("Microsoft.Usage", "CA2227:CollectionPropertiesShouldBeReadOnly")]
@@ -508,4 +528,4 @@ namespace Apache.Ignite.Core.Configuration
         /// </summary>
         public DataRegionConfiguration DefaultDataRegionConfiguration { get; set; }
     }
-}
\ No newline at end of file
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DiskPageCompression.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DiskPageCompression.cs
new file mode 100644
index 0000000..133ecd1
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DiskPageCompression.cs
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Configuration
+{
+    /// <summary>
+    /// Disk page compression options. Serialized as an int by <see cref="DataStorageConfiguration"/>, so member order must stay stable.
+    /// </summary>
+    public enum DiskPageCompression
+    {
+        /// <summary>
+        /// Compression is disabled.
+        /// </summary>
+        Disabled,
+
+        /// <summary>
+        /// Retain only useful data from half-filled pages, but do not apply any compression.
+        /// </summary>
+        SkipGarbage,
+
+        /// <summary>
+        /// Zstandard (Zstd) compression.
+        /// </summary>
+        Zstd,
+
+        /// <summary>
+        /// LZ4 compression.
+        /// </summary>
+        Lz4,
+
+        /// <summary>
+        /// Snappy compression.
+        /// </summary>
+        Snappy
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
index c23cf90..9179048 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
@@ -117,6 +117,16 @@
         </xs:restriction>
     </xs:simpleType>
 
+    <xs:simpleType name="diskPageCompression" final="restriction">
+        <xs:restriction base="xs:string">
+            <xs:enumeration value="Disabled" />
+            <xs:enumeration value="SkipGarbage" />
+            <xs:enumeration value="Zstd" />
+            <xs:enumeration value="Lz4" />
+            <xs:enumeration value="Snappy" />
+        </xs:restriction>
+    </xs:simpleType>
+
     <xs:element name="igniteConfiguration">
         <xs:annotation>
             <xs:documentation>Ignite configuration root.</xs:documentation>
@@ -1948,6 +1958,16 @@
                                 </xs:documentation>
                             </xs:annotation>
                         </xs:attribute>
+                        <xs:attribute name="walPageCompression" type="diskPageCompression">
+                            <xs:annotation>
+                                <xs:documentation>Compression algorithm for WAL page snapshot records.</xs:documentation>
+                            </xs:annotation>
+                        </xs:attribute>
+                        <xs:attribute name="walPageCompressionLevel" type="xs:int">
+                            <xs:annotation>
+                                <xs:documentation>Compression level for WAL page snapshot records.</xs:documentation>
+                            </xs:annotation>
+                        </xs:attribute>
                     </xs:complexType>
                 </xs:element>
                 <xs:element name="sslContextFactory" minOccurs="0">