You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/11/24 16:07:32 UTC

[2/2] ignite git commit: ignite-5938 WAL logs compaction and compression after checkpoint

ignite-5938 WAL logs compaction and compression after checkpoint

Signed-off-by: agura <ag...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f50b2354
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f50b2354
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f50b2354

Branch: refs/heads/master
Commit: f50b2354d6b87084f0c29376915821296f3fd480
Parents: 010d02b
Author: Ivan Rakov <iv...@gmail.com>
Authored: Fri Nov 24 18:59:16 2017 +0300
Committer: agura <ag...@apache.org>
Committed: Fri Nov 24 18:59:52 2017 +0300

----------------------------------------------------------------------
 ...tingToWalV2SerializerWithCompactionTest.java | 250 +++++++
 .../IgniteCompatibilityBasicTestSuite.java      |   3 +
 .../apache/ignite/IgniteSystemProperties.java   |   5 +
 .../configuration/DataStorageConfiguration.java |  27 +
 .../pagemem/wal/IgniteWriteAheadLogManager.java |   8 +
 .../pagemem/wal/record/FilteredRecord.java      |  31 +
 .../pagemem/wal/record/MarshalledRecord.java    |  61 ++
 .../GridCacheDatabaseSharedManager.java         |   5 +
 .../wal/AbstractWalRecordsIterator.java         |  73 ++-
 .../cache/persistence/wal/FileInput.java        |   8 +-
 .../wal/FileWriteAheadLogManager.java           | 650 +++++++++++++++----
 .../persistence/wal/RecordDataSerializer.java   |  58 --
 .../cache/persistence/wal/RecordSerializer.java |  62 --
 .../SingleSegmentLogicalRecordsIterator.java    | 141 ++++
 .../reader/StandaloneWalRecordsIterator.java    |   5 +-
 .../persistence/wal/record/HeaderRecord.java    |   7 +-
 .../persistence/wal/record/RecordTypes.java     |  69 ++
 .../wal/serializer/RecordDataSerializer.java    |  59 ++
 .../wal/serializer/RecordDataV1Serializer.java  |  11 +-
 .../wal/serializer/RecordDataV2Serializer.java  |  19 +-
 .../wal/serializer/RecordSerializer.java        |  63 ++
 .../wal/serializer/RecordSerializerFactory.java |  71 ++
 .../serializer/RecordSerializerFactoryImpl.java | 133 ++++
 .../wal/serializer/RecordV1Serializer.java      |  71 +-
 .../wal/serializer/RecordV2Serializer.java      |  93 ++-
 .../utils/PlatformConfigurationUtils.java       |   2 +
 .../db/wal/IgniteWalFlushFailoverTest.java      |  16 +-
 .../wal/IgniteWalHistoryReservationsTest.java   |   2 +-
 .../db/wal/IgniteWalRecoveryTest.java           |   2 +-
 .../IgniteWalRecoveryWithCompactionTest.java    |  33 +
 .../db/wal/IgniteWalSerializerVersionTest.java  |   9 +-
 .../persistence/db/wal/WalCompactionTest.java   | 312 +++++++++
 .../persistence/pagemem/NoOpWALManager.java     |   5 +
 .../ignite/testsuites/IgnitePdsTestSuite2.java  |   3 +
 .../IgnitePdsWithIndexingCoreTestSuite.java     |   2 +
 .../Configuration/DataStorageConfiguration.cs   |  16 +
 .../IgniteConfigurationSection.xsd              |   5 +
 37 files changed, 2102 insertions(+), 288 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MigratingToWalV2SerializerWithCompactionTest.java
