You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/10/03 14:03:16 UTC

[02/37] ignite git commit: IGNITE-6029 Record serializer refactoring and initial stuff for Record V2 serialization.

IGNITE-6029 Record serializer refactoring and initial stuff for Record V2 serialization.

Signed-off-by: Andrey Gura <ag...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/00770767
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/00770767
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/00770767

Branch: refs/heads/ignite-2.3
Commit: 007707674e32b4123708e267feec04075a1b4663
Parents: b7bb792
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Thu Sep 28 12:15:19 2017 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Thu Sep 28 12:16:15 2017 +0300

----------------------------------------------------------------------
 .../pagemem/wal/record/SwitchSegmentRecord.java |   28 +
 .../wal/AbstractWalRecordsIterator.java         |   54 +-
 .../wal/FileWriteAheadLogManager.java           |  193 +-
 .../persistence/wal/RecordDataSerializer.java   |   41 +
 .../wal/WalSegmentTailReachedException.java     |   37 +
 .../wal/reader/IgniteWalIteratorFactory.java    |    5 +-
 .../reader/StandaloneWalRecordsIterator.java    |   27 +-
 .../wal/serializer/RecordDataV1Serializer.java  | 1574 ++++++++++++++++
 .../wal/serializer/RecordDataV2Serializer.java  |   64 +
 .../wal/serializer/RecordV1Serializer.java      | 1673 ++----------------
 .../wal/serializer/RecordV2Serializer.java      |  170 ++
 .../persistence/wal/serializer/io/RecordIO.java |   60 +
 12 files changed, 2305 insertions(+), 1621 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SwitchSegmentRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SwitchSegmentRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SwitchSegmentRecord.java
new file mode 100644
index 0000000..948ec7e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SwitchSegmentRecord.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+/**
+ * Record that marks the end of a segment.
+ */
+public class SwitchSegmentRecord extends WALRecord {
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return RecordType.SWITCH_SEGMENT_RECORD;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index d5a2555..5be6e55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -30,7 +30,6 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
-import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.jetbrains.annotations.NotNull;
@@ -154,21 +153,31 @@ public abstract class AbstractWalRecordsIterator
      */
     protected void advance() throws IgniteCheckedException {
         while (true) {
-            curRec = advanceRecord(currWalSegment);
-
-            if (curRec != null)
-                return;
-            else {
-                currWalSegment = advanceSegment(currWalSegment);
+            try {
+                curRec = advanceRecord(currWalSegment);
 
-                if (currWalSegment == null)
+                if (curRec != null)
                     return;
+                else {
+                    currWalSegment = advanceSegment(currWalSegment);
+
+                    if (currWalSegment == null)
+                        return;
+                }
+            }
+            catch (WalSegmentTailReachedException e) {
+                log.warning(e.getMessage());
+
+                curRec = null;
+
+                return;
             }
         }
     }
 
     /**
      * Closes and returns WAL segment (if any)
+     *
      * @return closed handle
      * @throws IgniteCheckedException if IO failed
      */
@@ -199,7 +208,8 @@ public abstract class AbstractWalRecordsIterator
      * @return next advanced record
      */
     private IgniteBiTuple<WALPointer, WALRecord> advanceRecord(
-        @Nullable final FileWriteAheadLogManager.ReadFileHandle hnd) {
+        @Nullable final FileWriteAheadLogManager.ReadFileHandle hnd
+    ) throws IgniteCheckedException {
         if (hnd == null)
             return null;
 
@@ -217,8 +227,12 @@ public abstract class AbstractWalRecordsIterator
             return new IgniteBiTuple<>((WALPointer)ptr, postProcessRecord(rec));
         }
         catch (IOException | IgniteCheckedException e) {
+            if (e instanceof WalSegmentTailReachedException)
+                throw (WalSegmentTailReachedException)e;
+
             if (!(e instanceof SegmentEofException))
                 handleRecordException(e, ptr);
+
             return null;
         }
     }
@@ -261,24 +275,18 @@ public abstract class AbstractWalRecordsIterator
             FileIO fileIO = ioFactory.create(desc.file);
 
             try {
-                FileInput in = new FileInput(fileIO, buf);
+                int serVer = FileWriteAheadLogManager.readSerializerVersion(fileIO);
 
-                // Header record must be agnostic to the serializer version.
-                WALRecord rec = serializer.readRecord(in,
-                    new FileWALPointer(desc.idx, (int)fileIO.position(), 0));
+                RecordSerializer ser = FileWriteAheadLogManager.forVersion(sharedCtx, serVer);
 
-                if (rec == null)
-                    return null;
-
-                if (rec.type() != WALRecord.RecordType.HEADER_RECORD)
-                    throw new IOException("Missing file header record: " + desc.file.getAbsoluteFile());
-
-                int ver = ((HeaderRecord)rec).version();
+                FileInput in = new FileInput(fileIO, buf);
 
-                RecordSerializer ser = FileWriteAheadLogManager.forVersion(sharedCtx, ver, serializer.writePointer());
+                if (start != null && desc.idx == start.index()) {
+                    // Make sure we skip header with serializer version.
+                    long startOffset = Math.max(start.fileOffset(), fileIO.position());
 
-                if (start != null && desc.idx == start.index())
-                    in.seek(start.fileOffset());
+                    in.seek(startOffset);
+                }
 
                 return new FileWriteAheadLogManager.ReadFileHandle(fileIO, desc.idx, sharedCtx.igniteInstanceName(), ser, in);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 9b2d948..c4582cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.pagemem.wal.StorageException;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
@@ -62,8 +63,12 @@ import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabase
 import org.apache.ignite.internal.processors.cache.persistence.PersistenceMetricsImpl;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
 import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV1Serializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV2Serializer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV2Serializer;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridUnsafe;
@@ -116,6 +121,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
     };
 
+    /** Latest serializer version to use. */
+    public static final int LATEST_SERIALIZER_VERSION = 1;
+
     /** */
     private final boolean alwaysWriteFullPages;
 
@@ -152,9 +160,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** WAL archive directory (including consistent ID as subfolder) */
     private File walArchiveDir;
 
-    /** Serializer of current version, used to read header record and for write records */
+    /** Serializer of latest version. */
     private RecordSerializer serializer;
 
+    /** Serializer version to use; defaults to the latest serializer version. */
+    private int serializerVersion = LATEST_SERIALIZER_VERSION;
+
     /** */
     private volatile long oldestArchiveSegmentIdx;
 
@@ -266,7 +277,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 "write ahead log archive directory"
             );
 
-            serializer = new RecordV1Serializer(cctx);
+            serializer = forVersion(cctx, serializerVersion);
 
             GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.database();
 
@@ -818,10 +829,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             FileIO fileIO = ioFactory.create(curFile);
 
             try {
-                // readSerializerVersion will change the channel position.
-                // This is fine because the FileWriteHandle consitructor will move it
-                // to offset + len anyways.
-                int serVer = readSerializerVersion(fileIO, curFile, absIdx);
+                int serVer = serializerVersion;
+
+                // If we have existing segment, try to read version from it.
+                if (lastReadPtr != null) {
+                    try {
+                        serVer = readSerializerVersion(fileIO);
+                    }
+                    catch (SegmentEofException | EOFException ignore) {
+                        serVer = serializerVersion;
+                    }
+                }
 
                 RecordSerializer ser = forVersion(cctx, serVer);
 
@@ -837,13 +855,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     maxWalSegmentSize,
                     ser);
 
-                if (lastReadPtr == null) {
-                    HeaderRecord header = new HeaderRecord(serializer.version());
-
-                    header.size(serializer.size(header));
-
-                    hnd.addRecord(header);
-                }
+                // For new handle write serializer version to it.
+                if (lastReadPtr == null)
+                    hnd.writeSerializerVersion();
 
                 archiver.currentWalIndex(absIdx);
 
@@ -887,11 +901,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 maxWalSegmentSize,
                 serializer);
 
-            HeaderRecord header = new HeaderRecord(serializer.version());
-
-            header.size(serializer.size(header));
-
-            hnd.addRecord(header);
+            hnd.writeSerializerVersion();
 
             return hnd;
         }
@@ -1010,10 +1020,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /**
+     * @param cctx Shared context.
      * @param ver Serializer version.
      * @return Entry serializer.
      */
-    static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver) throws IgniteCheckedException {
+    public static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver) throws IgniteCheckedException {
         return forVersion(cctx, ver, false);
     }
 
