You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/27 13:28:03 UTC

[06/13] ignite git commit: ignite-5938 WAL logs compaction and compression after checkpoint

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index a4d9e95..b36c2db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExp
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.util.typedef.F;
@@ -108,7 +109,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
     ) throws IgniteCheckedException {
         super(log,
             sharedCtx,
-            FileWriteAheadLogManager.forVersion(sharedCtx, FileWriteAheadLogManager.LATEST_SERIALIZER_VERSION),
+            new RecordSerializerFactoryImpl(sharedCtx),
             ioFactory,
             BUF_SIZE);
         this.keepBinary = keepBinary;
@@ -136,7 +137,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
             @NotNull File... walFiles) throws IgniteCheckedException {
         super(log,
             sharedCtx,
-            FileWriteAheadLogManager.forVersion(sharedCtx, FileWriteAheadLogManager.LATEST_SERIALIZER_VERSION),
+            new RecordSerializerFactoryImpl(sharedCtx),
             ioFactory,
             BUF_SIZE);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java
index 4fa6232..35c94a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java
@@ -24,8 +24,11 @@ import org.apache.ignite.internal.util.typedef.internal.S;
  * Header record.
  */
 public class HeaderRecord extends WALRecord {
-    /** */
-    public static final long MAGIC = 0xB0D045A_CE7ED045AL;
+    /** Magic of regular WAL segment. */
+    public static final long REGULAR_MAGIC = 0xB0D045A_CE7ED045AL;
+
+    /** Magic of WAL segment with skipped physical records. */
+    public static final long COMPACTED_MAGIC = 0x4E07AE0_E573A694EL;
 
     /** Serializer version */
     private final int ver;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java
new file mode 100644
index 0000000..9654748
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.record;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+
+/**
+ * Utility class for handling WAL record types.
+ */
+public final class RecordTypes {
+    /** */
+    public static final Set<WALRecord.RecordType> DELTA_TYPE_SET = new HashSet<>();
+
+    static {
+        DELTA_TYPE_SET.add(WALRecord.RecordType.PAGE_RECORD);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.INIT_NEW_PAGE_RECORD);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_INSERT_RECORD);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_INSERT_FRAGMENT_RECORD);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_REMOVE_RECORD);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_SET_FREE_LIST_PAGE);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_META_PAGE_INIT_ROOT);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_META_PAGE_ADD_ROOT);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_META_PAGE_CUT_ROOT);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_INIT_NEW_ROOT);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_PAGE_RECYCLE);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_PAGE_INSERT);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_FIX_LEFTMOST_CHILD);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_FIX_COUNT);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_PAGE_REPLACE);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_PAGE_REMOVE);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_PAGE_INNER_REPLACE);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_FIX_REMOVE_ID);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_FORWARD_PAGE_SPLIT);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_EXISTING_PAGE_SPLIT);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_PAGE_MERGE);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.PAGES_LIST_SET_NEXT);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.PAGES_LIST_SET_PREVIOUS);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.PAGES_LIST_INIT_NEW_PAGE);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.PAGES_LIST_ADD_PAGE);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.PAGES_LIST_REMOVE_PAGE);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.META_PAGE_INIT);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.PARTITION_META_PAGE_UPDATE_COUNTERS);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.TRACKING_PAGE_DELTA);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.META_PAGE_UPDATE_NEXT_SNAPSHOT_ID);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.META_PAGE_UPDATE_LAST_ALLOCATED_INDEX);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.PAGE_LIST_META_RESET_COUNT_RECORD);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_UPDATE_RECORD);
+        DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_META_PAGE_INIT_ROOT2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java
new file mode 100644
index 0000000..88c5f42
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
+
+/**
+ * Interface to provide size, read and write operations with WAL records
+ * <b>without any headers and meta information</b>.
+ */
+public interface RecordDataSerializer {
+    /**
+     * Calculates size of record data.
+     *
+     * @param record WAL record.
+     * @return Size of record in bytes.
+     * @throws IgniteCheckedException If it's unable to calculate record data size.
+     */
+    int size(WALRecord record) throws IgniteCheckedException;
+
+    /**
+     * Reads record data of {@code type} from buffer {@code in}.
+     *
+     * @param type Record type.
+     * @param in Buffer to read.
+     * @return WAL record.
+     * @throws IOException In case of I/O problems.
+     * @throws IgniteCheckedException If it's unable to read record.
+     */
+    WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException;
+
+    /**
+     * Writes record data to buffer {@code buf}.
+     *
+     * @param record WAL record.
+     * @param buf Buffer to write.
+     * @throws IgniteCheckedException If it's unable to write record.
+     */
+    void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index e583df3..d478917 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -87,7 +87,6 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInne
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO;
 import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
-import org.apache.ignite.internal.processors.cache.persistence.wal.RecordDataSerializer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
@@ -283,8 +282,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 return /*cacheId*/ 4 + /*pageId*/ 8;
 
             case SWITCH_SEGMENT_RECORD:
-                // CRC is not loaded for switch segment.
-                return -CRC_SIZE;
+                return 0;
 
             case TX_RECORD:
                 return txRecordSerializer.sizeOfTxRecord((TxRecord)record);
@@ -391,9 +389,8 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
             case HEADER_RECORD:
                 long magic = in.readLong();
 
-                if (magic != HeaderRecord.MAGIC)
-                    throw new EOFException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.MAGIC) +
-                            ", actual=" + U.hexLong(magic) + ']');
+                if (magic != HeaderRecord.REGULAR_MAGIC && magic != HeaderRecord.COMPACTED_MAGIC)
+                    throw new EOFException("Magic is corrupted [actual=" + U.hexLong(magic) + ']');
 
                 int ver = in.readInt();
 