----------------------------------------------------------------------
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MigratingToWalV2SerializerWithCompactionTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MigratingToWalV2SerializerWithCompactionTest.java
new file mode 100644
index 0000000..0ca3833
--- /dev/null
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MigratingToWalV2SerializerWithCompactionTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compatibility.persistence;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+
+/**
+ * Saves data using previous version of ignite and then load this data using actual version
+ */
+public class MigratingToWalV2SerializerWithCompactionTest extends IgnitePersistenceCompatibilityAbstractTest {
+    /** */
+    private static final String TEST_CACHE_NAME = DummyPersistenceCompatibilityTest.class.getSimpleName();
+
+    /** Entries count. */
+    private static final int ENTRIES = 300;
+
+    /** Wal segment size. */
+    private static final int WAL_SEGMENT_SIZE = 1024 * 1024;
+
+    /** Entry payload size. */
+    private static final int PAYLOAD_SIZE = 20000;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "binary_meta", false));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setPeerClassLoadingEnabled(false);
+
+        DataStorageConfiguration memCfg = new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(
+                new DataRegionConfiguration().setPersistenceEnabled(true))
+            .setWalSegmentSize(WAL_SEGMENT_SIZE)
+            .setWalCompactionEnabled(true)
+            .setWalMode(WALMode.LOG_ONLY)
+            .setWalHistorySize(200);
+
+        cfg.setDataStorageConfiguration(memCfg);
+
+        return cfg;
+    }
+
+    /**
+     * Tests opportunity to read data from previous Ignite DB version.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCompactingOldWalFiles() throws Exception {
+        doTestStartupWithOldVersion("2.3.0");
+    }
+
+    /**
+     * Tests opportunity to read data from previous Ignite DB version.
+     *
+     * @param ver 3-digits version of ignite
+     * @throws Exception If failed.
+     */
+    private void doTestStartupWithOldVersion(String ver) throws Exception {
+        try {
+            startGrid(1, ver, new ConfigurationClosure(), new PostStartupClosure());
+
+            stopAllGrids();
+
+            IgniteEx ignite = startGrid(0);
+
+            ignite.active(true);
+
+            IgniteCache<Integer, byte[]> cache = ignite.getOrCreateCache(TEST_CACHE_NAME);
+
+            for (int i = ENTRIES; i < ENTRIES * 2; i++) {
+                final byte[] val = new byte[PAYLOAD_SIZE];
+
+                ThreadLocalRandom.current().nextBytes(val);
+
+                val[i] = 1;
+
+                cache.put(i, val);
+            }
+
+            // WAL archive segment is allowed to be compressed when it's at least one checkpoint away from current WAL head.
+            ignite.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get();
+            ignite.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get();
+
+            Thread.sleep(15_000); // Time to compress WAL.
+
+            int expCompressedWalSegments = PAYLOAD_SIZE * ENTRIES * 4 / WAL_SEGMENT_SIZE - 1;
+
+            String nodeFolderName = ignite.context().pdsFolderResolver().resolveFolders().folderName();
+
+            File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false);
+            File walDir = new File(dbDir, "wal");
+            File archiveDir = new File(walDir, "archive");
+            File nodeArchiveDir = new File(archiveDir, nodeFolderName);
+
+            File[] compressedSegments = nodeArchiveDir.listFiles(new FilenameFilter() {
+                @Override public boolean accept(File dir, String name) {
+                    return name.endsWith(".wal.zip");
+                }
+            });
+
+            final int actualCompressedWalSegments = compressedSegments == null ? 0 : compressedSegments.length;
+
+            assertTrue("expected=" + expCompressedWalSegments + ", actual=" + actualCompressedWalSegments,
+                actualCompressedWalSegments >= expCompressedWalSegments);
+
+            stopAllGrids();
+
+            File nodeLfsDir = new File(dbDir, nodeFolderName);
+            File cpMarkersDir = new File(nodeLfsDir, "cp");
+
+            File[] cpMarkers = cpMarkersDir.listFiles();
+
+            assertNotNull(cpMarkers);
+            assertTrue(cpMarkers.length > 0);
+
+            File cacheDir = new File(nodeLfsDir, "cache-" + TEST_CACHE_NAME);
+            File[] partFiles = cacheDir.listFiles(new FilenameFilter() {
+                @Override public boolean accept(File dir, String name) {
+                    return name.startsWith("part");
+                }
+            });
+
+            assertNotNull(partFiles);
+            assertTrue(partFiles.length > 0);
+
+            // Enforce reading WAL from the very beginning at the next start.
+            for (File f : cpMarkers)
+                f.delete();
+
+            for (File f : partFiles)
+                f.delete();
+
+            ignite = startGrid(0);
+
+            ignite.active(true);
+
+            cache = ignite.cache(TEST_CACHE_NAME);
+
+            boolean fail = false;
+
+            // Check that all data is recovered from compacted WAL.
+            for (int i = 0; i < ENTRIES * 2; i++) {
+                byte[] arr = cache.get(i);
+
+                if (arr == null) {
+                    System.out.println(">>> Missing: " + i);
+
+                    fail = true;
+                }
+                else if (arr[i] != 1) {
+                    System.out.println(">>> Corrupted: " + i);
+
+                    fail = true;
+                }
+            }
+
+            assertFalse(fail);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /** */
+    private static class PostStartupClosure implements IgniteInClosure<Ignite> {
+        /** {@inheritDoc} */
+        @Override public void apply(Ignite ignite) {
+            ignite.active(true);
+
+            CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>();
+            cacheCfg.setName(TEST_CACHE_NAME);
+            cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+            cacheCfg.setBackups(0);
+            cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+            IgniteCache<Object, Object> cache = ignite.getOrCreateCache(cacheCfg);
+
+            for (int i = 0; i < ENTRIES; i++) { // At least 20MB of raw data in total.
+                final byte[] val = new byte[20000];
+
+                ThreadLocalRandom.current().nextBytes(val);
+
+                val[i] = 1;
+
+                cache.put(i, val);
+            }
+        }
+    }
+
+    /** */
+    private static class ConfigurationClosure implements IgniteInClosure<IgniteConfiguration> {
+        /** {@inheritDoc} */
+        @Override public void apply(IgniteConfiguration cfg) {
+            cfg.setLocalHost("127.0.0.1");
+
+            TcpDiscoverySpi disco = new TcpDiscoverySpi();
+            disco.setIpFinder(GridCacheAbstractFullApiSelfTest.LOCAL_IP_FINDER);
+
+            cfg.setDiscoverySpi(disco);
+
+            cfg.setPeerClassLoadingEnabled(false);
+
+            DataStorageConfiguration memCfg = new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration().setPersistenceEnabled(true))
+                .setWalSegmentSize(WAL_SEGMENT_SIZE)
+                .setWalMode(WALMode.LOG_ONLY)
+                .setWalHistorySize(100);
+
+            cfg.setDataStorageConfiguration(memCfg);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
index 351a0e7..20643d4 100644
--- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.compatibility.testsuites;
 import junit.framework.TestSuite;
 import org.apache.ignite.compatibility.persistence.DummyPersistenceCompatibilityTest;
 import org.apache.ignite.compatibility.persistence.FoldersReuseCompatibilityTest;
+import org.apache.ignite.compatibility.persistence.MigratingToWalV2SerializerWithCompactionTest;
 
 /**
  * Compatibility tests basic test suite.
@@ -36,6 +37,8 @@ public class IgniteCompatibilityBasicTestSuite {
 
         suite.addTestSuite(FoldersReuseCompatibilityTest.class);
 
+        suite.addTestSuite(MigratingToWalV2SerializerWithCompactionTest.class);
+
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index e0ace11..8e2298f 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -716,6 +716,11 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_ENABLE_FORCIBLE_NODE_KILL = "IGNITE_ENABLE_FORCIBLE_NODE_KILL";
 
     /**
+     *
+     */
+    public static final String IGNITE_WAL_ARCHIVE_COMPACT_SKIP_DELTA_RECORD = "IGNITE_WAL_ARCHIVE_COMPACT_SKIP_DELTA_RECORD";
+
+    /**
      * Tasks stealing will be started if tasks queue size per data-streamer thread exceeds this threshold.
      * <p>
      * Default value is {@code 4}.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
index 8202ef8..2c90398 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
@@ -145,6 +145,9 @@ public class DataStorageConfiguration implements Serializable {
     /** Default write throttling enabled. */
     public static final boolean DFLT_WRITE_THROTTLING_ENABLED = false;
 
+    /** Default wal compaction enabled. */
+    public static final boolean DFLT_WAL_COMPACTION_ENABLED = false;
+
     /** Size of a memory chunk reserved for system cache initially. */
     private long sysRegionInitSize = DFLT_SYS_CACHE_INIT_SIZE;
 
@@ -243,6 +246,12 @@ public class DataStorageConfiguration implements Serializable {
     private boolean writeThrottlingEnabled = DFLT_WRITE_THROTTLING_ENABLED;
 
     /**
+     * Flag to enable WAL compaction. If true, system filters and compresses WAL archive in background.
+     * Compressed WAL archive gets automatically decompressed on demand.
+     */
+    private boolean walCompactionEnabled = DFLT_WAL_COMPACTION_ENABLED;
+
+    /**
      * Initial size of a data region reserved for system cache.
      *
      * @return Size in bytes.
@@ -850,4 +859,22 @@ public class DataStorageConfiguration implements Serializable {
 
         return this;
     }
+
+    /**
+     * @return Flag indicating whether WAL compaction is enabled.
+     */
+    public boolean isWalCompactionEnabled() {
+        return walCompactionEnabled;
+    }
+
+    /**
+     * Sets flag indicating whether WAL compaction is enabled.
+     *
+     * @param walCompactionEnabled Wal compaction enabled flag.
+     */
+    public DataStorageConfiguration setWalCompactionEnabled(boolean walCompactionEnabled) {
+        this.walCompactionEnabled = walCompactionEnabled;
+
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index ce28ff2..42d9611 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -102,6 +102,14 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni
     public int truncate(WALPointer ptr);
 
     /**
+     * Gives a hint to WAL manager to compact WAL until given pointer (exclusively).
+     * Compaction implies filtering out physical records and ZIP compression.
+     *
+     * @param ptr Pointer for which it is safe to compact the log.
+     */
+    public void allowCompressionUntil(WALPointer ptr);
+
+    /**
      * @return Total number of segments in the WAL archive.
      */
     public int walArchiveSegments();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java
new file mode 100644
index 0000000..519e825
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java
@@ -0,0 +1,31 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator;
+
+/**
+ * Special type of WAL record. Shouldn't be stored in file.
+ * Returned by deserializer if next record is not matched by filter. Automatically handled by
+ * {@link AbstractWalRecordsIterator}.
+ */
+public class FilteredRecord extends WALRecord {
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MarshalledRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MarshalledRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MarshalledRecord.java
new file mode 100644
index 0000000..448a32c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MarshalledRecord.java
@@ -0,0 +1,61 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+
+/**
+ * Special type of WAL record. Shouldn't be stored in file.
+ * Contains complete binary representation of record in {@link #buf} and record position in {@link #pos}.
+ */
+public class MarshalledRecord extends WALRecord {
+    /** Type of marshalled record. */
+    private WALRecord.RecordType type;
+
+    /**
+     * Heap buffer with marshalled record bytes.
+     * Due to performance reasons accessible only by thread that performs WAL iteration and until next record is read.
+     */
+    private ByteBuffer buf;
+
+    /**
+     * @param type Type of marshalled record.
+     * @param pos WAL pointer to record.
+     * @param buf Reusable buffer with record data.
+     */
+    public MarshalledRecord(RecordType type, WALPointer pos, ByteBuffer buf) {
+        this.type = type;
+        this.buf = buf;
+
+        position(pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return type;
+    }
+
+    /**
+     * @return Buffer with marshalled record bytes.  Due to performance reasons accessible only by thread that performs
+     * WAL iteration and until next record is read.
+     */
+    public ByteBuffer buffer() {
+        return buf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index e07aef7..c0e59bc 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1447,6 +1447,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             cctx.pageStore().beginRecover();
         }
+        else
+            cctx.wal().allowCompressionUntil(status.startPtr);
 
         long start = U.currentTimeMillis();
         int applied = 0;
@@ -2973,6 +2975,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             }
 
             chp.walFilesDeleted = deleted;
+
+            if (!chp.cpPages.isEmpty())
+                cctx.wal().allowCompressionUntil(chp.cpEntry.checkpointMark());
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index 5be6e55..7415db3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -30,7 +30,9 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
+import org.apache.ignite.internal.util.typedef.P2;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -70,8 +72,8 @@ public abstract class AbstractWalRecordsIterator
      */
     @NotNull protected final GridCacheSharedContext sharedCtx;
 
-    /** Serializer of current version to read headers. */
-    @NotNull private final RecordSerializer serializer;
+    /** Serializer factory. */
+    @NotNull private final RecordSerializerFactory serializerFactory;
 
     /** Factory to provide I/O interfaces for read/write operations with files */
     @NotNull protected final FileIOFactory ioFactory;
@@ -82,20 +84,20 @@ public abstract class AbstractWalRecordsIterator
     /**
      * @param log Logger.
      * @param sharedCtx Shared context.
-     * @param serializer Serializer of current version to read headers.
+     * @param serializerFactory Serializer of current version to read headers.
      * @param ioFactory ioFactory for file IO access.
      * @param bufSize buffer for reading records size.
      */
     protected AbstractWalRecordsIterator(
         @NotNull final IgniteLogger log,
         @NotNull final GridCacheSharedContext sharedCtx,
-        @NotNull final RecordSerializer serializer,
+        @NotNull final RecordSerializerFactory serializerFactory,
         @NotNull final FileIOFactory ioFactory,
         final int bufSize
     ) {
         this.log = log;
         this.sharedCtx = sharedCtx;
-        this.serializer = serializer;
+        this.serializerFactory = serializerFactory;
         this.ioFactory = ioFactory;
 
         buf = new ByteBufferExpander(bufSize, ByteOrder.nativeOrder());
@@ -107,7 +109,7 @@ public abstract class AbstractWalRecordsIterator
      * @return found WAL file descriptors
      */
     protected static FileWriteAheadLogManager.FileDescriptor[] loadFileDescriptors(@NotNull final File walFilesDir) throws IgniteCheckedException {
-        final File[] files = walFilesDir.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER);
+        final File[] files = walFilesDir.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER);
 
         if (files == null) {
             throw new IgniteCheckedException("WAL files directory does not not denote a " +
@@ -156,8 +158,12 @@ public abstract class AbstractWalRecordsIterator
             try {
                 curRec = advanceRecord(currWalSegment);
 
-                if (curRec != null)
+                if (curRec != null) {
+                    if (curRec.get2().type() == null)
+                        continue; // Record was skipped by filter of current serializer, should read next record.
+
                     return;
+                }
                 else {
                     currWalSegment = advanceSegment(currWalSegment);
 
@@ -275,20 +281,31 @@ public abstract class AbstractWalRecordsIterator
             FileIO fileIO = ioFactory.create(desc.file);
 
             try {
-                int serVer = FileWriteAheadLogManager.readSerializerVersion(fileIO);
+                IgniteBiTuple<Integer, Boolean> tup = FileWriteAheadLogManager.readSerializerVersionAndCompactedFlag(fileIO);
 
-                RecordSerializer ser = FileWriteAheadLogManager.forVersion(sharedCtx, serVer);
+                int serVer = tup.get1();
+
+                boolean isCompacted = tup.get2();
 
                 FileInput in = new FileInput(fileIO, buf);
 
                 if (start != null && desc.idx == start.index()) {
-                    // Make sure we skip header with serializer version.
-                    long startOffset = Math.max(start.fileOffset(), fileIO.position());
-
-                    in.seek(startOffset);
+                    if (isCompacted) {
+                        serializerFactory.skipPositionCheck(true);
+
+                        if (start.fileOffset() != 0)
+                            serializerFactory.recordDeserializeFilter(new StartSeekingFilter(start));
+                    }
+                    else {
+                        // Make sure we skip header with serializer version.
+                        long startOff = Math.max(start.fileOffset(), fileIO.position());
+
+                        in.seek(startOff);
+                    }
                 }
 
-                return new FileWriteAheadLogManager.ReadFileHandle(fileIO, desc.idx, sharedCtx.igniteInstanceName(), ser, in);
+                return new FileWriteAheadLogManager.ReadFileHandle(
+                    fileIO, desc.idx, sharedCtx.igniteInstanceName(), serializerFactory.createSerializer(serVer), in);
             }
             catch (SegmentEofException | EOFException ignore) {
                 try {
@@ -320,4 +337,32 @@ public abstract class AbstractWalRecordsIterator
         }
     }
 
+    /**
+     * Filter that drops all records until given start pointer is reached.
+     */
+    private static class StartSeekingFilter implements P2<WALRecord.RecordType, WALPointer> {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Start pointer. */
+        private final FileWALPointer start;
+
+        /** Start reached flag. */
+        private boolean startReached;
+
+        /**
+         * @param start Start.
+         */
+        StartSeekingFilter(FileWALPointer start) {
+            this.start = start;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(WALRecord.RecordType type, WALPointer pointer) {
+            if (start.fileOffset() == ((FileWALPointer)pointer).fileOffset())
+                startReached = true;
+
+            return startReached;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
index 3b20fce..303a023 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
@@ -348,7 +348,13 @@ public final class FileInput implements ByteBufferBackedDataInput {
 
         /** {@inheritDoc} */
         @Override public int skipBytes(int n) throws IOException {
-            throw new UnsupportedOperationException();
+            ensure(n);
+
+            int skipped = Math.min(buf.remaining(), n);
+
+            buf.position(buf.position() + skipped);
+
+            return skipped;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 3d8d78f..a450521 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -17,10 +17,14 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.wal;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileFilter;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -31,6 +35,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -40,6 +46,9 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
@@ -49,6 +58,7 @@ import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.events.WalSegmentArchivedEvent;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
@@ -57,24 +67,28 @@ import org.apache.ignite.internal.pagemem.wal.StorageException;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
 import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
 import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
-import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV1Serializer;
-import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV2Serializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
-import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV2Serializer;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -123,8 +137,36 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
     };
 
+    /** */
+    private static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip");
+
+    /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */
+    public static final FileFilter WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER = new FileFilter() {
+        @Override public boolean accept(File file) {
+            return !file.isDirectory() && (WAL_NAME_PATTERN.matcher(file.getName()).matches() ||
+                WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches());
+        }
+    };
+
+    /** */
+    private static final Pattern WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip\\.tmp");
+
+    /** */
+    private static final FileFilter WAL_SEGMENT_FILE_COMPACTED_FILTER = new FileFilter() {
+        @Override public boolean accept(File file) {
+            return !file.isDirectory() && WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches();
+        }
+    };
+
+    /** */
+    private static final FileFilter WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER = new FileFilter() {
+        @Override public boolean accept(File file) {
+            return !file.isDirectory() && WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches();
+        }
+    };
+
     /** Latest serializer version to use. */
-    public static final int LATEST_SERIALIZER_VERSION = 1;
+    private static final int LATEST_SERIALIZER_VERSION = 2;
 
     /** */
     private final boolean alwaysWriteFullPages;
@@ -169,8 +211,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     private final int serializerVersion =
         IgniteSystemProperties.getInteger(IGNITE_WAL_SERIALIZER_VERSION, LATEST_SERIALIZER_VERSION);
 
-    /** */
-    private volatile long oldestArchiveSegmentIdx;
+    /** Latest segment cleared by {@link #truncate(WALPointer)}. */
+    private volatile long lastTruncatedArchiveIdx = -1L;
 
     /** Factory to provide I/O interfaces for read/write operations with files */
     private final FileIOFactory ioFactory;
@@ -197,6 +239,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** */
     private volatile FileArchiver archiver;
 
+    /** Compressor. */
+    private volatile FileCompressor compressor;
+
+    /** Decompressor. */
+    private volatile FileDecompressor decompressor;
+
     /** */
     private final ThreadLocal<WALPointer> lastWALPtr = new ThreadLocal<>();
 
@@ -276,7 +324,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 "write ahead log archive directory"
             );
 
-            serializer = forVersion(cctx, serializerVersion);
+            serializer = new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVersion);
 
             GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.database();
 
@@ -286,10 +334,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
             IgniteBiTuple<Long, Long> tup = scanMinMaxArchiveIndices();
 
-            oldestArchiveSegmentIdx = tup == null ? 0 : tup.get1();
+            lastTruncatedArchiveIdx = tup == null ? -1 : tup.get1() - 1;
 
             archiver = new FileArchiver(tup == null ? -1 : tup.get2());
 
+            if (dsCfg.isWalCompactionEnabled()) {
+                compressor = new FileCompressor();
+
+                decompressor = new FileDecompressor();
+            }
+
             if (mode != WALMode.NONE) {
                 if (log.isInfoEnabled())
                     log.info("Started write-ahead log manager [mode=" + mode + ']');
@@ -338,6 +392,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
             if (archiver != null)
                 archiver.shutdown();
+
+            if (compressor != null)
+                compressor.shutdown();
+
+            if (decompressor != null)
+                decompressor.shutdown();
         }
         catch (Exception e) {
             U.error(log, "Failed to gracefully close WAL segment: " + currentHnd.fileIO, e);
@@ -354,8 +414,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
         if (!cctx.kernalContext().clientNode()) {
             assert archiver != null;
-
             archiver.start();
+
+            if (compressor != null)
+                compressor.start();
+
+            if (decompressor != null)
+                decompressor.start();
         }
     }
 
@@ -575,11 +640,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             (FileWALPointer)start,
             end,
             dsCfg,
-            serializer,
+            new RecordSerializerFactoryImpl(cctx),
             ioFactory,
             archiver,
-            log,
-            tlbSize
+            decompressor,
+            log
         );
     }
 
@@ -626,9 +691,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      * @return {@code true} if has this index.
      */
     private boolean hasIndex(long absIdx) {
-        String name = FileDescriptor.fileName(absIdx);
+        String segmentName = FileDescriptor.fileName(absIdx);
+
+        String zipSegmentName = FileDescriptor.fileName(absIdx) + ".zip";
 
-        boolean inArchive = new File(walArchiveDir, name).exists();
+        boolean inArchive = new File(walArchiveDir, segmentName).exists() ||
+            new File(walArchiveDir, zipSegmentName).exists();
 
         if (inArchive)
             return true;
@@ -651,7 +719,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         // File pointer bound: older entries will be deleted from archive
         FileWALPointer fPtr = (FileWALPointer)ptr;
 
-        FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER));
+        FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
 
         int deleted = 0;
 
@@ -671,8 +739,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     deleted++;
 
                 // Bump up the oldest archive segment index.
-                if (oldestArchiveSegmentIdx < desc.idx)
-                    oldestArchiveSegmentIdx = desc.idx;
+                if (lastTruncatedArchiveIdx < desc.idx)
+                    lastTruncatedArchiveIdx = desc.idx;
             }
         }
 
@@ -680,15 +748,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /** {@inheritDoc} */
+    @Override public void allowCompressionUntil(WALPointer ptr) {
+        if (compressor != null)
+            compressor.allowCompressionUntil(((FileWALPointer)ptr).index());
+    }
+
+    /** {@inheritDoc} */
     @Override public int walArchiveSegments() {
-        long oldest = oldestArchiveSegmentIdx;
+        long lastTruncated = lastTruncatedArchiveIdx;
 
         long lastArchived = archiver.lastArchivedAbsoluteIndex();
 
         if (lastArchived == -1)
             return 0;
 
-        int res = (int)(lastArchived - oldest);
+        int res = (int)(lastArchived - lastTruncated);
 
         return res >= 0 ? res : 0;
     }
@@ -710,7 +784,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     private long lastArchivedIndex() {
         long lastIdx = -1;
 
-        for (File file : walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER)) {
+        for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) {
             try {
                 long idx = Long.parseLong(file.getName().substring(0, 16));
 
@@ -725,27 +799,42 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /**
-     * Lists files in archive directory and returns the index of last archived file.
+     * Lists files in archive directory and returns the indices of least and last archived files.
+     * In case of holes, first segment after last "hole" is considered as minimum.
+     * Example: minimum(0, 1, 10, 11, 20, 21, 22) should be 20
      *
-     * @return The absolute index of last archived file.
+     * @return The absolute indices of min and max archived files.
      */
     private IgniteBiTuple<Long, Long> scanMinMaxArchiveIndices() {
-        long minIdx = Integer.MAX_VALUE;
-        long maxIdx = -1;
+        TreeSet<Long> archiveIndices = new TreeSet<>();
 
-        for (File file : walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER)) {
+        for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) {
             try {
                 long idx = Long.parseLong(file.getName().substring(0, 16));
 
-                minIdx = Math.min(minIdx, idx);
-                maxIdx = Math.max(maxIdx, idx);
+                archiveIndices.add(idx);
             }
             catch (NumberFormatException | IndexOutOfBoundsException ignore) {
-
+                // No-op.
             }
         }
 
-        return maxIdx == -1 ? null : F.t(minIdx, maxIdx);
+        if (archiveIndices.isEmpty())
+            return null;
+        else {
+            Long min = archiveIndices.first();
+            Long max = archiveIndices.last();
+
+            if (max - min == archiveIndices.size() - 1)
+                return F.t(min, max); // Short path.
+
+            for (Long idx : archiveIndices.descendingSet()) {
+                if (!archiveIndices.contains(idx - 1))
+                    return F.t(idx, max);
+            }
+
+            throw new IllegalStateException("Should never happen if TreeSet is valid.");
+        }
     }
 
     /**
@@ -836,14 +925,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 // If we have existing segment, try to read version from it.
                 if (lastReadPtr != null) {
                     try {
-                        serVer = readSerializerVersion(fileIO);
+                        serVer = readSerializerVersionAndCompactedFlag(fileIO).get1();
                     }
                     catch (SegmentEofException | EOFException ignore) {
                         serVer = serializerVersion;
                     }
                 }
 
-                RecordSerializer ser = forVersion(cctx, serVer);
+                RecordSerializer ser = new RecordSerializerFactoryImpl(cctx).createSerializer(serVer);
 
                 if (log.isInfoEnabled())
                     log.info("Resuming logging to WAL segment [file=" + curFile.getAbsolutePath() +
@@ -1021,37 +1110,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         return new File(walWorkDir, FileDescriptor.fileName(segmentIdx));
     }
 
-    /**
-     * @param cctx Shared context.
-     * @param ver Serializer version.
-     * @return Entry serializer.
-     */
-    public static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver) throws IgniteCheckedException {
-        return forVersion(cctx, ver, false);
-    }
-
-    /**
-     * @param ver Serializer version.
-     * @return Entry serializer.
-     */
-    static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver, boolean writePointer) throws IgniteCheckedException {
-        if (ver <= 0)
-            throw new IgniteCheckedException("Failed to create a serializer (corrupted WAL file).");
-
-        switch (ver) {
-            case 1:
-                return new RecordV1Serializer(new RecordDataV1Serializer(cctx), writePointer);
-
-            case 2:
-                RecordDataV2Serializer dataV2Serializer = new RecordDataV2Serializer(new RecordDataV1Serializer(cctx));
-
-                return new RecordV2Serializer(dataV2Serializer, writePointer);
-
-            default:
-                throw new IgniteCheckedException("Failed to create a serializer with the given version " +
-                    "(forward compatibility is not supported): " + ver);
-        }
-    }
 
     /**
      * @return Sorted WAL files descriptors.
@@ -1103,13 +1161,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         private IgniteCheckedException cleanException;
 
         /**
-         * Absolute current segment index WAL Manger writes to. Guarded by <code>this</code>.
+         * Absolute current segment index WAL Manager writes to. Guarded by <code>this</code>.
          * Incremented during rollover. Also may be directly set if WAL is resuming logging after start.
          */
         private long curAbsWalIdx = -1;
 
         /** Last archived file index (absolute, 0-based). Guarded by <code>this</code>. */
-        private long lastAbsArchivedIdx = -1;
+        private volatile long lastAbsArchivedIdx = -1;
 
         /** current thread stopping advice */
         private volatile boolean stopped;
@@ -1135,7 +1193,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         /**
          * @return Last archived segment absolute index.
          */
-        private synchronized long lastArchivedAbsoluteIndex() {
+        private long lastArchivedAbsoluteIndex() {
             return lastAbsArchivedIdx;
         }
 
@@ -1221,7 +1279,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                         wait();
 
                     if (curAbsWalIdx != 0 && lastAbsArchivedIdx == -1)
-                        lastAbsArchivedIdx = curAbsWalIdx - 1;
+                        changeLastArchivedIndexAndWakeupCompressor(curAbsWalIdx - 1);
                 }
 
                 while (!Thread.currentThread().isInterrupted() && !stopped) {
@@ -1252,7 +1310,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                                 formatFile(res.getOrigWorkFile());
 
                             // Then increase counter to allow rollover on clean working file
-                            lastAbsArchivedIdx = toArchive;
+                            changeLastArchivedIndexAndWakeupCompressor(toArchive);
 
                             notifyAll();
                         }
@@ -1275,6 +1333,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
 
         /**
+         * @param idx Index.
+         */
+        private void changeLastArchivedIndexAndWakeupCompressor(long idx) {
+            lastAbsArchivedIdx = idx;
+
+            if (compressor != null)
+                compressor.onNextSegmentArchived();
+        }
+
+        /**
          * Gets the absolute index of the next WAL segment available to write.
          * Blocks till there are available file to write
          *
@@ -1301,7 +1369,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     return curAbsWalIdx;
                 }
             }
-            catch (InterruptedException e) {
+           catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
 
                 throw new IgniteInterruptedCheckedException(e);
@@ -1426,6 +1494,314 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /**
+     * Responsible for compressing WAL archive segments.
+     * Also responsible for deleting raw copies of already compressed WAL archive segments if they are not reserved.
+     */
+    private class FileCompressor extends Thread {
+        /** Current thread stopping advice. */
+        private volatile boolean stopped;
+
+        /** Last successfully compressed segment. */
+        private volatile long lastCompressedIdx = -1L;
+
+        /** All segments prior to this (inclusive) can be compressed. */
+        private volatile long lastAllowedToCompressIdx = -1L;
+
+        /**
+         *
+         */
+        FileCompressor() {
+            super("wal-file-compressor%" + cctx.igniteInstanceName());
+        }
+
+        /**
+         *
+         */
+        private void init() {
+            File[] toDel = walArchiveDir.listFiles(WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER);
+
+            for (File f : toDel) {
+                if (stopped)
+                    return;
+
+                f.delete();
+            }
+
+            FileDescriptor[] alreadyCompressed = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER));
+
+            if (alreadyCompressed.length > 0)
+                lastCompressedIdx = alreadyCompressed[alreadyCompressed.length - 1].getIdx();
+        }
+
+        /**
+         * @param lastCpStartIdx Segment index to allow compression until (exclusively).
+         */
+        synchronized void allowCompressionUntil(long lastCpStartIdx) {
+            lastAllowedToCompressIdx = lastCpStartIdx - 1;
+
+            notify();
+        }
+
+        /**
+         * Callback for waking up compressor when new segment is archived.
+         */
+        synchronized void onNextSegmentArchived() {
+            notify();
+        }
+
+        /**
+         * Pessimistically tries to reserve segment for compression in order to avoid concurrent truncation.
+         * Waits if there's no segment to archive right now.
+         */
+        private long tryReserveNextSegmentOrWait() throws InterruptedException, IgniteCheckedException {
+            long segmentToCompress = lastCompressedIdx + 1;
+
+            synchronized (this) {
+                while (segmentToCompress > Math.min(lastAllowedToCompressIdx, archiver.lastArchivedAbsoluteIndex())) {
+                    wait();
+
+                    if (stopped)
+                        return -1;
+                }
+            }
+
+            segmentToCompress = Math.max(segmentToCompress, lastTruncatedArchiveIdx + 1);
+
+            boolean reserved = reserve(new FileWALPointer(segmentToCompress, 0, 0));
+
+            return reserved ? segmentToCompress : -1;
+        }
+
+        /**
+         *
+         */
+        private void deleteObsoleteRawSegments() {
+            FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER));
+
+            FileArchiver archiver0 = archiver;
+
+            for (FileDescriptor desc : descs) {
+                // Do not delete reserved or locked segment and any segment after it.
+                if (archiver0 != null && archiver0.reserved(desc.idx))
+                    return;
+
+                if (desc.idx < lastCompressedIdx) {
+                    if (!desc.file.delete())
+                        U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " +
+                            desc.file.getAbsolutePath() + ", exists: " + desc.file.exists());
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            init();
+
+            while (!Thread.currentThread().isInterrupted() && !stopped) {
+                try {
+                    deleteObsoleteRawSegments();
+
+                    long nextSegment = tryReserveNextSegmentOrWait();
+                    if (nextSegment == -1)
+                        continue;
+
+                    File tmpZip = new File(walArchiveDir, FileDescriptor.fileName(nextSegment) + ".zip" + ".tmp");
+
+                    File zip = new File(walArchiveDir, FileDescriptor.fileName(nextSegment) + ".zip");
+
+                    File raw = new File(walArchiveDir, FileDescriptor.fileName(nextSegment));
+                    if (!Files.exists(raw.toPath()))
+                        throw new IgniteCheckedException("WAL archive segment is missing: " + raw);
+
+                    compressSegmentToFile(nextSegment, raw, tmpZip);
+
+                    Files.move(tmpZip.toPath(), zip.toPath());
+
+                    if (mode == WALMode.DEFAULT) {
+                        try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) {
+                            f0.force();
+                        }
+                    }
+
+                    lastCompressedIdx = nextSegment;
+                }
+                catch (IgniteCheckedException | IOException e) {
+                    U.error(log, "Unexpected error during WAL compression", e);
+
+                    FileWriteHandle handle = currentHandle();
+
+                    if (handle != null)
+                        handle.invalidateEnvironment(e);
+                }
+                catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        /**
+         * @param nextSegment Next segment absolute idx.
+         * @param raw Raw file.
+         * @param zip Zip file.
+         */
+        private void compressSegmentToFile(long nextSegment, File raw, File zip)
+            throws IOException, IgniteCheckedException {
+            int segmentSerializerVer;
+
+            try (FileIO fileIO = ioFactory.create(raw)) {
+                IgniteBiTuple<Integer, Boolean> tup = FileWriteAheadLogManager.readSerializerVersionAndCompactedFlag(fileIO);
+
+                segmentSerializerVer = tup.get1();
+            }
+
+            try (ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(zip)))) {
+                zos.putNextEntry(new ZipEntry(""));
+
+                zos.write(prepareSerializerVersionBuffer(nextSegment, segmentSerializerVer, true).array());
+
+                final CIX1<WALRecord> appendToZipC = new CIX1<WALRecord>() {
+                    @Override public void applyx(WALRecord record) throws IgniteCheckedException {
+                        final MarshalledRecord marshRec = (MarshalledRecord)record;
+
+                        try {
+                            zos.write(marshRec.buffer().array(), 0, marshRec.buffer().remaining());
+                        }
+                        catch (IOException e) {
+                            throw new IgniteCheckedException(e);
+                        }
+                    }
+                };
+
+                try (SingleSegmentLogicalRecordsIterator iter = new SingleSegmentLogicalRecordsIterator(
+                    log, cctx, ioFactory, tlbSize, nextSegment, walArchiveDir, appendToZipC)) {
+
+                    while (iter.hasNextX())
+                        iter.nextX();
+                }
+            }
+            finally {
+                release(new FileWALPointer(nextSegment, 0, 0));
+            }
+        }
+
+        /**
+         * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown.
+         */
+        private void shutdown() throws IgniteInterruptedCheckedException {
+            synchronized (this) {
+                stopped = true;
+
+                notifyAll();
+            }
+
+            U.join(this);
+        }
+    }
+
+    /**
+     * Responsible for decompressing previously compressed segments of WAL archive if they are needed for replay.
+     */
+    private class FileDecompressor extends Thread {
+        /** Current thread stopping advice. */
+        private volatile boolean stopped;
+
+        /** Decompression futures. */
+        private Map<Long, GridFutureAdapter<Void>> decompressionFutures = new HashMap<>();
+
+        /** Segments queue. */
+        private PriorityBlockingQueue<Long> segmentsQueue = new PriorityBlockingQueue<>();
+
+        /** Byte array for draining data. */
+        private byte[] arr = new byte[tlbSize];
+
+        /**
+         *
+         */
+        FileDecompressor() {
+            super("wal-file-decompressor%" + cctx.igniteInstanceName());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            while (!Thread.currentThread().isInterrupted() && !stopped) {
+                try {
+                    long segmentToDecompress = segmentsQueue.take();
+
+                    if (stopped)
+                        break;
+
+                    File zip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + ".zip");
+                    File unzipTmp = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + ".tmp");
+                    File unzip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress));
+
+                    try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip)));
+                        FileIO io = ioFactory.create(unzipTmp)) {
+                        zis.getNextEntry();
+
+                        int bytesRead;
+                        while ((bytesRead = zis.read(arr)) > 0)
+                            io.write(arr, 0, bytesRead);
+                    }
+
+                    Files.move(unzipTmp.toPath(), unzip.toPath());
+
+                    synchronized (this) {
+                        decompressionFutures.remove(segmentToDecompress).onDone();
+                    }
+                }
+                catch (InterruptedException e){
+                    Thread.currentThread().interrupt();
+                }
+                catch (IOException e) {
+                    U.error(log, "Unexpected error during WAL decompression", e);
+
+                    FileWriteHandle handle = currentHandle();
+
+                    if (handle != null)
+                        handle.invalidateEnvironment(e);
+                }
+            }
+        }
+
+        /**
+         * Asynchronously decompresses WAL segment which is present only in .zip file.
+         *
+         * @return Future which is completed once file is decompressed.
+         */
+        synchronized IgniteInternalFuture<Void> decompressFile(long idx) {
+            if (decompressionFutures.containsKey(idx))
+                return decompressionFutures.get(idx);
+
+            File f = new File(walArchiveDir, FileDescriptor.fileName(idx));
+
+            if (f.exists())
+                return new GridFinishedFuture<>();
+
+            segmentsQueue.put(idx);
+
+            GridFutureAdapter<Void> res = new GridFutureAdapter<>();
+
+            decompressionFutures.put(idx, res);
+
+            return res;
+        }
+
+        /**
+         * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown.
+         */
+        private void shutdown() throws IgniteInterruptedCheckedException {
+            synchronized (this) {
+                stopped = true;
+
+                // Put fake -1 to wake thread from queue.take()
+                segmentsQueue.put(-1L);
+            }
+
+            U.join(this);
+        }
+    }
+
+    /**
      * Validate files depending on {@link DataStorageConfiguration#getWalSegments()}  and create if need.
      * Check end when exit condition return false or all files are passed.
      *
@@ -1452,14 +1828,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /**
-     * Reads record serializer version from provided {@code io}.
+     * Reads record serializer version from provided {@code io} along with compacted flag.
      * NOTE: Method mutates position of {@code io}.
      *
      * @param io I/O interface for file.
      * @return Serializer version stored in the file.
      * @throws IgniteCheckedException If failed to read serializer version.
      */