@@ -1027,7 +1038,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
         switch (ver) {
             case 1:
-                return new RecordV1Serializer(cctx, writePointer);
+                return new RecordV1Serializer(new RecordDataV1Serializer(cctx), writePointer);
+
+            case 2:
+                RecordDataV2Serializer dataV2Serializer = new RecordDataV2Serializer(new RecordDataV1Serializer(cctx));
+
+                return new RecordV2Serializer(dataV2Serializer, writePointer);
 
             default:
                 throw new IgniteCheckedException("Failed to create a serializer with the given version " +
@@ -1434,29 +1450,103 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /**
+     * Reads record serializer version from provided {@code io}.
+     * NOTE: Method mutates position of {@code io}.
+     *
      * @param io I/O interface for file.
-     * @param file File object.
-     * @param idx File index to read.
      * @return Serializer version stored in the file.
-     * @throws IOException If failed to read serializer version.
      * @throws IgniteCheckedException If failed to read serializer version.
      */
-    private int readSerializerVersion(FileIO io, File file, long idx)
-        throws IOException, IgniteCheckedException {
-        try (ByteBufferExpander buf = new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder())){
+    public static int readSerializerVersion(FileIO io)
+            throws IgniteCheckedException, IOException {
+        try (ByteBufferExpander buf = new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder())) {
             FileInput in = new FileInput(io, buf);
 
-            // Header record must be agnostic to the serializer version.
-            WALRecord rec = serializer.readRecord(in, new FileWALPointer(idx, 0, 0));
+            in.ensure(RecordV1Serializer.HEADER_RECORD_SIZE);
+
+            int recordType = in.readUnsignedByte();
+
+            if (recordType == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE)
+                throw new SegmentEofException("Reached logical end of the segment", null);
+
+            WALRecord.RecordType type = WALRecord.RecordType.fromOrdinal(recordType - 1);
+
+            if (type != WALRecord.RecordType.HEADER_RECORD)
+                throw new IOException("Can't read serializer version", null);
+
+            // Read file pointer.
+            FileWALPointer ptr = RecordV1Serializer.readPosition(in);
+
+            assert ptr.fileOffset() == 0 : "Header record should be placed at the beginning of file " + ptr;
+
+            long headerMagicNumber = in.readLong();
+
+            if (headerMagicNumber != HeaderRecord.MAGIC)
+                throw new IOException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.MAGIC) +
+                        ", actual=" + U.hexLong(headerMagicNumber) + ']');
+
+            // Read serializer version.
+            int version = in.readInt();
+
+            // Read and skip CRC.
+            in.readInt();
+
+            return version;
+        }
+    }
+
+    /**
+     * Writes record serializer version to provided {@code io}.
+     * NOTE: Method mutates position of {@code io}.
+     *
+     * @param io I/O interface for file.
+     * @param idx Segment index.
+     * @param version Serializer version.
+     * @return I/O position after the version has been written.
+     * @throws IOException If failed to write serializer version.
+     */
+    public static long writeSerializerVersion(FileIO io, long idx, int version) throws IOException {
+        ByteBuffer buffer = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE);
+        buffer.order(ByteOrder.nativeOrder());
+
+        // Write record type.
+        buffer.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1));
+
+        // Write position.
+        RecordV1Serializer.putPosition(buffer, new FileWALPointer(idx, 0, 0));
 
-            if (rec.type() != WALRecord.RecordType.HEADER_RECORD)
-                throw new IOException("Missing file header record: " + file.getAbsoluteFile());
+        // Place magic number.
+        buffer.putLong(HeaderRecord.MAGIC);
 
-            return ((HeaderRecord)rec).version();
+        // Place serializer version.
+        buffer.putInt(version);
+
+        // Place CRC if needed.
+        if (!RecordV1Serializer.SKIP_CRC) {
+            int curPos = buffer.position();
+
+            buffer.position(0);
+
+            // This call will move buffer position to the end of the record again.
+            int crcVal = PureJavaCrc32.calcCrc32(buffer, curPos);
+
+            buffer.putInt(crcVal);
         }