@@ -912,7 +909,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case HEADER_RECORD:
-                buf.putLong(HeaderRecord.MAGIC);
+                buf.putLong(HeaderRecord.REGULAR_MAGIC);
 
                 buf.putInt(((HeaderRecord)record).version());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
index c02f36e..16a81a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -24,9 +24,9 @@ import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
-import org.apache.ignite.internal.processors.cache.persistence.wal.RecordDataSerializer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
 
 /**
@@ -54,6 +54,9 @@ public class RecordDataV2Serializer implements RecordDataSerializer {
             case DATA_RECORD:
                 return delegateSerializer.size(record) + 8/*timestamp*/;
 
+            case SNAPSHOT:
+                return 8 + 1;
+
             default:
                 return delegateSerializer.size(record);
         }
@@ -76,6 +79,12 @@ public class RecordDataV2Serializer implements RecordDataSerializer {
 
                 return new DataRecord(entries, timeStamp);
 
+            case SNAPSHOT:
+                long snpId = in.readLong();
+                byte full = in.readByte();
+
+                return new SnapshotRecord(snpId, full == 1);
+
             default:
                 return delegateSerializer.readRecord(type, in);
         }
@@ -98,6 +107,14 @@ public class RecordDataV2Serializer implements RecordDataSerializer {
 
                 break;
 
+            case SNAPSHOT:
+                SnapshotRecord snpRec = (SnapshotRecord)record;
+
+                buf.putLong(snpRec.getSnapshotId());
+                buf.put(snpRec.isFull() ? (byte)1 : 0);
+
+                break;
+
             default:
                 delegateSerializer.writeRecord(record, buf);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java
new file mode 100644
index 0000000..c5760ab
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
+
+/**
+ * Record serializer.
+ */
+public interface RecordSerializer {
+    /**
+     * @return serializer version
+     */
+    public int version();
+
+    /**
+     * Calculates record size in byte including expected wal pointer, CRC and type field
+     *
+     * @param record Record.
+     * @return Size in bytes.
+     */
+    public int size(WALRecord record) throws IgniteCheckedException;
+
+    /**
+     * @param record Entry to write.
+     * @param buf Buffer.
+     */
+    public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException;
+
+    /**
+     * Loads record from input
+     *
+     * @param in Data input to read data from.
+     * @param expPtr expected WAL pointer for record. Used to validate actual position against expected from the file
+     * @return Read entry.
+     */
+    public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException;
+
+    /**
+     * Flag to write (or not) wal pointer to record
+     */
+    public boolean writePointer();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java
new file mode 100644
index 0000000..f46c315
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.FilteredRecord;
+import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+/**
+ * Factory for creating {@link RecordSerializer}.
+ */
+public interface RecordSerializerFactory {
+    /**
+     * Factory method for creation {@link RecordSerializer}.
+     *
+     * @param ver Serializer version.
+     * @return record serializer.
+     */
+    public RecordSerializer createSerializer(int ver) throws IgniteCheckedException;
+
+    /**
+     * TODO: This flag was added under IGNITE-6029, but still unused. Should be either handled or removed.
+     *
+     * @param writePointer Write pointer flag.
+     */
+    public RecordSerializerFactory writePointer(boolean writePointer);
+
+    /**
+     * Specifies deserialization filter. Created serializer will read bulk {@link FilteredRecord} instead of actual
+     * record if record type/pointer doesn't satisfy filter.
+     *
+     * @param readTypeFilter Read type filter.
+     */
+    public RecordSerializerFactory recordDeserializeFilter(IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter);
+
+    /**
+     * If marshalledMode is on, created serializer will read {@link MarshalledRecord} with raw binary data instead of
+     * actual record.
+     * Useful for copying binary data from WAL.
+     *
+     * @param marshalledMode Marshalled mode.
+     */
+    public RecordSerializerFactory marshalledMode(boolean marshalledMode);
+
+    /**
+     * If skipPositionCheck is true, created serializer won't check that actual position of record in file is equal to
+     * position in saved record's WALPointer.
+     * Must be true if we are reading from compacted WAL segment.
+     *
+     * @param skipPositionCheck Skip position check.
+     */
+    public RecordSerializerFactory skipPositionCheck(boolean skipPositionCheck);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
new file mode 100644
index 0000000..468392a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
@@ -0,0 +1,133 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+/**
+ *
+ */
+public class RecordSerializerFactoryImpl implements RecordSerializerFactory {
+    /** Context. */
+    private GridCacheSharedContext cctx;
+
+    /** Write pointer. */
+    private boolean writePointer;
+
+    /** Read record filter. */
+    private IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordDeserializeFilter;
+
+    /**
+     * Marshalled mode flag.
+     * Records are not deserialized in this mode, {@link MarshalledRecord} with binary representation are read instead.
+     */
+    private boolean marshalledMode;
+
+    /** Skip position check flag. Should be set for reading compacted wal file with skipped physical records. */
+    private boolean skipPositionCheck;
+
+    /**
+     * @param cctx Cctx.
+     */
+    public RecordSerializerFactoryImpl(GridCacheSharedContext cctx) {
+        this.cctx = cctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RecordSerializer createSerializer(int ver) throws IgniteCheckedException {
+        if (ver <= 0)
+            throw new IgniteCheckedException("Failed to create a serializer (corrupted WAL file).");
+
+        switch (ver) {
+            case 1:
+                return new RecordV1Serializer(new RecordDataV1Serializer(cctx),
+                    writePointer, marshalledMode, skipPositionCheck, recordDeserializeFilter);
+
+            case 2:
+                RecordDataV2Serializer dataV2Serializer = new RecordDataV2Serializer(new RecordDataV1Serializer(cctx));
+
+                return new RecordV2Serializer(dataV2Serializer,
+                    writePointer, marshalledMode, skipPositionCheck, recordDeserializeFilter);
+
+            default:
+                throw new IgniteCheckedException("Failed to create a serializer with the given version " +
+                    "(forward compatibility is not supported): " + ver);
+        }
+    }
+
+    /**
+     * @return Write pointer.
+     */
+    public boolean writePointer() {
+        return writePointer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RecordSerializerFactoryImpl writePointer(boolean writePointer) {
+        this.writePointer = writePointer;
+
+        return this;
+    }
+
+    /**
+     * @return Read type filter.
+     */
+    public IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordDeserializeFilter() {
+        return recordDeserializeFilter;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RecordSerializerFactoryImpl recordDeserializeFilter(
+        IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter) {
+        this.recordDeserializeFilter = readTypeFilter;
+
+        return this;
+    }
+
+    /**
+     * @return Marshalled mode. Records are not deserialized in this mode,  with binary representation are read instead.
+     */
+    public boolean marshalledMode() {
+        return marshalledMode;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RecordSerializerFactoryImpl marshalledMode(boolean marshalledMode) {
+        this.marshalledMode = marshalledMode;
+
+        return this;
+    }
+
+    /**
+     * @return Skip position check flag. Should be set for reading compacted wal file with skipped physical records.
+     */
+    public boolean skipPositionCheck() {
+        return skipPositionCheck;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RecordSerializerFactoryImpl skipPositionCheck(boolean skipPositionCheck) {
+        this.skipPositionCheck = skipPositionCheck;
+
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
index c4e1bf2..d460705 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
@@ -21,22 +21,26 @@ import java.io.DataInput;
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.FilteredRecord;
+import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO;
 import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
-import org.apache.ignite.internal.processors.cache.persistence.wal.RecordSerializer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.io.RecordIO;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiPredicate;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
 
@@ -72,6 +76,32 @@ public class RecordV1Serializer implements RecordSerializer {
     /** Write pointer. */
     private final boolean writePointer;
 
+    /**
+     * Record type filter.
+     * {@link FilteredRecord} is deserialized instead of original record if type doesn't match filter.
+     */
+    private final IgniteBiPredicate<RecordType, WALPointer> recordFilter;
+
+    /** Skip position check flag. Should be set for reading compacted wal file with skipped physical records. */
+    private final boolean skipPositionCheck;
+
+    /**
+     * Marshalled mode.
+     * Records are not deserialized in this mode, {@link MarshalledRecord} with binary representation are read instead.
+     */
+    private final boolean marshalledMode;
+
+    /** Thread-local heap byte buffer. */
+    private final ThreadLocal<ByteBuffer> heapTlb = new ThreadLocal<ByteBuffer>() {
+        @Override protected ByteBuffer initialValue() {
+            ByteBuffer buf = ByteBuffer.allocate(4096);
+
+            buf.order(GridUnsafe.NATIVE_BYTE_ORDER);
+
+            return buf;
+        }
+    };
+
     /** Record read/write functional interface. */
     private final RecordIO recordIO = new RecordIO() {
 
@@ -89,11 +119,36 @@ public class RecordV1Serializer implements RecordSerializer {
 
             FileWALPointer ptr = readPosition(in);
 
-            if (!F.eq(ptr, expPtr))
+            if (!skipPositionCheck && !F.eq(ptr, expPtr))
                 throw new SegmentEofException("WAL segment rollover detected (will end iteration) [expPtr=" + expPtr +
                         ", readPtr=" + ptr + ']', null);
 
-            return dataSerializer.readRecord(recType, in);
+            final WALRecord rec = dataSerializer.readRecord(recType, in);
+
+            if (recordFilter != null && !recordFilter.apply(rec.type(), ptr))
+                return new FilteredRecord();
+            else if (marshalledMode) {
+                ByteBuffer buf = heapTlb.get();
+
+                int recordSize = size(rec);
+
+                if (buf.capacity() < recordSize)
+                    heapTlb.set(buf = ByteBuffer.allocate(recordSize * 3 / 2).order(ByteOrder.nativeOrder()));
+                else
+                    buf.clear();
+
+                rec.position(ptr);
+
+                writeRecord(rec, buf);
+
+                buf.flip();
+
+                assert buf.remaining() == recordSize;
+
+                return new MarshalledRecord(rec.type(), rec.position(), buf);
+            }
+            else
+                return rec;
         }
 
         /** {@inheritDoc} */
@@ -111,13 +166,19 @@ public class RecordV1Serializer implements RecordSerializer {
 
     /**
      * Create an instance of V1 serializer.
-     *
      * @param dataSerializer V1 data serializer.
      * @param writePointer Write pointer.
+     * @param marshalledMode Marshalled mode.
+     * @param skipPositionCheck Skip position check mode.
+     * @param recordFilter Record type filter. {@link FilteredRecord} is deserialized instead of original record
      */
-    public RecordV1Serializer(RecordDataV1Serializer dataSerializer, boolean writePointer) {
+    public RecordV1Serializer(RecordDataV1Serializer dataSerializer, boolean writePointer,
+        boolean marshalledMode, boolean skipPositionCheck, IgniteBiPredicate<RecordType, WALPointer> recordFilter) {
         this.dataSerializer = dataSerializer;
         this.writePointer = writePointer;
+        this.recordFilter = recordFilter;
+        this.skipPositionCheck = skipPositionCheck;
+        this.marshalledMode = marshalledMode;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
index 5eb45ac..05f2a24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
@@ -18,21 +18,26 @@
 package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
 
 import java.io.DataInput;
+import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.FilteredRecord;
+import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
-import org.apache.ignite.internal.processors.cache.persistence.wal.RecordSerializer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.io.RecordIO;
+import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiPredicate;
 
-import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.*;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
 import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.CRC_SIZE;
 import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.REC_TYPE_SIZE;
 
@@ -57,6 +62,32 @@ public class RecordV2Serializer implements RecordSerializer {
     /** Write pointer. */
     private final boolean writePointer;
 
+    /**
+     * Marshalled mode.
+     * Records are not deserialized in this mode, {@link MarshalledRecord} with binary representation are read instead.
+     */
+    private final boolean marshalledMode;
+
+    /** Skip position check flag. Should be set for reading compacted wal file with skipped physical records. */
+    private final boolean skipPositionCheck;
+
+    /** Thread-local heap byte buffer. */
+    private final ThreadLocal<ByteBuffer> heapTlb = new ThreadLocal<ByteBuffer>() {
+        @Override protected ByteBuffer initialValue() {
+            ByteBuffer buf = ByteBuffer.allocate(4096);
+
+            buf.order(GridUnsafe.NATIVE_BYTE_ORDER);
+
+            return buf;
+        }
+    };
+
+    /**
+     * Record type filter.
+     * {@link FilteredRecord} is deserialized instead of original record if type doesn't match filter.
+     */
+    private final IgniteBiPredicate<RecordType, WALPointer> recordFilter;
+
     /** Record read/write functional interface. */
     private final RecordIO recordIO = new RecordIO() {
 
@@ -75,9 +106,47 @@ public class RecordV2Serializer implements RecordSerializer {
             if (recType == WALRecord.RecordType.SWITCH_SEGMENT_RECORD)
                 throw new SegmentEofException("Reached end of segment", null);
 
-            FileWALPointer ptr = readPositionAndCheckPoint(in, expPtr);
+            FileWALPointer ptr = readPositionAndCheckPoint(in, expPtr, skipPositionCheck);
+
+            if (recordFilter != null && !recordFilter.apply(recType, ptr)) {
+                int toSkip = ptr.length() - REC_TYPE_SIZE - FILE_WAL_POINTER_SIZE - CRC_SIZE;
+
+                assert toSkip >= 0 : "Too small saved record length: " + ptr;
+
+                if (in.skipBytes(toSkip) < toSkip)
+                    throw new EOFException("Reached end of file while reading record: " + ptr);
+
+                return new FilteredRecord();
+            }
+            else if (marshalledMode) {
+                ByteBuffer buf = heapTlb.get();
+
+                if (buf.capacity() < ptr.length())
+                    heapTlb.set(buf = ByteBuffer.allocate(ptr.length() * 3 / 2).order(ByteOrder.nativeOrder()));
+                else
+                    buf.clear();
+
+                buf.put((byte)(recType.ordinal() + 1));
+
+                buf.putLong(ptr.index());
+                buf.putInt(ptr.fileOffset());
+                buf.putInt(ptr.length());
+
+                in.readFully(buf.array(), buf.position(), ptr.length() - buf.position());
+                buf.position(ptr.length());
+
+                // Unwind reading CRC.
+                in.buffer().position(in.buffer().position() - CRC_SIZE);
+
+                buf.flip();
+
+                assert buf.remaining() == ptr.length();
+
+                return new MarshalledRecord(recType, ptr, buf);
+            }
+            else
+                return dataSerializer.readRecord(recType, in);
 
-            return dataSerializer.readRecord(recType, in);
         }
 
         /** {@inheritDoc} */
@@ -98,12 +167,18 @@ public class RecordV2Serializer implements RecordSerializer {
 
     /**
      * Create an instance of Record V2 serializer.
-     *
      * @param dataSerializer V2 data serializer.
+     * @param marshalledMode Marshalled mode.
+     * @param skipPositionCheck Skip position check mode.
+     * @param recordFilter Record type filter. {@link FilteredRecord} is deserialized instead of original record
      */
-    public RecordV2Serializer(RecordDataV2Serializer dataSerializer, boolean writePointer) {
+    public RecordV2Serializer(RecordDataV2Serializer dataSerializer, boolean writePointer,
+        boolean marshalledMode, boolean skipPositionCheck, IgniteBiPredicate<RecordType, WALPointer> recordFilter) {
         this.dataSerializer = dataSerializer;
         this.writePointer = writePointer;
+        this.marshalledMode = marshalledMode;
+        this.skipPositionCheck = skipPositionCheck;
+        this.recordFilter = recordFilter;
     }
 
     /** {@inheritDoc} */
@@ -133,12 +208,14 @@ public class RecordV2Serializer implements RecordSerializer {
 
     /**
      * @param in Data input to read pointer from.
+     * @param skipPositionCheck Flag for skipping position check.
      * @return Read file WAL pointer.
      * @throws IOException If failed to write.
      */
     public static FileWALPointer readPositionAndCheckPoint(
         DataInput in,
-        WALPointer expPtr
+        WALPointer expPtr,
+        boolean skipPositionCheck
     ) throws IgniteCheckedException, IOException {
         long idx = in.readLong();
         int fileOffset = in.readInt();
@@ -146,7 +223,7 @@ public class RecordV2Serializer implements RecordSerializer {
 
         FileWALPointer p = (FileWALPointer)expPtr;
 
-        if (!F.eq(idx, p.index()) || !F.eq(fileOffset, p.fileOffset()))
+        if (!F.eq(idx, p.index()) || (!skipPositionCheck && !F.eq(fileOffset, p.fileOffset())))
             throw new WalSegmentTailReachedException(
                 "WAL segment tail is reached. [ " +
                         "Expected next state: {Index=" + p.index() + ",Offset=" + p.fileOffset() + "}, " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index a8df0c6..0a278d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -1680,6 +1680,7 @@ public class PlatformConfigurationUtils {
                 .setMetricsRateTimeInterval(in.readLong())
                 .setCheckpointWriteOrder(CheckpointWriteOrder.fromOrdinal(in.readInt()))
                 .setWriteThrottlingEnabled(in.readBoolean())
+                .setWalCompactionEnabled(in.readBoolean())
                 .setSystemRegionInitialSize(in.readLong())
                 .setSystemRegionMaxSize(in.readLong())
                 .setPageSize(in.readInt())
@@ -1774,6 +1775,7 @@ public class PlatformConfigurationUtils {
             w.writeLong(cfg.getMetricsRateTimeInterval());
             w.writeInt(cfg.getCheckpointWriteOrder().ordinal());
             w.writeBoolean(cfg.isWriteThrottlingEnabled());
+            w.writeBoolean(cfg.isWalCompactionEnabled());
             w.writeLong(cfg.getSystemRegionInitialSize());
             w.writeLong(cfg.getSystemRegionMaxSize());
             w.writeInt(cfg.getPageSize());

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
index af8f679..107b467 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.file.OpenOption;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheAtomicityMode;
@@ -41,7 +42,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
-import java.nio.file.OpenOption;
 
 import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.READ;
@@ -120,16 +120,17 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
      */
     private void flushingErrorTest() throws Exception {
         final IgniteEx grid = startGrid(0);
-        grid.active(true);
 
-        IgniteCache<Object, Object> cache = grid.cache(TEST_CACHE);
+        try {
+            grid.active(true);
 
-        final int iterations = 100;
+            IgniteCache<Object, Object> cache = grid.cache(TEST_CACHE);
+
+            final int iterations = 100;
 
-        try {
             for (int i = 0; i < iterations; i++) {
                 Transaction tx = grid.transactions().txStart(
-                        TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+                    TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
 
                 cache.put(i, "testValue" + i);
 
@@ -144,8 +145,7 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
 
         // We should await successful stop of node.
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override
-            public boolean apply() {
+            @Override public boolean apply() {
                 return grid.context().gateway().getState() == GridKernalState.STOPPED;
             }
         }, getTestTimeout());

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
index 35d85d1..c6d58e5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
@@ -27,9 +27,9 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
index b357877..00627b8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
@@ -113,7 +113,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
     /** */
     private int walSegmentSize;
 
-    /** Logger only. */
+    /** Log only. */
     private boolean logOnly;
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryWithCompactionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryWithCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryWithCompactionTest.java
new file mode 100644
index 0000000..d927676
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryWithCompactionTest.java
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ *
+ */
+public class IgniteWalRecoveryWithCompactionTest extends IgniteWalRecoveryTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.getDataStorageConfiguration().setWalCompactionEnabled(true);
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java
index 7500fdc..60fb1d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java
@@ -34,7 +34,7 @@ import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.TimeStampRecord;
 import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
-import org.apache.ignite.internal.processors.cache.persistence.wal.RecordSerializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV2Serializer;
 import org.apache.ignite.internal.util.typedef.internal.GPC;
@@ -76,6 +76,8 @@ public class IgniteWalSerializerVersionTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testCheckDifferentSerializerVersions() throws Exception {
+        System.setProperty(IGNITE_WAL_SERIALIZER_VERSION, "1");
+
         IgniteEx ig0 = (IgniteEx)startGrid();
 
         IgniteWriteAheadLogManager wal0 = ig0.context().cache().context().wal();
@@ -302,6 +304,11 @@ public class IgniteWalSerializerVersionTest extends GridCommonAbstractTest {
         deleteWorkFiles();
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        System.clearProperty(IGNITE_WAL_SERIALIZER_VERSION);
+    }
+
     /**
      * @throws IgniteCheckedException If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
new file mode 100644
index 0000000..6b9f06a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
@@ -0,0 +1,312 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.Arrays;
+import java.util.Comparator;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE;
+
+/**
+ *
+ */
+public class WalCompactionTest extends GridCommonAbstractTest {
+    /** Ip finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Wal segment size. */
+    private static final int WAL_SEGMENT_SIZE = 4 * 1024 * 1024;
+
+    /** Cache name. */
+    public static final String CACHE_NAME = "cache";
+
+    /** Entries count. */
+    public static final int ENTRIES = 1000;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(name);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setPersistenceEnabled(true)
+                .setMaxSize(200 * 1024 * 1024))
+            .setWalMode(WALMode.LOG_ONLY)
+            .setWalSegmentSize(WAL_SEGMENT_SIZE)
+            .setWalHistorySize(500)
+            .setWalCompactionEnabled(true));
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName("cache");
+        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 16));
+        ccfg.setBackups(0);
+
+        cfg.setCacheConfiguration(ccfg);
+        cfg.setConsistentId(name);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testApplyingUpdatesFromCompactedWal() throws Exception {
+        IgniteEx ig = (IgniteEx)startGrids(3);
+        ig.active(true);
+
+        IgniteCache<Integer, byte[]> cache = ig.cache("cache");
+
+        for (int i = 0; i < ENTRIES; i++) { // At least 20MB of raw data in total.
+            final byte[] val = new byte[20000];
+
+            val[i] = 1;
+
+            cache.put(i, val);
+        }
+
+        // Spam WAL to move all data records to compressible WAL zone.
+        for (int i = 0; i < WAL_SEGMENT_SIZE / DFLT_PAGE_SIZE * 2; i++)
+            ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[DFLT_PAGE_SIZE]));
+
+        // WAL archive segment is allowed to be compressed when it's at least one checkpoint away from current WAL head.
+        ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get();
+        ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get();
+
+        Thread.sleep(15_000); // Allow compressor to archive WAL segments.
+
+        String nodeFolderName = ig.context().pdsFolderResolver().resolveFolders().folderName();
+
+        File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false);
+        File walDir = new File(dbDir, "wal");
+        File archiveDir = new File(walDir, "archive");
+        File nodeArchiveDir = new File(archiveDir, nodeFolderName);
+        File walSegment = new File(nodeArchiveDir, FileWriteAheadLogManager.FileDescriptor.fileName(0) + ".zip");
+
+        assertTrue(walSegment.exists());
+        assertTrue(walSegment.length() < WAL_SEGMENT_SIZE / 2); // Should be compressed at least in half.
+
+        stopAllGrids();
+
+        File nodeLfsDir = new File(dbDir, nodeFolderName);
+        File cpMarkersDir = new File(nodeLfsDir, "cp");
+
+        File[] cpMarkers = cpMarkersDir.listFiles();
+
+        assertNotNull(cpMarkers);
+        assertTrue(cpMarkers.length > 0);
+
+        File cacheDir = new File(nodeLfsDir, "cache-" + CACHE_NAME);
+        File[] lfsFiles = cacheDir.listFiles();
+
+        assertNotNull(lfsFiles);
+        assertTrue(lfsFiles.length > 0);
+
+        // Enforce reading WAL from the very beginning at the next start.
+        for (File f : cpMarkers)
+            f.delete();
+
+        for (File f : lfsFiles)
+            f.delete();
+
+        ig = (IgniteEx)startGrids(3);
+        ig.active(true);
+
+        cache = ig.cache(CACHE_NAME);
+
+        boolean fail = false;
+
+        // Check that all data is recovered from compacted WAL.
+        for (int i = 0; i < ENTRIES; i++) {
+            byte[] arr = cache.get(i);
+
+            if (arr == null) {
+                System.out.println(">>> Missing: " + i);
+
+                fail = true;
+            }
+            else if (arr[i] != 1) {
+                System.out.println(">>> Corrupted: " + i);
+
+                fail = true;
+            }
+        }
+
+        assertFalse(fail);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSeekingStartInCompactedSegment() throws Exception {
+        IgniteEx ig = (IgniteEx)startGrids(3);
+        ig.active(true);
+
+        IgniteCache<Integer, byte[]> cache = ig.cache("cache");
+
+        for (int i = 0; i < 100; i++) {
+            final byte[] val = new byte[20000];
+
+            val[i] = 1;
+
+            cache.put(i, val);
+        }
+
+        ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get();
+        ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get();
+
+        String nodeFolderName = ig.context().pdsFolderResolver().resolveFolders().folderName();
+
+        File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false);
+        File nodeLfsDir = new File(dbDir, nodeFolderName);
+        File cpMarkersDir = new File(nodeLfsDir, "cp");
+
+        final File[] cpMarkersToSave = cpMarkersDir.listFiles();
+
+        assert cpMarkersToSave != null;
+        assertTrue(cpMarkersToSave.length >= 2);
+
+        Arrays.sort(cpMarkersToSave, new Comparator<File>() {
+            @Override public int compare(File o1, File o2) {
+                return o1.getName().compareTo(o2.getName());
+            }
+        });
+
+        for (int i = 100; i < ENTRIES; i++) { // At least 20MB of raw data in total.
+            final byte[] val = new byte[20000];
+
+            val[i] = 1;
+
+            cache.put(i, val);
+        }
+
+        // Spam WAL to move all data records to compressible WAL zone.
+        for (int i = 0; i < WAL_SEGMENT_SIZE / DFLT_PAGE_SIZE * 2; i++)
+            ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[DFLT_PAGE_SIZE]));
+
+        // WAL archive segment is allowed to be compressed when it's at least one checkpoint away from current WAL head.
+        ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get();
+        ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get();
+
+        Thread.sleep(15_000); // Allow compressor to archive WAL segments.
+
+        File walDir = new File(dbDir, "wal");
+        File archiveDir = new File(walDir, "archive");
+        File nodeArchiveDir = new File(archiveDir, nodeFolderName);
+        File walSegment = new File(nodeArchiveDir, FileWriteAheadLogManager.FileDescriptor.fileName(0) + ".zip");
+
+        assertTrue(walSegment.exists());
+        assertTrue(walSegment.length() < WAL_SEGMENT_SIZE / 2); // Should be compressed at least in half.
+
+        stopAllGrids();
+
+        File[] cpMarkers = cpMarkersDir.listFiles(new FilenameFilter() {
+            @Override public boolean accept(File dir, String name) {
+                return !(name.equals(cpMarkersToSave[0].getName()) || name.equals(cpMarkersToSave[1].getName()));
+            }
+        });
+
+        assertNotNull(cpMarkers);
+        assertTrue(cpMarkers.length > 0);
+
+        File cacheDir = new File(nodeLfsDir, "cache-" + CACHE_NAME);
+        File[] lfsFiles = cacheDir.listFiles();
+
+        assertNotNull(lfsFiles);
+        assertTrue(lfsFiles.length > 0);
+
+        // Enforce reading WAL from the very beginning at the next start.
+        for (File f : cpMarkers)
+            f.delete();
+
+        for (File f : lfsFiles)
+            f.delete();
+
+        ig = (IgniteEx)startGrids(3);
+        ig.active(true);
+
+        cache = ig.cache(CACHE_NAME);
+
+        int missing = 0;
+
+        for (int i = 0; i < 100; i++) {
+            if (!cache.containsKey(i))
+                missing++;
+        }
+
+        System.out.println(">>> Missing " + missing + " entries logged before WAL iteration start");
+        assertTrue(missing > 0);
+
+        boolean fail = false;
+
+        // Check that all data is recovered from compacted WAL.
+        for (int i = 100; i < ENTRIES; i++) {
+            byte[] arr = cache.get(i);
+
+            if (arr == null) {
+                System.out.println(">>> Missing: " + i);
+
+                fail = true;
+            }
+            else if (arr[i] != 1) {
+                System.out.println(">>> Corrupted: " + i);
+
+                fail = true;
+            }
+        }
+
+        assertFalse(fail);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
index 72450b8..b95208c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
@@ -77,6 +77,11 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager {
     }
 
     /** {@inheritDoc} */
+    @Override public void allowCompressionUntil(WALPointer ptr) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean reserved(WALPointer ptr) {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index adfdb2c..17833ac 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalF
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushLogOnlySelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalHistoryReservationsTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteDataIntegrityTests;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.reader.IgniteWalReaderTest;
 
@@ -93,6 +94,8 @@ public class IgnitePdsTestSuite2 extends TestSuite {
 
         suite.addTestSuite(IgniteWalSerializerVersionTest.class);
 
+        suite.addTestSuite(WalCompactionTest.class);
+
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
index 114f630..ea30ce2 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePds
 import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsThreadInterruptionTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryPPCTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryWithCompactionTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryTxLogicalRecordsTest;
 
 /**
@@ -49,6 +50,7 @@ public class IgnitePdsWithIndexingCoreTestSuite extends TestSuite {
         suite.addTestSuite(WalRecoveryTxLogicalRecordsTest.class);
 
         suite.addTestSuite(IgniteWalRecoveryTest.class);
+        suite.addTestSuite(IgniteWalRecoveryWithCompactionTest.class);
         suite.addTestSuite(IgnitePdsNoActualWalHistoryTest.class);
         suite.addTestSuite(IgnitePdsAtomicCacheRebalancingTest.class);
         suite.addTestSuite(IgnitePdsTxCacheRebalancingTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs
index 09b3fe4..ae3c0ed 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs
@@ -134,6 +134,11 @@ namespace Apache.Ignite.Core.Configuration
         public const bool DefaultWriteThrottlingEnabled = false;
 
         /// <summary>
+        /// Default value for <see cref="WalCompactionEnabled"/>.
+        /// </summary>
+        public const bool DefaultWalCompactionEnabled = false;
+
+        /// <summary>
         /// Default size of a memory chunk reserved for system cache initially.
         /// </summary>
         public const long DefaultSystemRegionInitialSize = 40 * 1024 * 1024;
@@ -174,6 +179,7 @@ namespace Apache.Ignite.Core.Configuration
             WalPath = DefaultWalPath;
             CheckpointWriteOrder = DefaultCheckpointWriteOrder;
             WriteThrottlingEnabled = DefaultWriteThrottlingEnabled;
+            WalCompactionEnabled = DefaultWalCompactionEnabled;
             SystemRegionInitialSize = DefaultSystemRegionInitialSize;
             SystemRegionMaxSize = DefaultSystemRegionMaxSize;
             PageSize = DefaultPageSize;
@@ -207,6 +213,7 @@ namespace Apache.Ignite.Core.Configuration
             MetricsRateTimeInterval = reader.ReadLongAsTimespan();
             CheckpointWriteOrder = (CheckpointWriteOrder)reader.ReadInt();
             WriteThrottlingEnabled = reader.ReadBoolean();
+            WalCompactionEnabled = reader.ReadBoolean();
 
             SystemRegionInitialSize = reader.ReadLong();
             SystemRegionMaxSize = reader.ReadLong();
@@ -256,6 +263,7 @@ namespace Apache.Ignite.Core.Configuration
             writer.WriteTimeSpanAsLong(MetricsRateTimeInterval);
             writer.WriteInt((int)CheckpointWriteOrder);
             writer.WriteBoolean(WriteThrottlingEnabled);
+            writer.WriteBoolean(WalCompactionEnabled);
 
             writer.WriteLong(SystemRegionInitialSize);
             writer.WriteLong(SystemRegionMaxSize);
@@ -420,6 +428,14 @@ namespace Apache.Ignite.Core.Configuration
         public bool WriteThrottlingEnabled { get; set; }
 
         /// <summary>
+        /// Gets or sets flag indicating whether WAL compaction is enabled.
+        /// If true, system filters and compresses WAL archive in background.
+        /// Compressed WAL archive gets automatically decompressed on demand.
+        /// </summary>
+        [DefaultValue(DefaultWalCompactionEnabled)]
+        public bool WalCompactionEnabled { get; set; }
+
+        /// <summary>
         /// Gets or sets the size of a memory chunk reserved for system needs.
         /// </summary>
         [DefaultValue(DefaultSystemRegionInitialSize)]

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
index 98607b6..c6eb3c1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
@@ -1806,6 +1806,11 @@
                                 <xs:documentation>Threads that generate dirty pages too fast during ongoing checkpoint will be throttled.</xs:documentation>
                             </xs:annotation>
                         </xs:attribute>
+                        <xs:attribute name="walCompactionEnabled" type="xs:boolean">
+                            <xs:annotation>
+                                <xs:documentation>If true, system will filter and compress WAL archive in background. Compressed WAL archive gets automatically decompressed on demand.</xs:documentation>
+                            </xs:annotation>
+                        </xs:attribute>
                         <xs:attribute name="pageSize" type="xs:int">
                             <xs:annotation>
                                 <xs:documentation>Size of the memory page.</xs:documentation>