-    public static int readSerializerVersion(FileIO io)
+    public static IgniteBiTuple<Integer, Boolean> readSerializerVersionAndCompactedFlag(FileIO io)
             throws IgniteCheckedException, IOException {
         try (ByteBufferExpander buf = new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder())) {
             FileInput in = new FileInput(io, buf);
@@ -1481,19 +1857,25 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
             assert ptr.fileOffset() == 0 : "Header record should be placed at the beginning of file " + ptr;
 
-            long headerMagicNumber = in.readLong();
+            long hdrMagicNum = in.readLong();
 
-            if (headerMagicNumber != HeaderRecord.MAGIC)
-                throw new IOException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.MAGIC) +
-                        ", actual=" + U.hexLong(headerMagicNumber) + ']');
+            boolean compacted;
+            if (hdrMagicNum == HeaderRecord.REGULAR_MAGIC)
+                compacted = false;
+            else if (hdrMagicNum == HeaderRecord.COMPACTED_MAGIC)
+                compacted = true;
+            else {
+                throw new IOException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.REGULAR_MAGIC) +
+                    ", actual=" + U.hexLong(hdrMagicNum) + ']');
+            }
 
             // Read serializer version.
-            int version = in.readInt();
+            int ver = in.readInt();
 
             // Read and skip CRC.
             in.readInt();
 