-        catch (SegmentEofException | EOFException ignore) {
-            return serializer.version();
+        else
+            buffer.putInt(0);
+
+        // Write header record through io.
+        buffer.position(0);
+
+        do {
+            io.write(buffer);
         }
+        while (buffer.hasRemaining());
+
+        // Flush
+        io.force();
+
+        return io.position();
     }
 
     /**
@@ -1715,6 +1805,27 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
 
         /**
+         * Write serializer version to current handle.
+         * NOTE: Method mutates {@code fileIO} position and the {@code written}, {@code lastFsyncPos} and {@code head} fields.
+         *
+         * @throws IgniteCheckedException If failed to write serializer version.
+         */
+        public void writeSerializerVersion() throws IgniteCheckedException {
+            try {
+                assert fileIO.position() == 0 : "Serializer version can be written only at the begin of file " + fileIO.position();
+
+                long updatedPosition = FileWriteAheadLogManager.writeSerializerVersion(fileIO, idx, serializer.version());
+
+                written = updatedPosition;
+                lastFsyncPos = updatedPosition;
+                head.set(new FakeRecord(new FileWALPointer(idx, (int)updatedPosition, 0), false));
+            }
+            catch (IOException e) {
+                throw new IgniteCheckedException("Unable to write serializer version for segment " + idx, e);
+            }
+        }
+
+        /**
          * Checks if current head is a close fake record and returns {@code true} if so.
          *
          * @return {@code true} if current head is close record.
@@ -2080,15 +2191,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     assert stopped() : "Segment is not closed after close flush: " + head.get();
 
                     try {
-                        int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE;
+                        int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE + RecordV1Serializer.CRC_SIZE;
+
+                    if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) {
+                        RecordV1Serializer backwardSerializer =
+                            new RecordV1Serializer(new RecordDataV1Serializer(cctx), true);
 
-                        if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) {
-                            //it is expected there is sufficient space for this record because rollover should run early
-                            final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize);
-                            buf.put((byte)(WALRecord.RecordType.SWITCH_SEGMENT_RECORD.ordinal() + 1));
+                        final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize);
 
-                            final FileWALPointer pointer = new FileWALPointer(idx, (int)fileIO.position(), -1);
-                            RecordV1Serializer.putPosition(buf, pointer);
+                        SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();
+                        segmentRecord.position( new FileWALPointer(idx, (int)written, -1));
+                        backwardSerializer.writeRecord(segmentRecord,buf);
 
                             buf.rewind();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java
new file mode 100644
index 0000000..242641d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java
@@ -0,0 +1,41 @@
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+
+/**
+ * Interface to provide size, read and write operations with WAL records
+ * <b>without any headers and meta information</b>.
+ */
+public interface RecordDataSerializer {
+    /**
+     * Calculates size of record data.
+     *
+     * @param record WAL record.
+     * @return Size of record in bytes.
+     * @throws IgniteCheckedException If it's unable to calculate record data size.
+     */
+    int size(WALRecord record) throws IgniteCheckedException;
+
+    /**
+     * Reads record data of {@code type} from buffer {@code in}.
+     *
+     * @param type Record type.
+     * @param in Buffer to read.
+     * @return WAL record.
+     * @throws IOException In case of I/O problems.
+     * @throws IgniteCheckedException If it's unable to read record.
+     */
+    WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException;
+
+    /**
+     * Writes record data to buffer {@code buf}.
+     *
+     * @param record WAL record.
+     * @param buf Buffer to write.
+     * @throws IgniteCheckedException If it's unable to write record.
+     */
+    void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalSegmentTailReachedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalSegmentTailReachedException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalSegmentTailReachedException.java
new file mode 100644
index 0000000..36298dc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalSegmentTailReachedException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Exception thrown when the tail of the WAL segment cyclic buffer is reached
+ * while reading from WAL.
+ */
+public class WalSegmentTailReachedException extends IgniteCheckedException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Creates the exception with the given message {@code msg} and optional {@code cause}.
+     */
+    public WalSegmentTailReachedException(String msg, @Nullable Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index 3a34e28..0fb8adf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -133,7 +133,7 @@ public class IgniteWalIteratorFactory {
      * @param pageSize Page size which was used in Ignite Persistent Data store to read WAL from, size is validated
      * according its boundaries.
      */
-    public IgniteWalIteratorFactory(@NotNull final IgniteLogger log, final int pageSize) {
+    public IgniteWalIteratorFactory(@NotNull final IgniteLogger log, int pageSize) {
         this(log, new PersistentStoreConfiguration().getFileIOFactory(), pageSize);
     }
 
@@ -148,8 +148,7 @@ public class IgniteWalIteratorFactory {
      * @return closable WAL records iterator, should be closed when non needed
      * @throws IgniteCheckedException if failed to read folder
      */
-    public WALIterator iteratorArchiveDirectory(
-        @NotNull final File walDirWithConsistentId) throws IgniteCheckedException {
+    public WALIterator iteratorArchiveDirectory(@NotNull final File walDirWithConsistentId) throws IgniteCheckedException {
         return new StandaloneWalRecordsIterator(walDirWithConsistentId, log, prepareSharedCtx(), ioFactory, keepBinary);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index c92d572..f1258a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -100,14 +100,15 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
      * (BinaryObjects will be used instead)
      */
     StandaloneWalRecordsIterator(
-        @NotNull final File walFilesDir,
-        @NotNull final IgniteLogger log,
-        @NotNull final GridCacheSharedContext sharedCtx,
-        @NotNull final FileIOFactory ioFactory,
-        final boolean keepBinary) throws IgniteCheckedException {
+        @NotNull File walFilesDir,
+        @NotNull IgniteLogger log,
+        @NotNull GridCacheSharedContext sharedCtx,
+        @NotNull FileIOFactory ioFactory,
+        boolean keepBinary
+    ) throws IgniteCheckedException {
         super(log,
             sharedCtx,
-            new RecordV1Serializer(sharedCtx, true),
+            FileWriteAheadLogManager.forVersion(sharedCtx, FileWriteAheadLogManager.LATEST_SERIALIZER_VERSION),
             ioFactory,
             BUF_SIZE);
         this.keepBinary = keepBinary;
@@ -127,15 +128,15 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
      * @param walFiles Wal files.
      */
     StandaloneWalRecordsIterator(
-        @NotNull final IgniteLogger log,
-        @NotNull final GridCacheSharedContext sharedCtx,
-        @NotNull final FileIOFactory ioFactory,
-        final boolean workDir,
-        final boolean keepBinary,
-        @NotNull final File... walFiles) throws IgniteCheckedException {
+            @NotNull IgniteLogger log,
+            @NotNull GridCacheSharedContext sharedCtx,
+            @NotNull FileIOFactory ioFactory,
+            boolean workDir,
+            boolean keepBinary,
+            @NotNull File... walFiles) throws IgniteCheckedException {
         super(log,
             sharedCtx,
-            new RecordV1Serializer(sharedCtx, true),
+            FileWriteAheadLogManager.forVersion(sharedCtx, FileWriteAheadLogManager.LATEST_SERIALIZER_VERSION),
             ioFactory,
             BUF_SIZE);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
new file mode 100644
index 0000000..8b5e6ba
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -0,0 +1,1574 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.wal.record.CacheState;
+import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageSetFreeListPageRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageUpdateRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.FixCountRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.FixLeftmostChildRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.FixRemoveId;
+import org.apache.ignite.internal.pagemem.wal.record.delta.InitNewPageRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.InnerReplaceRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.InsertRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MergeRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageAddRootRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageCutRootRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRootInlineRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRootRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastAllocatedIndex;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuccessfulFullSnapshotId;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuccessfulSnapshotId;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSnapshotId;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.NewRootInitRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PageListMetaResetCountRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListAddPageRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListInitNewPageRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListRemovePageRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListSetNextRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListSetPreviousRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.RecycleRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.RemoveRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.ReplaceRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.SplitExistingPageRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.SplitForwardPageRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.TrackingPageDeltaRecord;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.RecordDataSerializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.CRC_SIZE;
+
+/**
+ * Record data V1 serializer.
+ */
+public class RecordDataV1Serializer implements RecordDataSerializer {
+    /** Length of HEADER record data. */
+    static final int HEADER_RECORD_DATA_SIZE = /*Magic*/8 + /*Version*/4;
+
+    /** Cache shared context */
+    private final GridCacheSharedContext cctx;
+
+    /** Size of page used for PageMemory regions */
+    private final int pageSize;
+
+    /** Cache object processor used for reading {@link DataEntry DataEntries}. */
+    private final IgniteCacheObjectProcessor co;
+
+    /** Serializer of {@link TxRecord} records. */
+    private TxRecordSerializer txRecordSerializer;
+
+    /**
+     * Creates a V1 record data serializer bound to the given shared cache context.
+     * The transaction record serializer, the cache object processor and the page size
+     * are all derived from that context.
+     *
+     * @param cctx Cache shared context.
+     */
+    public RecordDataV1Serializer(GridCacheSharedContext cctx) {
+        this.cctx = cctx;
+
+        txRecordSerializer = new TxRecordSerializer(cctx);
+        co = cctx.kernalContext().cacheObjects();
+        pageSize = cctx.database().pageSize();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size(WALRecord record) throws IgniteCheckedException {
+        switch (record.type()) {
+            case PAGE_RECORD:
+                assert record instanceof PageSnapshot;
+
+                PageSnapshot pageRec = (PageSnapshot)record;
+
+                return pageRec.pageData().length + 12;
+
+            case CHECKPOINT_RECORD:
+                CheckpointRecord cpRec = (CheckpointRecord)record;
+
+                assert cpRec.checkpointMark() == null || cpRec.checkpointMark() instanceof FileWALPointer :
+                        "Invalid WAL record: " + cpRec;
+
+                int cacheStatesSize = cacheStatesSize(cpRec.cacheGroupStates());
+
+                FileWALPointer walPtr = (FileWALPointer)cpRec.checkpointMark();
+
+                return 18 + cacheStatesSize + (walPtr == null ? 0 : 16);
+
+            case META_PAGE_INIT:
+                return /*cache ID*/4 + /*page ID*/8 + /*ioType*/2  + /*ioVer*/2 +  /*tree root*/8 + /*reuse root*/8;
+
+            case PARTITION_META_PAGE_UPDATE_COUNTERS:
+                return /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part size*/4 + /*counters page id*/8 + /*state*/ 1
+                        + /*allocatedIdxCandidate*/ 4;
+
+            case MEMORY_RECOVERY:
+                return 8;
+
+            case PARTITION_DESTROY:
+                return /*cacheId*/4 + /*partId*/4;
+
+            case DATA_RECORD:
+                DataRecord dataRec = (DataRecord)record;
+
+                return 4 + dataSize(dataRec);
+
+            case HEADER_RECORD:
+                return HEADER_RECORD_DATA_SIZE;
+
+            case DATA_PAGE_INSERT_RECORD:
+                DataPageInsertRecord diRec = (DataPageInsertRecord)record;
+
+                return 4 + 8 + 2 + diRec.payload().length;
+
+            case DATA_PAGE_UPDATE_RECORD:
+                DataPageUpdateRecord uRec = (DataPageUpdateRecord)record;
+
+                return 4 + 8 + 2 + 4 +
+                        uRec.payload().length;
+
+            case DATA_PAGE_INSERT_FRAGMENT_RECORD:
+                final DataPageInsertFragmentRecord difRec = (DataPageInsertFragmentRecord)record;
+
+                return 4 + 8 + 8 + 4 + difRec.payloadSize();
+
+            case DATA_PAGE_REMOVE_RECORD:
+                return 4 + 8 + 1;
+
+            case DATA_PAGE_SET_FREE_LIST_PAGE:
+                return 4 + 8 + 8;
+
+            case INIT_NEW_PAGE_RECORD:
+                return 4 + 8 + 2 + 2 + 8;
+
+            case BTREE_META_PAGE_INIT_ROOT:
+                return 4 + 8 + 8;
+
+            case BTREE_META_PAGE_INIT_ROOT2:
+                return 4 + 8 + 8 + 2;
+
+            case BTREE_META_PAGE_ADD_ROOT:
+                return 4 + 8 + 8;
+
+            case BTREE_META_PAGE_CUT_ROOT:
+                return 4 + 8;
+
+            case BTREE_INIT_NEW_ROOT:
+                NewRootInitRecord<?> riRec = (NewRootInitRecord<?>)record;
+
+                return 4 + 8 + 8 + 2 + 2 + 8 + 8 + riRec.io().getItemSize();
+
+            case BTREE_PAGE_RECYCLE:
+                return 4 + 8 + 8;
+
+            case BTREE_PAGE_INSERT:
+                InsertRecord<?> inRec = (InsertRecord<?>)record;
+
+                return 4 + 8 + 2 + 2 + 2 + 8 + inRec.io().getItemSize();
+
+            case BTREE_FIX_LEFTMOST_CHILD:
+                return 4 + 8 + 8;
+
+            case BTREE_FIX_COUNT:
+                return 4 + 8 + 2;
+
+            case BTREE_PAGE_REPLACE:
+                ReplaceRecord<?> rRec = (ReplaceRecord<?>)record;
+
+                return 4 + 8 + 2 + 2 + 2 + rRec.io().getItemSize();
+
+            case BTREE_PAGE_REMOVE:
+                return 4 + 8 + 2 + 2;
+
+            case BTREE_PAGE_INNER_REPLACE:
+                return 4 + 8 + 2 + 8 + 2 + 8;
+
+            case BTREE_FORWARD_PAGE_SPLIT:
+                return 4 + 8 + 8 + 2 + 2 + 8 + 2 + 2;
+
+            case BTREE_EXISTING_PAGE_SPLIT:
+                return 4 + 8 + 2 + 8;
+
+            case BTREE_PAGE_MERGE:
+                return 4 + 8 + 8 + 2 + 8 + 1;
+
+            case BTREE_FIX_REMOVE_ID:
+                return 4 + 8 + 8;
+
+            case PAGES_LIST_SET_NEXT:
+                return 4 + 8 + 8;
+
+            case PAGES_LIST_SET_PREVIOUS:
+                return 4 + 8 + 8;
+
+            case PAGES_LIST_INIT_NEW_PAGE:
+                return 4 + 8 + 4 + 4 + 8 + 8 + 8;
+
+            case PAGES_LIST_ADD_PAGE:
+                return 4 + 8 + 8;
+
+            case PAGES_LIST_REMOVE_PAGE:
+                return 4 + 8 + 8;
+
+            case TRACKING_PAGE_DELTA:
+                return 4 + 8 + 8 + 8 + 8;
+
+            case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID:
+                return 4 + 8 + 8 + 8;
+
+            case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID:
+                return 4 + 8 + 8;
+
+            case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID:
+                return 4 + 8 + 8;
+
+            case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX:
+                return 4 + 8 + 4;
+
+            case PART_META_UPDATE_STATE:
+                return /*cacheId*/ 4 + /*partId*/ 4 + /*State*/1 + /*Update Counter*/ 8;
+
+            case PAGE_LIST_META_RESET_COUNT_RECORD:
+                return /*cacheId*/ 4 + /*pageId*/ 8;
+
+            case SWITCH_SEGMENT_RECORD:
+                // CRC is not loaded for switch segment.
+                return -CRC_SIZE;
+
+            case TX_RECORD:
+                return txRecordSerializer.sizeOfTxRecord((TxRecord)record);
+
+            default:
+                throw new UnsupportedOperationException("Type: " + record.type());
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Reads the data part of a record of the given type from the input and builds the matching
+     * {@link WALRecord}. Fields are consumed in exactly the order in which {@code writeRecord}
+     * puts them into the buffer.
+     *
+     * @param type Type of the record to read.
+     * @param in Input to read the record data from.
+     * @return Record of the requested type.
+     * @throws EOFException If the header magic does not match {@link HeaderRecord#MAGIC} or
+     *      a switch segment record is encountered.
+     * @throws IOException If reading from the input fails.
+     * @throws IgniteCheckedException If failed to read a nested structure (e.g. a data entry).
+     * @throws UnsupportedOperationException If the record type is not known to this serializer.
+     */
+    @Override public WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in)
+        throws IOException, IgniteCheckedException {
+        WALRecord res;
+
+        // NOTE: locals declared in one case (cacheId, pageId, ioType, ...) are reused by the cases below,
+        // so the order of the cases matters for compilation.
+        switch (type) {
+            case PAGE_RECORD:
+                byte[] arr = new byte[pageSize];
+
+                int cacheId = in.readInt();
+                long pageId = in.readLong();
+
+                in.readFully(arr);
+
+                res = new PageSnapshot(new FullPageId(pageId, cacheId), arr);
+
+                break;
+
+            case CHECKPOINT_RECORD:
+                long msb = in.readLong();
+                long lsb = in.readLong();
+                boolean hasPtr = in.readByte() != 0;
+
+                // The segment index is written with putLong() and size() reserves 8 + 4 + 4 bytes for
+                // the pointer, so it must be read back as a long. Reading it as an int left 4 bytes
+                // unconsumed and shifted every field that follows.
+                long idx = hasPtr ? in.readLong() : 0;
+                int offset = hasPtr ? in.readInt() : 0;
+                int len = hasPtr ? in.readInt() : 0;
+
+                Map<Integer, CacheState> states = readPartitionStates(in);
+
+                boolean end = in.readByte() != 0;
+
+                FileWALPointer walPtr = hasPtr ? new FileWALPointer(idx, offset, len) : null;
+
+                CheckpointRecord cpRec = new CheckpointRecord(new UUID(msb, lsb), walPtr, end);
+
+                cpRec.cacheGroupStates(states);
+
+                res = cpRec;
+
+                break;
+
+            case META_PAGE_INIT:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                int ioType = in.readUnsignedShort();
+                int ioVer = in.readUnsignedShort();
+                long treeRoot = in.readLong();
+                long reuseListRoot = in.readLong();
+
+                res = new MetaPageInitRecord(cacheId, pageId, ioType, ioVer, treeRoot, reuseListRoot);
+
+                break;
+
+            case PARTITION_META_PAGE_UPDATE_COUNTERS:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                long updCntr = in.readLong();
+                long rmvId = in.readLong();
+                int partSize = in.readInt();
+                long countersPageId = in.readLong();
+                byte state = in.readByte();
+                int allocatedIdxCandidate = in.readInt();
+
+                res = new MetaPageUpdatePartitionDataRecord(cacheId, pageId, updCntr, rmvId, partSize, countersPageId, state, allocatedIdxCandidate);
+
+                break;
+
+            case MEMORY_RECOVERY:
+                long ts = in.readLong();
+
+                res = new MemoryRecoveryRecord(ts);
+
+                break;
+
+            case PARTITION_DESTROY:
+                cacheId = in.readInt();
+                int partId = in.readInt();
+
+                res = new PartitionDestroyRecord(cacheId, partId);
+
+                break;
+
+            case DATA_RECORD:
+                int entryCnt = in.readInt();
+
+                List<DataEntry> entries = new ArrayList<>(entryCnt);
+
+                for (int i = 0; i < entryCnt; i++)
+                    entries.add(readDataEntry(in));
+
+                res = new DataRecord(entries);
+
+                break;
+
+            case HEADER_RECORD:
+                long magic = in.readLong();
+
+                // A wrong magic is reported as EOF rather than as a dedicated corruption error.
+                if (magic != HeaderRecord.MAGIC)
+                    throw new EOFException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.MAGIC) +
+                            ", actual=" + U.hexLong(magic) + ']');
+
+                int ver = in.readInt();
+
+                res = new HeaderRecord(ver);
+
+                break;
+
+            case DATA_PAGE_INSERT_RECORD: {
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                // Payload length is stored in 2 bytes and interpreted as unsigned.
+                int size = in.readUnsignedShort();
+
+                in.ensure(size);
+
+                byte[] payload = new byte[size];
+
+                in.readFully(payload);
+
+                res = new DataPageInsertRecord(cacheId, pageId, payload);
+
+                break;
+            }
+
+            case DATA_PAGE_UPDATE_RECORD: {
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                int itemId = in.readInt();
+
+                // Payload length is stored in 2 bytes and interpreted as unsigned.
+                int size = in.readUnsignedShort();
+
+                in.ensure(size);
+
+                byte[] payload = new byte[size];
+
+                in.readFully(payload);
+
+                res = new DataPageUpdateRecord(cacheId, pageId, itemId, payload);
+
+                break;
+            }
+
+            case DATA_PAGE_INSERT_FRAGMENT_RECORD: {
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                final long lastLink = in.readLong();
+                final int payloadSize = in.readInt();
+
+                final byte[] payload = new byte[payloadSize];
+
+                in.readFully(payload);
+
+                res = new DataPageInsertFragmentRecord(cacheId, pageId, payload, lastLink);
+
+                break;
+            }
+
+            case DATA_PAGE_REMOVE_RECORD:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                int itemId = in.readUnsignedByte();
+
+                res = new DataPageRemoveRecord(cacheId, pageId, itemId);
+
+                break;
+
+            case DATA_PAGE_SET_FREE_LIST_PAGE:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                long freeListPage = in.readLong();
+
+                res = new DataPageSetFreeListPageRecord(cacheId, pageId, freeListPage);
+
+                break;
+
+            case INIT_NEW_PAGE_RECORD:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                ioType = in.readUnsignedShort();
+                ioVer = in.readUnsignedShort();
+                long virtualPageId = in.readLong();
+
+                res = new InitNewPageRecord(cacheId, pageId, ioType, ioVer, virtualPageId);
+
+                break;
+
+            case BTREE_META_PAGE_INIT_ROOT:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                long rootId = in.readLong();
+
+                res = new MetaPageInitRootRecord(cacheId, pageId, rootId);
+
+                break;
+
+            case BTREE_META_PAGE_INIT_ROOT2:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                long rootId2 = in.readLong();
+                int inlineSize = in.readShort();
+
+                res = new MetaPageInitRootInlineRecord(cacheId, pageId, rootId2, inlineSize);
+
+                break;
+
+            case BTREE_META_PAGE_ADD_ROOT:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                rootId = in.readLong();
+
+                res = new MetaPageAddRootRecord(cacheId, pageId, rootId);
+
+                break;
+
+            case BTREE_META_PAGE_CUT_ROOT:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                res = new MetaPageCutRootRecord(cacheId, pageId);
+
+                break;
+
+            case BTREE_INIT_NEW_ROOT:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                rootId = in.readLong();
+                ioType = in.readUnsignedShort();
+                ioVer = in.readUnsignedShort();
+                long leftId = in.readLong();
+                long rightId = in.readLong();
+
+                // The row length is not stored in the record; it is taken from the IO resolved by type/version.
+                BPlusIO<?> io = BPlusIO.getBPlusIO(ioType, ioVer);
+
+                byte[] rowBytes = new byte[io.getItemSize()];
+
+                in.readFully(rowBytes);
+
+                res = new NewRootInitRecord<>(cacheId, pageId, rootId, (BPlusInnerIO<?>)io, leftId, rowBytes, rightId);
+
+                break;
+
+            case BTREE_PAGE_RECYCLE:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                long newPageId = in.readLong();
+
+                res = new RecycleRecord(cacheId, pageId, newPageId);
+
+                break;
+
+            case BTREE_PAGE_INSERT:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                ioType = in.readUnsignedShort();
+                ioVer = in.readUnsignedShort();
+                int itemIdx = in.readUnsignedShort();
+                rightId = in.readLong();
+
+                io = BPlusIO.getBPlusIO(ioType, ioVer);
+
+                rowBytes = new byte[io.getItemSize()];
+
+                in.readFully(rowBytes);
+
+                res = new InsertRecord<>(cacheId, pageId, io, itemIdx, rowBytes, rightId);
+
+                break;
+
+            case BTREE_FIX_LEFTMOST_CHILD:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                rightId = in.readLong();
+
+                res = new FixLeftmostChildRecord(cacheId, pageId, rightId);
+
+                break;
+
+            case BTREE_FIX_COUNT:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                int cnt = in.readUnsignedShort();
+
+                res = new FixCountRecord(cacheId, pageId, cnt);
+
+                break;
+
+            case BTREE_PAGE_REPLACE:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                ioType = in.readUnsignedShort();
+                ioVer = in.readUnsignedShort();
+                itemIdx = in.readUnsignedShort();
+
+                io = BPlusIO.getBPlusIO(ioType, ioVer);
+
+                rowBytes = new byte[io.getItemSize()];
+
+                in.readFully(rowBytes);
+
+                res = new ReplaceRecord<>(cacheId, pageId, io, rowBytes, itemIdx);
+
+                break;
+
+            case BTREE_PAGE_REMOVE:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                itemIdx = in.readUnsignedShort();
+                cnt = in.readUnsignedShort();
+
+                res = new RemoveRecord(cacheId, pageId, itemIdx, cnt);
+
+                break;
+
+            case BTREE_PAGE_INNER_REPLACE:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                int dstIdx = in.readUnsignedShort();
+                long srcPageId = in.readLong();
+                int srcIdx = in.readUnsignedShort();
+                rmvId = in.readLong();
+
+                res = new InnerReplaceRecord<>(cacheId, pageId, dstIdx, srcPageId, srcIdx, rmvId);
+
+                break;
+
+            case BTREE_FORWARD_PAGE_SPLIT:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                long fwdId = in.readLong();
+                ioType = in.readUnsignedShort();
+                ioVer = in.readUnsignedShort();
+                srcPageId = in.readLong();
+                int mid = in.readUnsignedShort();
+                cnt = in.readUnsignedShort();
+
+                res = new SplitForwardPageRecord(cacheId, pageId, fwdId, ioType, ioVer, srcPageId, mid, cnt);
+
+                break;
+
+            case BTREE_EXISTING_PAGE_SPLIT:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                mid = in.readUnsignedShort();
+                fwdId = in.readLong();
+
+                res = new SplitExistingPageRecord(cacheId, pageId, mid, fwdId);
+
+                break;
+
+            case BTREE_PAGE_MERGE:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                long prntId = in.readLong();
+                int prntIdx = in.readUnsignedShort();
+                rightId = in.readLong();
+                boolean emptyBranch = in.readBoolean();
+
+                res = new MergeRecord<>(cacheId, pageId, prntId, prntIdx, rightId, emptyBranch);
+
+                break;
+
+            case BTREE_FIX_REMOVE_ID:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                rmvId = in.readLong();
+
+                res = new FixRemoveId(cacheId, pageId, rmvId);
+
+                break;
+
+            case PAGES_LIST_SET_NEXT:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+                long nextPageId = in.readLong();
+
+                res = new PagesListSetNextRecord(cacheId, pageId, nextPageId);
+
+                break;
+
+            case PAGES_LIST_SET_PREVIOUS:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+                long prevPageId = in.readLong();
+
+                res = new PagesListSetPreviousRecord(cacheId, pageId, prevPageId);
+
+                break;
+
+            case PAGES_LIST_INIT_NEW_PAGE:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+                // Unlike the other records, IO type and version take 4 bytes each here.
+                ioType = in.readInt();
+                ioVer = in.readInt();
+                newPageId = in.readLong();
+                prevPageId = in.readLong();
+                long addDataPageId = in.readLong();
+
+                res = new PagesListInitNewPageRecord(cacheId, pageId, ioType, ioVer, newPageId, prevPageId, addDataPageId);
+
+                break;
+
+            case PAGES_LIST_ADD_PAGE:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+                long dataPageId = in.readLong();
+
+                res = new PagesListAddPageRecord(cacheId, pageId, dataPageId);
+
+                break;
+
+            case PAGES_LIST_REMOVE_PAGE:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+                long rmvdPageId = in.readLong();
+
+                res = new PagesListRemovePageRecord(cacheId, pageId, rmvdPageId);
+
+                break;
+
+            case TRACKING_PAGE_DELTA:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                long pageIdToMark = in.readLong();
+                long nextSnapshotId0 = in.readLong();
+                long lastSuccessfulSnapshotId0 = in.readLong();
+
+                res = new TrackingPageDeltaRecord(cacheId, pageId, pageIdToMark, nextSnapshotId0, lastSuccessfulSnapshotId0);
+
+                break;
+
+            case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                long nextSnapshotId = in.readLong();
+
+                res = new MetaPageUpdateNextSnapshotId(cacheId, pageId, nextSnapshotId);
+
+                break;
+
+            case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                long lastSuccessfulFullSnapshotId = in.readLong();
+
+                res = new MetaPageUpdateLastSuccessfulFullSnapshotId(cacheId, pageId, lastSuccessfulFullSnapshotId);
+
+                break;
+
+            case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                long lastSuccessfulSnapshotId = in.readLong();
+                long lastSuccessfulSnapshotTag = in.readLong();
+
+                res = new MetaPageUpdateLastSuccessfulSnapshotId(cacheId, pageId, lastSuccessfulSnapshotId, lastSuccessfulSnapshotTag);
+
+                break;
+
+            case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                int lastAllocatedIdx = in.readInt();
+
+                res = new MetaPageUpdateLastAllocatedIndex(cacheId, pageId, lastAllocatedIdx);
+
+                break;
+
+            case PART_META_UPDATE_STATE:
+                cacheId = in.readInt();
+                partId = in.readInt();
+
+                state = in.readByte();
+
+                long updateCounter = in.readLong();
+
+                res = new PartitionMetaStateRecord(cacheId, partId, GridDhtPartitionState.fromOrdinal(state), updateCounter);
+
+                break;
+
+            case PAGE_LIST_META_RESET_COUNT_RECORD:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                res = new PageListMetaResetCountRecord(cacheId, pageId);
+
+                break;
+
+            case SWITCH_SEGMENT_RECORD:
+                // A switch segment record carries no data; it is signalled to the caller as EOF.
+                throw new EOFException("END OF SEGMENT");
+
+            case TX_RECORD:
+                res = txRecordSerializer.readTxRecord(in);
+
+                break;
+
+            default:
+                throw new UnsupportedOperationException("Type: " + type);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException {
+        switch (record.type()) {
+            case PAGE_RECORD:
+                PageSnapshot snap = (PageSnapshot)record;
+
+                buf.putInt(snap.fullPageId().groupId());
+                buf.putLong(snap.fullPageId().pageId());
+                buf.put(snap.pageData());
+
+                break;
+
+            case MEMORY_RECOVERY:
+                MemoryRecoveryRecord memoryRecoveryRecord = (MemoryRecoveryRecord)record;
+
+                buf.putLong(memoryRecoveryRecord.time());
+
+                break;
+
+            case PARTITION_DESTROY:
+                PartitionDestroyRecord partDestroy = (PartitionDestroyRecord)record;
+
+                buf.putInt(partDestroy.groupId());
+                buf.putInt(partDestroy.partitionId());
+
+                break;
+
+            case META_PAGE_INIT:
+                MetaPageInitRecord updRootsRec = (MetaPageInitRecord)record;
+
+                buf.putInt(updRootsRec.groupId());
+                buf.putLong(updRootsRec.pageId());
+
+                buf.putShort((short)updRootsRec.ioType());
+                buf.putShort((short)updRootsRec.ioVersion());
+                buf.putLong(updRootsRec.treeRoot());
+                buf.putLong(updRootsRec.reuseListRoot());
+
+                break;
+
+            case PARTITION_META_PAGE_UPDATE_COUNTERS:
+                MetaPageUpdatePartitionDataRecord partDataRec = (MetaPageUpdatePartitionDataRecord)record;
+
+                buf.putInt(partDataRec.groupId());
+                buf.putLong(partDataRec.pageId());
+
+                buf.putLong(partDataRec.updateCounter());
+                buf.putLong(partDataRec.globalRemoveId());
+                buf.putInt(partDataRec.partitionSize());
+                buf.putLong(partDataRec.countersPageId());
+                buf.put(partDataRec.state());
+                buf.putInt(partDataRec.allocatedIndexCandidate());
+
+                break;
+
+            case CHECKPOINT_RECORD:
+                CheckpointRecord cpRec = (CheckpointRecord)record;
+
+                assert cpRec.checkpointMark() == null || cpRec.checkpointMark() instanceof FileWALPointer :
+                        "Invalid WAL record: " + cpRec;
+
+                FileWALPointer walPtr = (FileWALPointer)cpRec.checkpointMark();
+                UUID cpId = cpRec.checkpointId();
+
+                buf.putLong(cpId.getMostSignificantBits());
+                buf.putLong(cpId.getLeastSignificantBits());
+
+                buf.put(walPtr == null ? (byte)0 : 1);
+
+                if (walPtr != null) {
+                    buf.putLong(walPtr.index());
+                    buf.putInt(walPtr.fileOffset());
+                    buf.putInt(walPtr.length());
+                }
+
+                putCacheStates(buf, cpRec.cacheGroupStates());
+
+                buf.put(cpRec.end() ? (byte)1 : 0);
+
+                break;
+
+            case DATA_RECORD:
+                DataRecord dataRec = (DataRecord)record;
+
+                buf.putInt(dataRec.writeEntries().size());
+
+                for (DataEntry dataEntry : dataRec.writeEntries())
+                    putDataEntry(buf, dataEntry);
+
+                break;
+
+            case HEADER_RECORD:
+                buf.putLong(HeaderRecord.MAGIC);
+
+                buf.putInt(((HeaderRecord)record).version());
+
+                break;
+
+            case DATA_PAGE_INSERT_RECORD:
+                DataPageInsertRecord diRec = (DataPageInsertRecord)record;
+
+                buf.putInt(diRec.groupId());
+                buf.putLong(diRec.pageId());
+
+                buf.putShort((short)diRec.payload().length);
+
+                buf.put(diRec.payload());
+
+                break;
+
+            case DATA_PAGE_UPDATE_RECORD:
+                DataPageUpdateRecord uRec = (DataPageUpdateRecord)record;
+
+                buf.putInt(uRec.groupId());
+                buf.putLong(uRec.pageId());
+                buf.putInt(uRec.itemId());
+
+                buf.putShort((short)uRec.payload().length);
+
+                buf.put(uRec.payload());
+
+                break;
+
+            case DATA_PAGE_INSERT_FRAGMENT_RECORD:
+                final DataPageInsertFragmentRecord difRec = (DataPageInsertFragmentRecord)record;
+
+                buf.putInt(difRec.groupId());
+                buf.putLong(difRec.pageId());
+
+                buf.putLong(difRec.lastLink());
+                buf.putInt(difRec.payloadSize());
+                buf.put(difRec.payload());
+
+                break;
+
+            case DATA_PAGE_REMOVE_RECORD:
+                DataPageRemoveRecord drRec = (DataPageRemoveRecord)record;
+
+                buf.putInt(drRec.groupId());
+                buf.putLong(drRec.pageId());
+
+                buf.put((byte)drRec.itemId());
+
+                break;
+
+            case DATA_PAGE_SET_FREE_LIST_PAGE:
+                DataPageSetFreeListPageRecord freeListRec = (DataPageSetFreeListPageRecord)record;
+
+                buf.putInt(freeListRec.groupId());
+                buf.putLong(freeListRec.pageId());
+
+                buf.putLong(freeListRec.freeListPage());
+
+                break;
+
+            case INIT_NEW_PAGE_RECORD:
+                InitNewPageRecord inpRec = (InitNewPageRecord)record;
+
+                buf.putInt(inpRec.groupId());
+                buf.putLong(inpRec.pageId());
+
+                buf.putShort((short)inpRec.ioType());
+                buf.putShort((short)inpRec.ioVersion());
+                buf.putLong(inpRec.newPageId());
+
+                break;
+
+            case BTREE_META_PAGE_INIT_ROOT:
+                MetaPageInitRootRecord imRec = (MetaPageInitRootRecord)record;
+
+                buf.putInt(imRec.groupId());
+                buf.putLong(imRec.pageId());
+
+                buf.putLong(imRec.rootId());
+
+                break;
+
+            case BTREE_META_PAGE_INIT_ROOT2:
+                MetaPageInitRootInlineRecord imRec2 = (MetaPageInitRootInlineRecord)record;
+
+                buf.putInt(imRec2.groupId());
+                buf.putLong(imRec2.pageId());
+
+                buf.putLong(imRec2.rootId());
+
+                buf.putShort((short)imRec2.inlineSize());
+                break;
+
+            case BTREE_META_PAGE_ADD_ROOT:
+                MetaPageAddRootRecord arRec = (MetaPageAddRootRecord)record;
+
+                buf.putInt(arRec.groupId());
+                buf.putLong(arRec.pageId());
+
+                buf.putLong(arRec.rootId());
+
+                break;
+
+            case BTREE_META_PAGE_CUT_ROOT:
+                MetaPageCutRootRecord crRec = (MetaPageCutRootRecord)record;
+
+                buf.putInt(crRec.groupId());
+                buf.putLong(crRec.pageId());
+
+                break;
+
+            case BTREE_INIT_NEW_ROOT:
+                NewRootInitRecord<?> riRec = (NewRootInitRecord<?>)record;
+
+                buf.putInt(riRec.groupId());
+                buf.putLong(riRec.pageId());
+
+                buf.putLong(riRec.rootId());
+                buf.putShort((short)riRec.io().getType());
+                buf.putShort((short)riRec.io().getVersion());
+                buf.putLong(riRec.leftId());
+                buf.putLong(riRec.rightId());
+
+                putRow(buf, riRec.rowBytes());
+
+                break;
+
+            case BTREE_PAGE_RECYCLE:
+                RecycleRecord recRec = (RecycleRecord)record;
+
+                buf.putInt(recRec.groupId());
+                buf.putLong(recRec.pageId());
+
+                buf.putLong(recRec.newPageId());
+
+                break;
+
+            case BTREE_PAGE_INSERT:
+                InsertRecord<?> inRec = (InsertRecord<?>)record;
+
+                buf.putInt(inRec.groupId());
+                buf.putLong(inRec.pageId());
+
+                buf.putShort((short)inRec.io().getType());
+                buf.putShort((short)inRec.io().getVersion());
+                buf.putShort((short)inRec.index());
+                buf.putLong(inRec.rightId());
+
+                putRow(buf, inRec.rowBytes());
+
+                break;
+
+            case BTREE_FIX_LEFTMOST_CHILD:
+                FixLeftmostChildRecord flRec = (FixLeftmostChildRecord)record;
+
+                buf.putInt(flRec.groupId());
+                buf.putLong(flRec.pageId());
+
+                buf.putLong(flRec.rightId());
+
+                break;
+
+            case BTREE_FIX_COUNT:
+                FixCountRecord fcRec = (FixCountRecord)record;
+
+                buf.putInt(fcRec.groupId());
+                buf.putLong(fcRec.pageId());
+
+                buf.putShort((short)fcRec.count());
+
+                break;
+
+            case BTREE_PAGE_REPLACE:
+                ReplaceRecord<?> rRec = (ReplaceRecord<?>)record;
+
+                buf.putInt(rRec.groupId());
+                buf.putLong(rRec.pageId());
+
+                buf.putShort((short)rRec.io().getType());
+                buf.putShort((short)rRec.io().getVersion());
+                buf.putShort((short)rRec.index());
+
+                putRow(buf, rRec.rowBytes());
+
+                break;
+
+            case BTREE_PAGE_REMOVE:
+                RemoveRecord rmRec = (RemoveRecord)record;
+
+                buf.putInt(rmRec.groupId());
+                buf.putLong(rmRec.pageId());
+
+                buf.putShort((short)rmRec.index());
+                buf.putShort((short)rmRec.count());
+
+                break;
+
+            case BTREE_PAGE_INNER_REPLACE:
+                InnerReplaceRecord<?> irRec = (InnerReplaceRecord<?>)record;
+
+                buf.putInt(irRec.groupId());
+                buf.putLong(irRec.pageId());
+
+                buf.putShort((short)irRec.destinationIndex());
+                buf.putLong(irRec.sourcePageId());
+                buf.putShort((short)irRec.sourceIndex());
+                buf.putLong(irRec.removeId());
+
+                break;
+
+            case BTREE_FORWARD_PAGE_SPLIT:
+                SplitForwardPageRecord sfRec = (SplitForwardPageRecord)record;
+
+                buf.putInt(sfRec.groupId());
+                buf.putLong(sfRec.pageId());
+
+                buf.putLong(sfRec.forwardId());
+                buf.putShort((short)sfRec.ioType());
+                buf.putShort((short)sfRec.ioVersion());
+                buf.putLong(sfRec.sourcePageId());
+                buf.putShort((short)sfRec.middleIndex());
+                buf.putShort((short)sfRec.count());
+
+                break;
+
+            case BTREE_EXISTING_PAGE_SPLIT:
+                SplitExistingPageRecord seRec = (SplitExistingPageRecord)record;
+
+                buf.putInt(seRec.groupId());
+                buf.putLong(seRec.pageId());
+
+                buf.putShort((short)seRec.middleIndex());
+                buf.putLong(seRec.forwardId());
+
+                break;
+
+            case BTREE_PAGE_MERGE:
+                MergeRecord<?> mRec = (MergeRecord<?>)record;
+
+                buf.putInt(mRec.groupId());
+                buf.putLong(mRec.pageId());
+
+                buf.putLong(mRec.parentId());
+                buf.putShort((short)mRec.parentIndex());
+                buf.putLong(mRec.rightId());
+                buf.put((byte)(mRec.isEmptyBranch() ? 1 : 0));
+
+                break;
+
+            case PAGES_LIST_SET_NEXT:
+                PagesListSetNextRecord plNextRec = (PagesListSetNextRecord)record;
+
+                buf.putInt(plNextRec.groupId());
+                buf.putLong(plNextRec.pageId());
+
+                buf.putLong(plNextRec.nextPageId());
+
+                break;
+
+            case PAGES_LIST_SET_PREVIOUS:
+                PagesListSetPreviousRecord plPrevRec = (PagesListSetPreviousRecord)record;
+
+                buf.putInt(plPrevRec.groupId());
+                buf.putLong(plPrevRec.pageId());
+
+                buf.putLong(plPrevRec.previousPageId());
+
+                break;
+
+            case PAGES_LIST_INIT_NEW_PAGE:
+                PagesListInitNewPageRecord plNewRec = (PagesListInitNewPageRecord)record;
+
+                buf.putInt(plNewRec.groupId());
+                buf.putLong(plNewRec.pageId());
+                buf.putInt(plNewRec.ioType());
+                buf.putInt(plNewRec.ioVersion());
+                buf.putLong(plNewRec.newPageId());
+
+                buf.putLong(plNewRec.previousPageId());
+                buf.putLong(plNewRec.dataPageId());
+
+                break;
+
+            case PAGES_LIST_ADD_PAGE:
+                PagesListAddPageRecord plAddRec = (PagesListAddPageRecord)record;
+
+                buf.putInt(plAddRec.groupId());
+                buf.putLong(plAddRec.pageId());
+
+                buf.putLong(plAddRec.dataPageId());
+
+                break;
+
+            case PAGES_LIST_REMOVE_PAGE:
+                PagesListRemovePageRecord plRmvRec = (PagesListRemovePageRecord)record;
+
+                buf.putInt(plRmvRec.groupId());
+                buf.putLong(plRmvRec.pageId());
+
+                buf.putLong(plRmvRec.removedPageId());
+
+                break;
+
+            case BTREE_FIX_REMOVE_ID:
+                FixRemoveId frRec = (FixRemoveId)record;
+
+                buf.putInt(frRec.groupId());
+                buf.putLong(frRec.pageId());
+
+                buf.putLong(frRec.removeId());
+
+                break;
+
+            case TRACKING_PAGE_DELTA:
+                TrackingPageDeltaRecord tpDelta = (TrackingPageDeltaRecord)record;
+
+                buf.putInt(tpDelta.groupId());
+                buf.putLong(tpDelta.pageId());
+
+                buf.putLong(tpDelta.pageIdToMark());
+                buf.putLong(tpDelta.nextSnapshotId());
+                buf.putLong(tpDelta.lastSuccessfulSnapshotId());
+
+                break;
+
+            case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID:
+                MetaPageUpdateNextSnapshotId mpUpdateNextSnapshotId = (MetaPageUpdateNextSnapshotId)record;
+
+                buf.putInt(mpUpdateNextSnapshotId.groupId());
+                buf.putLong(mpUpdateNextSnapshotId.pageId());
+
+                buf.putLong(mpUpdateNextSnapshotId.nextSnapshotId());
+
+                break;
+
+            case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID:
+                MetaPageUpdateLastSuccessfulFullSnapshotId mpUpdateLastSuccFullSnapshotId =
+                        (MetaPageUpdateLastSuccessfulFullSnapshotId)record;
+
+                buf.putInt(mpUpdateLastSuccFullSnapshotId.groupId());
+                buf.putLong(mpUpdateLastSuccFullSnapshotId.pageId());
+
+                buf.putLong(mpUpdateLastSuccFullSnapshotId.lastSuccessfulFullSnapshotId());
+
+                break;
+
+            case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID:
+                MetaPageUpdateLastSuccessfulSnapshotId mpUpdateLastSuccSnapshotId =
+                        (MetaPageUpdateLastSuccessfulSnapshotId)record;
+
+                buf.putInt(mpUpdateLastSuccSnapshotId.groupId());
+                buf.putLong(mpUpdateLastSuccSnapshotId.pageId());
+
+                buf.putLong(mpUpdateLastSuccSnapshotId.lastSuccessfulSnapshotId());
+                buf.putLong(mpUpdateLastSuccSnapshotId.lastSuccessfulSnapshotTag());
+
+                break;
+
+            case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX:
+                MetaPageUpdateLastAllocatedIndex mpUpdateLastAllocatedIdx =
+                        (MetaPageUpdateLastAllocatedIndex) record;
+
+                buf.putInt(mpUpdateLastAllocatedIdx.groupId());
+                buf.putLong(mpUpdateLastAllocatedIdx.pageId());
+
+                buf.putInt(mpUpdateLastAllocatedIdx.lastAllocatedIndex());
+
+                break;
+
+            case PART_META_UPDATE_STATE:
+                PartitionMetaStateRecord partMetaStateRecord = (PartitionMetaStateRecord) record;
+
+                buf.putInt(partMetaStateRecord.groupId());
+
+                buf.putInt(partMetaStateRecord.partitionId());
+
+                buf.put(partMetaStateRecord.state());
+
+                buf.putLong(partMetaStateRecord.updateCounter());
+
+                break;
+
+            case PAGE_LIST_META_RESET_COUNT_RECORD:
+                PageListMetaResetCountRecord pageListMetaResetCntRecord = (PageListMetaResetCountRecord) record;
+
+                buf.putInt(pageListMetaResetCntRecord.groupId());
+                buf.putLong(pageListMetaResetCntRecord.pageId());
+
+                break;
+
+            case TX_RECORD:
+                txRecordSerializer.writeTxRecord((TxRecord)record, buf);
+
+                break;
+
+            case SWITCH_SEGMENT_RECORD:
+                break;
+
+            default:
+                throw new UnsupportedOperationException("Type: " + record.type());
+        }
+    }
+
+    /**
+     * Serializes a single data entry into the given buffer.
+     * <p>
+     * Write order: cache ID (int), key, value (or the int {@code -1} when the value is {@code null}),
+     * operation ordinal (byte), near XID version, write version, partition ID (int),
+     * partition counter (long), expire time (long).
+     *
+     * @param buf Buffer to write to.
+     * @param entry Data entry.
+     * @throws IgniteCheckedException If one of the underlying key/value write calls fails.
+     */
+    private static void putDataEntry(ByteBuffer buf, DataEntry entry) throws IgniteCheckedException {
+        buf.putInt(entry.cacheId());
+
+        boolean keyWritten = entry.key().putValue(buf);
+
+        // A 'false' result from putValue is treated as a fatal inconsistency.
+        if (!keyWritten)
+            throw new AssertionError();
+
+        if (entry.value() != null) {
+            if (!entry.value().putValue(buf))
+                throw new AssertionError();
+        }
+        else
+            buf.putInt(-1); // Negative size: readDataEntry() treats it as "no value".
+
+        buf.put((byte)entry.op().ordinal());
+
+        // Near XID version is written with null allowed, write version is not.
+        putVersion(buf, entry.nearXidVersion(), true);
+        putVersion(buf, entry.writeVersion(), false);
+
+        buf.putInt(entry.partitionId());
+        buf.putLong(entry.partitionCounter());
+        buf.putLong(entry.expireTime());
+    }
+
+    /**
+     * Writes cache states to the buffer: a 2-byte number of map entries, then for every entry its integer key,
+     * a 2-byte number of partitions and, per partition, a 2-byte partition ID followed by
+     * an 8-byte size and an 8-byte counter.
+     *
+     * @param buf Buffer to write to.
+     * @param states Cache states.
+     */
+    private static void putCacheStates(ByteBuffer buf, Map<Integer, CacheState> states) {
+        buf.putShort((short)states.size());
+
+        for (Map.Entry<Integer, CacheState> e : states.entrySet()) {
+            buf.putInt(e.getKey());
+
+            CacheState cacheState = e.getValue();
+
+            int partsCnt = cacheState.size();
+
+            // The number of partitions takes 2 bytes.
+            buf.putShort((short)partsCnt);
+
+            for (int idx = 0; idx < partsCnt; idx++) {
+                buf.putShort((short)cacheState.partitionByIndex(idx));
+
+                buf.putLong(cacheState.partitionSizeByIndex(idx));
+                buf.putLong(cacheState.partitionCounterByIndex(idx));
+            }
+        }
+    }
+
+    /**
+     * Writes a cache version to the buffer by delegating to {@link CacheVersionIO#write}.
+     *
+     * @param buf Buffer.
+     * @param ver Version to write.
+     * @param allowNull Is {@code null} version allowed.
+     */
+    private static void putVersion(ByteBuffer buf, GridCacheVersion ver, boolean allowNull) {
+        CacheVersionIO.write(buf, ver, allowNull);
+    }
+
+    /**
+     * Writes raw row bytes to the buffer as-is, without a length prefix.
+     *
+     * @param buf Buffer.
+     * @param rowBytes Row bytes, expected to be non-empty (checked by an assertion only).
+     */
+    private static void putRow(ByteBuffer buf, byte[] rowBytes) {
+        assert rowBytes.length > 0;
+
+        buf.put(rowBytes);
+    }
+
+    /**
+     * Reads a single data entry in the layout produced by {@code putDataEntry}.
+     * <p>
+     * If no cache context is found for the read cache ID, the key and value are left as raw bytes
+     * and a {@link LazyDataEntry} is returned instead of a {@link DataEntry} with unmarshalled objects.
+     *
+     * @param in Input to read from.
+     * @return Read entry.
+     * @throws IOException If reading from the input fails.
+     * @throws IgniteCheckedException If the key or value cannot be converted to a cache object.
+     */
+    private DataEntry readDataEntry(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException {
+        int cacheId = in.readInt();
+
+        // Key: size, type, payload.
+        int keyLen = in.readInt();
+        byte keyType = in.readByte();
+        byte[] keyBytes = new byte[keyLen];
+
+        in.readFully(keyBytes);
+
+        // Value: a negative size means that the entry has no value.
+        int valLen = in.readInt();
+
+        byte valType = 0;
+        byte[] valBytes = null;
+
+        if (valLen >= 0) {
+            valType = in.readByte();
+            valBytes = new byte[valLen];
+
+            in.readFully(valBytes);
+        }
+
+        // Operation is stored as a single unsigned byte holding the ordinal.
+        GridCacheOperation op = GridCacheOperation.fromOrdinal(in.readByte() & 0xFF);
+
+        GridCacheVersion nearXidVer = readVersion(in, true);
+        GridCacheVersion writeVer = readVersion(in, false);
+
+        int partId = in.readInt();
+        long partCntr = in.readLong();
+        long expireTime = in.readLong();
+
+        GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+
+        // No cache context available: keep raw bytes and defer unmarshalling.
+        if (cacheCtx == null) {
+            return new LazyDataEntry(
+                cctx,
+                cacheId,
+                keyType,
+                keyBytes,
+                valType,
+                valBytes,
+                op,
+                nearXidVer,
+                writeVer,
+                expireTime,
+                partId,
+                partCntr);
+        }
+
+        CacheObjectContext coCtx = cacheCtx.cacheObjectContext();
+
+        KeyCacheObject key = co.toKeyCacheObject(coCtx, keyType, keyBytes);
+
+        CacheObject val = null;
+
+        if (valBytes != null)
+            val = co.toCacheObject(coCtx, valType, valBytes);
+
+        return new DataEntry(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCntr);
+    }
+
+    /**
+     * Reads the cache states map: an unsigned 2-byte number of entries, then for every entry an integer key,
+     * an unsigned 2-byte number of partitions and, per partition, an unsigned 2-byte partition ID
+     * followed by an 8-byte size and an 8-byte counter.
+     *
+     * @param buf Buffer to read from.
+     * @return Read map, an empty map if the stored number of entries is zero.
+     * @throws IOException If reading from the input fails.
+     */
+    private Map<Integer, CacheState> readPartitionStates(DataInput buf) throws IOException {
+        int cachesCnt = buf.readShort() & 0xFFFF;
+
+        if (cachesCnt == 0)
+            return Collections.emptyMap();
+
+        Map<Integer, CacheState> res = new HashMap<>(cachesCnt, 1.0f);
+
+        for (int cacheIdx = 0; cacheIdx < cachesCnt; cacheIdx++) {
+            int cacheId = buf.readInt();
+
+            int partsCnt = buf.readShort() & 0xFFFF;
+
+            CacheState cacheState = new CacheState(partsCnt);
+
+            for (int partIdx = 0; partIdx < partsCnt; partIdx++) {
+                // Read order matters: partition ID, then size, then counter.
+                int partId = buf.readShort() & 0xFFFF;
+                long partSize = buf.readLong();
+                long partCntr = buf.readLong();
+
+                cacheState.addPartitionState(partId, partSize, partCntr);
+            }
+
+            res.put(cacheId, cacheState);
+        }
+
+        return res;
+    }
+
+    /**
+     * Reads a cache version from the input.
+     * Changes the buffer position by the number of read bytes.
+     *
+     * @param in Data input to read from.
+     * @param allowNull Is {@code null} version allowed.
+     * @return Read cache version.
+     * @throws IOException If reading fails; an {@link IgniteCheckedException} raised by
+     *      {@link CacheVersionIO} is rethrown wrapped into an {@code IOException}.
+     */
+    private GridCacheVersion readVersion(ByteBufferBackedDataInput in, boolean allowNull) throws IOException {
+        // To be able to read serialization protocol version.
+        in.ensure(1);
+
+        try {
+            // Determine the full serialized size first (presumably from the leading byte ensured above - confirm).
+            int size = CacheVersionIO.readSize(in.buffer(), allowNull);
+
+            // Request the whole version before actually reading it.
+            in.ensure(size);
+
+            return CacheVersionIO.read(in.buffer(), allowNull);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Sums up the serialized sizes of all write entries of the given data record.
+     *
+     * @param dataRec Data record to serialize.
+     * @return Total size of all entries of the record as computed by {@code entrySize}.
+     * @throws IgniteCheckedException If failed to obtain the length of one of the entries.
+     */
+    private int dataSize(DataRecord dataRec) throws IgniteCheckedException {
+        int total = 0;
+
+        for (DataEntry e : dataRec.writeEntries()) {
+            int entrySz = entrySize(e);
+
+            total += entrySz;
+        }
+
+        return total;
+    }
+
+    /**
+     * Calculates the number of bytes required to serialize the given entry.
+     *
+     * @param entry Entry to get size for.
+     * @return Entry size.
+     * @throws IgniteCheckedException If failed to get key or value bytes length.
+     */
+    private int entrySize(DataEntry entry) throws IgniteCheckedException {
+        // NOTE(review): readDataEntry() handles a null cache context, here it would end up with NPE - confirm
+        // that a context always exists on the write path.
+        CacheObjectContext coCtx = cctx.cacheContext(entry.cacheId()).cacheObjectContext();
+
+        int keyLen = entry.key().valueBytesLength(coCtx);
+
+        // A null value is encoded as a single int marker.
+        int valLen = entry.value() != null ? entry.value().valueBytesLength(coCtx) : 4;
+
+        int nearXidVerLen = CacheVersionIO.size(entry.nearXidVersion(), true);
+        int writeVerLen = CacheVersionIO.size(entry.writeVersion(), false);
+
+        // Fixed-size fields: cache ID, operation, partition ID, partition counter, expire time.
+        int fixedLen = 4 + 1 + 4 + 8 + 8;
+
+        return fixedLen + keyLen + valLen + nearXidVerLen + writeVerLen;
+    }
+
+    /**
+     * Calculates the number of bytes required to write the given states: 2 bytes for the number of caches
+     * plus, for every cache, 4 bytes of cache ID, 2 bytes for the number of partitions and
+     * 18 bytes per partition.
+     *
+     * @param states Partition states.
+     * @return Size required to write partition states.
+     */
+    private int cacheStatesSize(Map<Integer, CacheState> states) {
+        // Need 2 bytes for the number of caches.
+        int total = 2;
+
+        for (CacheState cacheState : states.values()) {
+            // Cache ID (4 bytes) and the number of partitions (2 bytes).
+            total += 4 + 2;
+
+            // Per partition: 2 bytes partition ID, 8 bytes size and 8 bytes counter.
+            total += 18 * cacheState.size();
+        }
+
+        return total;
+    }
+
+}