-            return version;
+            return new IgniteBiTuple<>(ver, compacted);
         }
     }
 
@@ -1508,47 +1890,58 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      * @throws IOException If failed to write serializer version.
      */
     public static long writeSerializerVersion(FileIO io, long idx, int version) throws IOException {
-        ByteBuffer buffer = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE);
-        buffer.order(ByteOrder.nativeOrder());
+        ByteBuffer buffer = prepareSerializerVersionBuffer(idx, version, false);
+
+        do {
+            io.write(buffer);
+        }
+        while (buffer.hasRemaining());
+
+        // Flush
+        io.force();
+
+        return io.position();
+    }
+
+    /**
+     * @param idx Index.
+     * @param ver Version.
+     * @param compacted Compacted flag.
+     */
+    @NotNull private static ByteBuffer prepareSerializerVersionBuffer(long idx, int ver, boolean compacted) {
+        ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE);
+        buf.order(ByteOrder.nativeOrder());
 
         // Write record type.
-        buffer.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1));
+        buf.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1));
 
         // Write position.
-        RecordV1Serializer.putPosition(buffer, new FileWALPointer(idx, 0, 0));
+        RecordV1Serializer.putPosition(buf, new FileWALPointer(idx, 0, 0));
 
         // Place magic number.
-        buffer.putLong(HeaderRecord.MAGIC);
+        buf.putLong(compacted ? HeaderRecord.COMPACTED_MAGIC : HeaderRecord.REGULAR_MAGIC);
 
         // Place serializer version.
-        buffer.putInt(version);
+        buf.putInt(ver);
 
         // Place CRC if needed.
         if (!RecordV1Serializer.SKIP_CRC) {
-            int curPos = buffer.position();
+            int curPos = buf.position();
 
-            buffer.position(0);
+            buf.position(0);
 
             // This call will move buffer position to the end of the record again.
-            int crcVal = PureJavaCrc32.calcCrc32(buffer, curPos);
+            int crcVal = PureJavaCrc32.calcCrc32(buf, curPos);
 
-            buffer.putInt(crcVal);
+            buf.putInt(crcVal);
         }
         else
-            buffer.putInt(0);
+            buf.putInt(0);
 
         // Write header record through io.
-        buffer.position(0);
-
-        do {
-            io.write(buffer);
-        }
-        while (buffer.hasRemaining());
+        buf.position(0);
 
-        // Flush
-        io.force();
-
-        return io.position();
+        return buf;
     }
 
     /**
@@ -1579,11 +1972,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
             String fileName = file.getName();
 
-            assert fileName.endsWith(WAL_SEGMENT_FILE_EXT);
-
-            int end = fileName.length() - WAL_SEGMENT_FILE_EXT.length();
+            assert fileName.contains(WAL_SEGMENT_FILE_EXT);
 
-            this.idx = idx == null ? Long.parseLong(fileName.substring(0, end)) : idx;
+            this.idx = idx == null ? Long.parseLong(fileName.substring(0, 16)) : idx;
         }
 
         /**
@@ -2193,17 +2584,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     assert stopped() : "Segment is not closed after close flush: " + head.get();
 
                     try {
-                        int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE + RecordV1Serializer.CRC_SIZE;
+                        RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx)
+                            .createSerializer(serializerVersion);
 
-                    if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) {
-                        RecordV1Serializer backwardSerializer =
-                            new RecordV1Serializer(new RecordDataV1Serializer(cctx), true);
+                        SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();
 
-                        final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize);
+                        int switchSegmentRecSize = backwardSerializer.size(segmentRecord);
 
-                        SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();
-                        segmentRecord.position( new FileWALPointer(idx, (int)written, -1));
-                        backwardSerializer.writeRecord(segmentRecord,buf);
+                        if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) {
+                            final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize);
+
+                            segmentRecord.position(new FileWALPointer(idx, (int)written, switchSegmentRecSize));
+                            backwardSerializer.writeRecord(segmentRecord, buf);
 
                             buf.rewind();
 
@@ -2487,6 +2879,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         private final FileArchiver archiver;
 
         /** */
+        private final FileDecompressor decompressor;
+
+        /** */
         private final DataStorageConfiguration psCfg;
 
         /** Optional start pointer. */
@@ -2504,10 +2899,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @param start Optional start pointer.
          * @param end Optional end pointer.
          * @param psCfg Database configuration.
-         * @param serializer Serializer of current version to read headers.
+         * @param serializerFactory Serializer factory.
          * @param archiver Archiver.
-         * @param log Logger
-         * @throws IgniteCheckedException If failed to initialize WAL segment.
+         * @param decompressor Decompressor.
+         *@param log Logger  @throws IgniteCheckedException If failed to initialize WAL segment.
          */
         private RecordsIterator(
             GridCacheSharedContext cctx,
@@ -2516,15 +2911,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             @Nullable FileWALPointer start,
             @Nullable FileWALPointer end,
             DataStorageConfiguration psCfg,
-            @NotNull RecordSerializer serializer,
+            @NotNull RecordSerializerFactory serializerFactory,
             FileIOFactory ioFactory,
             FileArchiver archiver,
-            IgniteLogger log,
-            int tlbSize
+            FileDecompressor decompressor,
+            IgniteLogger log
         ) throws IgniteCheckedException {
             super(log,
                 cctx,
-                serializer,
+                serializerFactory,
                 ioFactory,
                 psCfg.getWalRecordIteratorBufferSize());
             this.walWorkDir = walWorkDir;
@@ -2533,6 +2928,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             this.archiver = archiver;
             this.start = start;
             this.end = end;
+            this.decompressor = decompressor;
 
             init();
 
@@ -2540,6 +2936,26 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
 
         /** {@inheritDoc} */
+        @Override protected ReadFileHandle initReadHandle(
+            @NotNull FileDescriptor desc,
+            @Nullable FileWALPointer start
+        ) throws IgniteCheckedException, FileNotFoundException {
+            if (decompressor != null && !desc.file.exists()) {
+                FileDescriptor zipFile = new FileDescriptor(
+                    new File(walArchiveDir, FileDescriptor.fileName(desc.getIdx()) + ".zip"));
+
+                if (!zipFile.file.exists()) {
+                    throw new FileNotFoundException("Both compressed and raw segment files are missing in archive " +
+                        "[segmentIdx=" + desc.idx + "]");
+                }
+
+                decompressor.decompressFile(desc.idx).get();
+            }
+
+            return super.initReadHandle(desc, start);
+        }
+
+        /** {@inheritDoc} */
         @Override protected void onClose() throws IgniteCheckedException {
             super.onClose();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java
deleted file mode 100644
index 5a14095..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.wal;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
-
-/**
- * Interface to provide size, read and write operations with WAL records
- * <b>without any headers and meta information</b>.
- */
-public interface RecordDataSerializer {
-    /**
-     * Calculates size of record data.
-     *
-     * @param record WAL record.
-     * @return Size of record in bytes.
-     * @throws IgniteCheckedException If it's unable to calculate record data size.
-     */
-    int size(WALRecord record) throws IgniteCheckedException;
-
-    /**
-     * Reads record data of {@code type} from buffer {@code in}.
-     *
-     * @param type Record type.
-     * @param in Buffer to read.
-     * @return WAL record.
-     * @throws IOException In case of I/O problems.
-     * @throws IgniteCheckedException If it's unable to read record.
-     */
-    WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException;
-
-    /**
-     * Writes record data to buffer {@code buf}.
-     *
-     * @param record WAL record.
-     * @param buf Buffer to write.
-     * @throws IgniteCheckedException If it's unable to write record.
-     */
-    void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException;
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java
deleted file mode 100644
index 12e16a8..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.wal;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.wal.WALPointer;
-import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
-
-/**
- * Record serializer.
- */
-public interface RecordSerializer {
-    /**
-     * @return serializer version
-     */
-    public int version();
-
-    /**
-     * Calculates record size in byte including expected wal pointer, CRC and type field
-     *
-     * @param record Record.
-     * @return Size in bytes.
-     */
-    public int size(WALRecord record) throws IgniteCheckedException;
-
-    /**
-     * @param record Entry to write.
-     * @param buf Buffer.
-     */
-    public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException;
-
-    /**
-     * Loads record from input
-     *
-     * @param in Data input to read data from.
-     * @param expPtr expected WAL pointer for record. Used to validate actual position against expected from the file
-     * @return Read entry.
-     */
-    public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException;
-
-    /**
-     * Flag to write (or not) wal pointer to record
-     */
-    public boolean writePointer();
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
new file mode 100644
index 0000000..4a846b4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
@@ -0,0 +1,141 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.record.RecordTypes;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
+import org.apache.ignite.internal.util.typedef.CIX1;
+import org.apache.ignite.internal.util.typedef.P2;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Iterates over logical records of one WAL segment from archive. Used for WAL archive compression.
+ * Doesn't deserialize actual record data, returns {@link MarshalledRecord} instances instead.
+ */
+public class SingleSegmentLogicalRecordsIterator extends AbstractWalRecordsIterator {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Segment initialized flag. */
+    private boolean segmentInitialized;
+
+    /** Archived segment index. */
+    private long archivedSegIdx;
+
+    /** Archive directory. */
+    private File archiveDir;
+
+    /** Closure which is executed right after advance. */
+    private CIX1<WALRecord> advanceC;
+
+    /**
+     * @param log Logger.
+     * @param sharedCtx Shared context.
+     * @param ioFactory Io factory.
+     * @param bufSize Buffer size.
+     * @param archivedSegIdx Archived seg index.
+     * @param archiveDir Directory with segment.
+     * @param advanceC Closure which is executed right after advance.
+     */
+    SingleSegmentLogicalRecordsIterator(
+        @NotNull IgniteLogger log,
+        @NotNull GridCacheSharedContext sharedCtx,
+        @NotNull FileIOFactory ioFactory,
+        int bufSize,
+        long archivedSegIdx,
+        File archiveDir,
+        CIX1<WALRecord> advanceC
+    ) throws IgniteCheckedException {
+        super(log, sharedCtx, initLogicalRecordsSerializerFactory(sharedCtx), ioFactory, bufSize);
+
+        this.archivedSegIdx = archivedSegIdx;
+        this.archiveDir = archiveDir;
+        this.advanceC = advanceC;
+
+        advance();
+    }
+
+    /**
+     * @param sharedCtx Shared context.
+     */
+    private static RecordSerializerFactory initLogicalRecordsSerializerFactory(GridCacheSharedContext sharedCtx)
+        throws IgniteCheckedException {
+
+        return new RecordSerializerFactoryImpl(sharedCtx)
+            .recordDeserializeFilter(new LogicalRecordsFilter())
+            .marshalledMode(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected FileWriteAheadLogManager.ReadFileHandle advanceSegment(
+        @Nullable FileWriteAheadLogManager.ReadFileHandle curWalSegment) throws IgniteCheckedException {
+        if (segmentInitialized) {
+            closeCurrentWalSegment();
+            // No advance as we iterate over single segment.
+            return null;
+        }
+        else {
+            segmentInitialized = true;
+
+            FileWriteAheadLogManager.FileDescriptor fd = new FileWriteAheadLogManager.FileDescriptor(
+                new File(archiveDir, FileWriteAheadLogManager.FileDescriptor.fileName(archivedSegIdx)));
+
+            try {
+                return initReadHandle(fd, null);
+            }
+            catch (FileNotFoundException e) {
+                throw new IgniteCheckedException("Missing WAL segment in the archive", e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void advance() throws IgniteCheckedException {
+        super.advance();
+
+        if (curRec != null && advanceC != null)
+            advanceC.apply(curRec.get2());
+    }
+
+    /**
+     *
+     */
+    private static class LogicalRecordsFilter implements P2<WALRecord.RecordType, WALPointer> {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Records type to skip. */
+        private final Set<WALRecord.RecordType> skip = RecordTypes.DELTA_TYPE_SET;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(WALRecord.RecordType type, WALPointer ptr) {
+            return !skip.contains(type);
+        }
+    }
+}