Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/26 07:33:39 UTC

[06/12] ignite git commit: ignite-6339 WAL write operations are optimized and file IO operations are non-interruptible from user thread now

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBuffer.java
new file mode 100644
index 0000000..a0209f9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBuffer.java
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.MappedByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import sun.nio.ch.DirectBuffer;
+
+import static java.nio.ByteBuffer.allocate;
+import static java.nio.ByteBuffer.allocateDirect;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.MAPPED;
+
+/**
+ * Segmented ring byte buffer that represents a multi-producer/single-consumer queue and can be used by multiple
+ * writer threads and one reader thread.
+ */
+public class SegmentedRingByteBuffer {
+    /** Open mask. */
+    private static final long OPEN_MASK = 0x7FFFFFFFFFFFFFFFL;
+
+    /** Close mask. */
+    private static final long CLOSE_MASK = 0x8000000000000000L;
+
+    /** Tail field atomic updater. */
+    private static final AtomicLongFieldUpdater<SegmentedRingByteBuffer> TAIL_UPD =
+        AtomicLongFieldUpdater.newUpdater(SegmentedRingByteBuffer.class, "tail");
+
+    /** Producers count field atomic updater. */
+    private static final AtomicIntegerFieldUpdater<SegmentedRingByteBuffer> PRODUCERS_CNT_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(SegmentedRingByteBuffer.class, "producersCnt");
+
+    /** Capacity. */
+    private final int cap;
+
+    /** Buffer mode. */
+    private final BufferMode mode;
+
+    /** Buffer. */
+    public final ByteBuffer buf;
+
+    /** Max segment size. */
+    private final long maxSegmentSize;
+
+    /** Head. */
+    private volatile long head;
+
+    /** Tail. */
+    private volatile long tail;
+
+    /**
+     * Producers count. Used by the consumer in order to wait until all producers finish writing data.
+     */
+    private volatile int producersCnt;
+
+    /**
+     * Wait-for-consumer flag. Prevents producers from writing data to the ring buffer while the consumer is
+     * waiting for all producers that are already writing to finish.
+     */
+    private volatile boolean waitForConsumer;
+
+    /** Metrics. */
+    private final DataStorageMetricsImpl metrics;
+
+    /**
+     * Creates a ring buffer with the given capacity.
+     *
+     * @param cap Buffer's capacity.
+     * @param maxSegmentSize Max segment size.
+     * @param mode Buffer mode.
+     */
+    public SegmentedRingByteBuffer(int cap, long maxSegmentSize, BufferMode mode) {
+        this(cap, maxSegmentSize, mode == DIRECT ? allocateDirect(cap) : allocate(cap), mode, null);
+    }
+
+    /**
+     * Creates a ring buffer with the given capacity.
+     *
+     * @param cap Buffer's capacity.
+     * @param maxSegmentSize Max segment size.
+     * @param mode Buffer mode.
+     * @param metrics Metrics.
+     */
+    public SegmentedRingByteBuffer(int cap, long maxSegmentSize, BufferMode mode, DataStorageMetricsImpl metrics) {
+        this(cap, maxSegmentSize, mode == DIRECT ? allocateDirect(cap) : allocate(cap), mode, metrics);
+    }
+
+    /**
+     * Creates a ring buffer that is mapped to a file; the capacity is taken from the given buffer.
+     *
+     * @param buf {@link MappedByteBuffer} instance.
+     * @param metrics Metrics.
+     */
+    public SegmentedRingByteBuffer(MappedByteBuffer buf, DataStorageMetricsImpl metrics) {
+        this(buf.capacity(), buf.capacity(), buf, MAPPED, metrics);
+    }
+
+    /**
+     * @param cap Capacity.
+     * @param maxSegmentSize Max segment size.
+     * @param buf Buffer.
+     * @param mode Mode.
+     * @param metrics Metrics.
+     */
+    private SegmentedRingByteBuffer(
+        int cap,
+        long maxSegmentSize,
+        ByteBuffer buf,
+        BufferMode mode,
+        DataStorageMetricsImpl metrics
+    ) {
+        this.cap = cap;
+        this.mode = mode;
+        this.buf = buf;
+        this.buf.order(ByteOrder.nativeOrder());
+        this.maxSegmentSize = maxSegmentSize;
+        this.metrics = metrics;
+    }
+
+    /**
+     * Performs initialization of ring buffer state.
+     *
+     * @param pos Position.
+     */
+    public void init(long pos) {
+        head = pos;
+        tail = pos;
+    }
+
+    /**
+     * Returns buffer mode.
+     *
+     * @return Buffer mode.
+     */
+    public BufferMode mode() {
+        return mode;
+    }
+
+    /**
+     * Returns actual buffer tail.
+     *
+     * @return Buffer tail.
+     */
+    public long tail() {
+        return tail & SegmentedRingByteBuffer.OPEN_MASK;
+    }
+
+    /**
+     * Reserves {@code size} bytes in the {@code SegmentedRingByteBuffer} and returns a {@link WriteSegment}
+     * instance that points to a wrapped {@link ByteBuffer} with the corresponding capacity. This {@link ByteBuffer}
+     * instance should be used only for writing data to the {@link SegmentedRingByteBuffer}.
+     * <p>
+     * The result can be {@code null} if the requested number of bytes is greater than the available space
+     * in the {@code SegmentedRingByteBuffer}. {@link WriteSegment#buffer()} can also return {@code null} if the
+     * {@link #maxSegmentSize} value is exceeded; in this case the buffer will be closed in order to prevent
+     * concurrent threads from trying to reserve a new segment.
+     * <p>
+     * This method can be invoked by many producer threads, and each producer gets its own {@link ByteBuffer}
+     * instance mapped to its own {@link SegmentedRingByteBuffer} slice.
+     * <p>
+     * Once the data has been written into the {@link ByteBuffer}, client code must notify the
+     * {@code SegmentedRingByteBuffer} instance via the {@link WriteSegment#release()} method so that the
+     * consumer can read the data.
+     *
+     * @param size Number of bytes to reserve.
+     * @return {@link WriteSegment} instance that points to a {@link ByteBuffer} of the given {@code size}, or
+     * {@code null} if there is not enough buffer space.
+     */
+    public WriteSegment offer(int size) {
+        return offer0(size, false);
+    }
+
+    /**
+     * Behaves like {@link #offer(int)} but in a safe manner: it assumes that there are no concurrent threads
+     * and that the buffer is in the closed state.
+     *
+     * @param size Number of bytes to reserve.
+     * @return {@link WriteSegment} instance that points to a {@link ByteBuffer} of the given {@code size}, or
+     * {@code null} if there is not enough buffer space.
+     */
+    public WriteSegment offerSafe(int size) {
+        return offer0(size, true);
+    }
+
+    /**
+     * @param size Number of bytes to reserve.
+     * @param safe Safe mode flag.
+     */
+    private WriteSegment offer0(int size, boolean safe) {
+        if (size > cap)
+            throw new IllegalArgumentException("Record is too long [capacity=" + cap + ", size=" + size + ']');
+
+        for (;;) {
+            if (!waitForConsumer) {
+                int cur = producersCnt;
+
+                if (cur >= 0 && PRODUCERS_CNT_UPD.compareAndSet(this, cur, cur + 1))
+                    break;
+            }
+        }
+
+        for (;;) {
+            long currTail = tail;
+
+            assert !safe || currTail < 0 : "Unsafe usage of segment ring byte buffer currTail=" + currTail;
+
+            if (currTail < 0) {
+                if (safe)
+                    currTail &= SegmentedRingByteBuffer.OPEN_MASK;
+                else
+                    return new WriteSegment(null, -1);
+            }
+
+            long head0 = head;
+
+            long currTailIdx = toIndex(currTail);
+
+            boolean fitsSeg = currTail + size <= maxSegmentSize;
+
+            long newTail = fitsSeg ? currTail + size : currTail;
+
+            if (head0 < newTail - cap) { // Not enough space.
+                PRODUCERS_CNT_UPD.decrementAndGet(this);
+
+                return null;
+            }
+            else {
+                // If safe we should keep buffer closed.
+                long tail0 = fitsSeg ? (safe ? newTail | CLOSE_MASK : newTail) : newTail | CLOSE_MASK;
+
+                boolean upd = TAIL_UPD.compareAndSet(this, safe ? currTail | CLOSE_MASK : currTail, tail0);
+
+                assert !safe || upd : "Unsafe usage of segment ring byte buffer";
+
+                if (upd) {
+                    if (!fitsSeg)
+                        return new WriteSegment(null, -1);
+
+                    boolean wrap = cap - currTailIdx < size;
+
+                    if (wrap) {
+                        long newTailIdx = toIndex(newTail);
+
+                        return new WriteSegment(currTail, newTail, newTailIdx == 0 ? newTail : currTail);
+                    }
+                    else {
+                        ByteBuffer slice = slice((int)toIndex(newTail - size), size, false);
+
+                        return new WriteSegment(slice, newTail);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Closes the buffer.
+     */
+    public void close() {
+        for (;;) {
+            long currTail = tail;
+
+            if (currTail < 0)
+                return;
+
+            if(TAIL_UPD.compareAndSet(this, currTail, currTail | CLOSE_MASK))
+                return;
+        }
+    }
+
+    /**
+     * Retrieves a list of {@link ReadSegment} instances that point to {@link ByteBuffer}s containing all data
+     * available for reading from the {@link SegmentedRingByteBuffer}, or {@code null} if no data is available.
+     * <p>
+     * This method can be invoked by only one consumer thread.
+     * <p>
+     * Once the data has been read from the returned {@link ReadSegment}, client code must notify the
+     * {@link SegmentedRingByteBuffer} instance via the {@link ReadSegment#release()} method in order to release
+     * the occupied space in the {@link SegmentedRingByteBuffer} and make it available for writing.
+     *
+     * @return List of {@code ReadSegment} instances with all data available for reading, or {@code null} if
+     * no data is available.
+     */
+    public List<ReadSegment> poll() {
+        return poll(-1);
+    }
+
+    /**
+     * Retrieves a list of {@link ReadSegment} instances that point to {@link ByteBuffer}s containing the data
+     * available for reading from the {@link SegmentedRingByteBuffer}, limited by the {@code pos} parameter,
+     * or {@code null} if no data is available.
+     * <p>
+     * This method can be invoked by only one consumer thread.
+     * <p>
+     * Once the data has been read from the returned {@link ReadSegment}, client code must notify the
+     * {@link SegmentedRingByteBuffer} instance via the {@link ReadSegment#release()} method in order to release
+     * the occupied space in the {@link SegmentedRingByteBuffer} and make it available for writing.
+     *
+     * @param pos End position in the buffer.
+     * @return List of {@code ReadSegment} instances with all data available for reading, or {@code null} if
+     * no data is available.
+     */
+    public List<ReadSegment> poll(long pos) {
+        waitForConsumer = true;
+
+        int spins = 0;
+
+        for (;;) {
+            if (PRODUCERS_CNT_UPD.compareAndSet(this, 0, -1))
+                break;
+
+            spins++;
+        }
+
+        if (metrics != null && metrics.metricsEnabled())
+            metrics.onBuffPollSpin(spins);
+
+        long head = this.head;
+
+        long tail = this.tail & OPEN_MASK;
+
+        producersCnt = 0;
+
+        waitForConsumer = false;
+
+        // There is no data to read, or all data up to the given position has already been read.
+        if (tail <= head || (pos >=0 && head > pos))
+            return null;
+
+        int headIdx = (int)toIndex(head);
+
+        int tailIdx = (int)toIndex(tail);
+
+        boolean wrapped = tailIdx <= headIdx;
+
+        if (wrapped && tailIdx != 0) {
+            List<ReadSegment> lst = new ArrayList<>(2);
+
+            int lim = cap - headIdx;
+
+            lst.add(new ReadSegment(slice(headIdx, lim, true), head, head + lim));
+
+            lst.add(new ReadSegment(slice(0, tailIdx, true), head + lim, tail));
+
+            return lst;
+        }
+        else
+            return Collections.singletonList(new ReadSegment(slice(headIdx, (int)(tail - head), true), head, tail));
+    }
+
+    /**
+     * Frees allocated memory in case of direct byte buffer.
+     */
+    public void free() {
+        if (mode == DIRECT || mode == MAPPED)
+            ((DirectBuffer)buf).cleaner().clean();
+    }
+
+    /**
+     * Resets the state of the buffer and returns a new instance backed by the same underlying buffer.
+     */
+    public SegmentedRingByteBuffer reset() {
+        return new SegmentedRingByteBuffer(buf.capacity(), maxSegmentSize, buf, mode, metrics);
+    }
+
+    /**
+     * @param off Offset.
+     * @param len Length.
+     * @param readOnly Read only.
+     */
+    private ByteBuffer slice(int off, int len, boolean readOnly) {
+        ByteBuffer bb = readOnly ? buf.asReadOnlyBuffer() : buf.duplicate();
+
+        bb.order(ByteOrder.nativeOrder());
+        bb.limit(off + len);
+        bb.position(off);
+
+        return bb;
+    }
+
+    /**
+     * @param globalIdx Global index of ring buffer.
+     * @return Index of byte array.
+     */
+    private long toIndex(long globalIdx) {
+        return globalIdx % cap;
+    }
+
+    /**
+     * @param src Source.
+     * @param srcPos Source pos.
+     * @param dest Destination.
+     * @param destPos Destination pos.
+     * @param len Length.
+     */
+    private void copy(ByteBuffer src, int srcPos, ByteBuffer dest, int destPos, int len) {
+        assert mode != MAPPED;
+
+        if (buf.isDirect()) {
+            ByteBuffer src0 = src.duplicate();
+            src0.limit(srcPos + len);
+            src0.position(srcPos);
+
+            ByteBuffer dest0 = dest.duplicate();
+            dest0.limit(destPos + len);
+            dest0.position(destPos);
+
+            dest0.put(src0);
+        }
+        else
+            System.arraycopy(src.array(), srcPos, buf.array(), destPos, len);
+    }
+
+    /**
+     *
+     */
+    private abstract class Segment {
+        /** Buffer. */
+        protected final ByteBuffer seg;
+
+        /** Pos. */
+        protected final long pos;
+
+        /**
+         * @param seg Seg.
+         * @param pos Pos.
+         */
+        protected Segment(ByteBuffer seg, long pos) {
+            this.seg = seg;
+            this.pos = pos;
+        }
+
+        /**
+         * Releases segment.
+         */
+        abstract public void release();
+
+        /**
+         * Returns byte buffer.
+         *
+         * @return Byte buffer.
+         */
+        abstract public ByteBuffer buffer();
+
+        /**
+         * Returns position.
+         *
+         * @return Position.
+         */
+        public long position() {
+            return pos;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Segment.class, this);
+        }
+    }
+
+    /**
+     * Segment available for data writing.
+     */
+    public class WriteSegment extends Segment {
+        /** Current tail. */
+        private final long currTail;
+
+        /** Wrap point. */
+        private final long wrapPnt;
+
+        /**
+         * @param currTail Current tail.
+         * @param newTail New tail.
+         * @param wrapPnt Wrap point.
+         */
+        private WriteSegment(long currTail, long newTail, long wrapPnt) {
+            super(allocate((int)(newTail - currTail)), newTail);
+
+            this.seg.order(ByteOrder.nativeOrder());
+            this.currTail = currTail;
+            this.wrapPnt = wrapPnt;
+        }
+
+        /**
+         * @param seg Seg.
+         * @param pos Pos.
+         */
+        private WriteSegment(ByteBuffer seg, long pos) {
+            super(seg, pos);
+
+            this.currTail = -1;
+            this.wrapPnt = -1;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ByteBuffer buffer() {
+            return seg;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void release() {
+            if (wrapPnt > -1) {
+                int pos = (int)toIndex(currTail);
+
+                int len = cap - pos;
+
+                copy(seg, 0, buf, pos, len);
+
+                copy(seg, len, buf, 0, seg.array().length - len);
+            }
+
+            assert producersCnt >= 0;
+
+            PRODUCERS_CNT_UPD.decrementAndGet(SegmentedRingByteBuffer.this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(WriteSegment.class, this, "super", super.toString());
+        }
+    }
+
+    /**
+     * Segment available for data reading.
+     */
+    public class ReadSegment extends Segment {
+        /** New head. */
+        private final long newHead;
+
+        /**
+         * @param seg Seg.
+         * @param pos Pos.
+         * @param newHead New head.
+         */
+        private ReadSegment(ByteBuffer seg, long pos, long newHead) {
+            super(seg, pos);
+
+            this.newHead = newHead;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void release() {
+            if (newHead >= 0)
+                head = newHead;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ByteBuffer buffer() {
+            return seg;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ReadSegment.class, this, "super", super.toString());
+        }
+    }
+
+    /**
+     * Buffer mode.
+     */
+    public enum BufferMode {
+        /** Byte buffer on-heap. */
+        ONHEAP,
+
+        /** Direct byte buffer (off-heap). */
+        DIRECT,
+
+        /** Byte buffer mapped to file. */
+        MAPPED
+    }
+}

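For readers following this commit, a minimal usage sketch of the new SegmentedRingByteBuffer, based on its
javadoc above. This is an illustration only: the capacity, segment size and payload values are arbitrary,
imports (java.nio.ByteBuffer, java.util.List) are omitted, and the class is meant to be drained by a single
reader thread as the class javadoc states.

    // Producer side (any number of threads): reserve space, write, release.
    SegmentedRingByteBuffer rbuf =
        new SegmentedRingByteBuffer(1024, 1024, SegmentedRingByteBuffer.BufferMode.ONHEAP);

    rbuf.init(0); // Initialize head/tail position.

    byte[] payload = {1, 2, 3, 4};

    SegmentedRingByteBuffer.WriteSegment wseg = rbuf.offer(payload.length);

    if (wseg != null && wseg.buffer() != null) {
        wseg.buffer().put(payload);

        wseg.release(); // Makes the written bytes visible to the single consumer.
    }

    // Consumer side (exactly one thread): poll, read, release each segment.
    List<SegmentedRingByteBuffer.ReadSegment> segs = rbuf.poll();

    if (segs != null) {
        for (SegmentedRingByteBuffer.ReadSegment rseg : segs) {
            ByteBuffer data = rseg.buffer(); // Read-only slice over the available bytes.

            // ... consume data ...

            rseg.release(); // Advances the head and frees the space for producers.
        }
    }
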
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java
index 35c94a8..c26a0c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java
@@ -25,10 +25,10 @@ import org.apache.ignite.internal.util.typedef.internal.S;
  */
 public class HeaderRecord extends WALRecord {
     /** Magic of regular WAL segment. */
-    public static final long REGULAR_MAGIC = 0xB0D045A_CE7ED045AL;
+    public static final long REGULAR_MAGIC = 0xB0D045AC_E7ED045AL;
 
     /** Magic of WAL segment with skipped physical records. */
-    public static final long COMPACTED_MAGIC = 0x4E07AE0_E573A694EL;
+    public static final long COMPACTED_MAGIC = 0x4E07AE0E_573A694EL;
 
     /** Serializer version */
     private final int ver;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index d478917..12992a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -92,8 +92,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
-import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.CRC_SIZE;
-
 /**
  * Record data V1 serializer.
  */
@@ -314,14 +312,14 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 long lsb = in.readLong();
                 boolean hasPtr = in.readByte() != 0;
                 int idx = hasPtr ? in.readInt() : 0;
-                int offset = hasPtr ? in.readInt() : 0;
+                int off = hasPtr ? in.readInt() : 0;
                 int len = hasPtr ? in.readInt() : 0;
 
                 Map<Integer, CacheState> states = readPartitionStates(in);
 
                 boolean end = in.readByte() != 0;
 
-                FileWALPointer walPtr = hasPtr ? new FileWALPointer(idx, offset, len) : null;
+                FileWALPointer walPtr = hasPtr ? new FileWALPointer(idx, off, len) : null;
 
                 CheckpointRecord cpRec = new CheckpointRecord(new UUID(msb, lsb), walPtr, end);
 
@@ -789,9 +787,9 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
 
                 state = in.readByte();
 
-                long updateCounter = in.readLong();
+                long updateCntr = in.readLong();
 
-                res = new PartitionMetaStateRecord(cacheId, partId, GridDhtPartitionState.fromOrdinal(state), updateCounter);
+                res = new PartitionMetaStateRecord(cacheId, partId, GridDhtPartitionState.fromOrdinal(state), updateCntr);
 
                 break;
 
@@ -818,10 +816,10 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
     }
 
     /** {@inheritDoc} */
-    @Override public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException {
-        switch (record.type()) {
+    @Override public void writeRecord(WALRecord rec, ByteBuffer buf) throws IgniteCheckedException {
+        switch (rec.type()) {
             case PAGE_RECORD:
-                PageSnapshot snap = (PageSnapshot)record;
+                PageSnapshot snap = (PageSnapshot)rec;
 
                 buf.putInt(snap.fullPageId().groupId());
                 buf.putLong(snap.fullPageId().pageId());
@@ -830,14 +828,14 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case MEMORY_RECOVERY:
-                MemoryRecoveryRecord memoryRecoveryRecord = (MemoryRecoveryRecord)record;
+                MemoryRecoveryRecord memoryRecoveryRecord = (MemoryRecoveryRecord)rec;
 
                 buf.putLong(memoryRecoveryRecord.time());
 
                 break;
 
             case PARTITION_DESTROY:
-                PartitionDestroyRecord partDestroy = (PartitionDestroyRecord)record;
+                PartitionDestroyRecord partDestroy = (PartitionDestroyRecord)rec;
 
                 buf.putInt(partDestroy.groupId());
                 buf.putInt(partDestroy.partitionId());
@@ -845,7 +843,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case META_PAGE_INIT:
-                MetaPageInitRecord updRootsRec = (MetaPageInitRecord)record;
+                MetaPageInitRecord updRootsRec = (MetaPageInitRecord)rec;
 
                 buf.putInt(updRootsRec.groupId());
                 buf.putLong(updRootsRec.pageId());
@@ -858,7 +856,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case PARTITION_META_PAGE_UPDATE_COUNTERS:
-                MetaPageUpdatePartitionDataRecord partDataRec = (MetaPageUpdatePartitionDataRecord)record;
+                MetaPageUpdatePartitionDataRecord partDataRec = (MetaPageUpdatePartitionDataRecord)rec;
 
                 buf.putInt(partDataRec.groupId());
                 buf.putLong(partDataRec.pageId());
@@ -873,7 +871,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case CHECKPOINT_RECORD:
-                CheckpointRecord cpRec = (CheckpointRecord)record;
+                CheckpointRecord cpRec = (CheckpointRecord)rec;
 
                 assert cpRec.checkpointMark() == null || cpRec.checkpointMark() instanceof FileWALPointer :
                         "Invalid WAL record: " + cpRec;
@@ -899,7 +897,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case DATA_RECORD:
-                DataRecord dataRec = (DataRecord)record;
+                DataRecord dataRec = (DataRecord)rec;
 
                 buf.putInt(dataRec.writeEntries().size());
 
@@ -908,15 +906,8 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
 
                 break;
 
-            case HEADER_RECORD:
-                buf.putLong(HeaderRecord.REGULAR_MAGIC);
-
-                buf.putInt(((HeaderRecord)record).version());
-
-                break;
-
             case DATA_PAGE_INSERT_RECORD:
-                DataPageInsertRecord diRec = (DataPageInsertRecord)record;
+                DataPageInsertRecord diRec = (DataPageInsertRecord)rec;
 
                 buf.putInt(diRec.groupId());
                 buf.putLong(diRec.pageId());
@@ -928,7 +919,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case DATA_PAGE_UPDATE_RECORD:
-                DataPageUpdateRecord uRec = (DataPageUpdateRecord)record;
+                DataPageUpdateRecord uRec = (DataPageUpdateRecord)rec;
 
                 buf.putInt(uRec.groupId());
                 buf.putLong(uRec.pageId());
@@ -941,7 +932,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case DATA_PAGE_INSERT_FRAGMENT_RECORD:
-                final DataPageInsertFragmentRecord difRec = (DataPageInsertFragmentRecord)record;
+                final DataPageInsertFragmentRecord difRec = (DataPageInsertFragmentRecord)rec;
 
                 buf.putInt(difRec.groupId());
                 buf.putLong(difRec.pageId());
@@ -953,7 +944,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case DATA_PAGE_REMOVE_RECORD:
-                DataPageRemoveRecord drRec = (DataPageRemoveRecord)record;
+                DataPageRemoveRecord drRec = (DataPageRemoveRecord)rec;
 
                 buf.putInt(drRec.groupId());
                 buf.putLong(drRec.pageId());
@@ -963,7 +954,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case DATA_PAGE_SET_FREE_LIST_PAGE:
-                DataPageSetFreeListPageRecord freeListRec = (DataPageSetFreeListPageRecord)record;
+                DataPageSetFreeListPageRecord freeListRec = (DataPageSetFreeListPageRecord)rec;
 
                 buf.putInt(freeListRec.groupId());
                 buf.putLong(freeListRec.pageId());
@@ -973,7 +964,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case INIT_NEW_PAGE_RECORD:
-                InitNewPageRecord inpRec = (InitNewPageRecord)record;
+                InitNewPageRecord inpRec = (InitNewPageRecord)rec;
 
                 buf.putInt(inpRec.groupId());
                 buf.putLong(inpRec.pageId());
@@ -985,7 +976,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case BTREE_META_PAGE_INIT_ROOT:
-                MetaPageInitRootRecord imRec = (MetaPageInitRootRecord)record;
+                MetaPageInitRootRecord imRec = (MetaPageInitRootRecord)rec;
 
                 buf.putInt(imRec.groupId());
                 buf.putLong(imRec.pageId());
@@ -995,7 +986,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case BTREE_META_PAGE_INIT_ROOT2:
-                MetaPageInitRootInlineRecord imRec2 = (MetaPageInitRootInlineRecord)record;
+                MetaPageInitRootInlineRecord imRec2 = (MetaPageInitRootInlineRecord)rec;
 
                 buf.putInt(imRec2.groupId());
                 buf.putLong(imRec2.pageId());
@@ -1006,7 +997,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case BTREE_META_PAGE_ADD_ROOT:
-                MetaPageAddRootRecord arRec = (MetaPageAddRootRecord)record;
+                MetaPageAddRootRecord arRec = (MetaPageAddRootRecord)rec;
 
                 buf.putInt(arRec.groupId());
                 buf.putLong(arRec.pageId());
@@ -1016,7 +1007,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case BTREE_META_PAGE_CUT_ROOT:
-                MetaPageCutRootRecord crRec = (MetaPageCutRootRecord)record;
+                MetaPageCutRootRecord crRec = (MetaPageCutRootRecord)rec;
 
                 buf.putInt(crRec.groupId());
                 buf.putLong(crRec.pageId());
@@ -1024,7 +1015,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case BTREE_INIT_NEW_ROOT:
-                NewRootInitRecord<?> riRec = (NewRootInitRecord<?>)record;
+                NewRootInitRecord<?> riRec = (NewRootInitRecord<?>)rec;
 
                 buf.putInt(riRec.groupId());
                 buf.putLong(riRec.pageId());
@@ -1040,7 +1031,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case BTREE_PAGE_RECYCLE:
-                RecycleRecord recRec = (RecycleRecord)record;
+                RecycleRecord recRec = (RecycleRecord)rec;
 
                 buf.putInt(recRec.groupId());
                 buf.putLong(recRec.pageId());
@@ -1050,7 +1041,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case BTREE_PAGE_INSERT:
-                InsertRecord<?> inRec = (InsertRecord<?>)record;
+                InsertRecord<?> inRec = (InsertRecord<?>)rec;
 
                 buf.putInt(inRec.groupId());
                 buf.putLong(inRec.pageId());
@@ -1065,7 +1056,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case BTREE_FIX_LEFTMOST_CHILD:
-                FixLeftmostChildRecord flRec = (FixLeftmostChildRecord)record;
+                FixLeftmostChildRecord flRec = (FixLeftmostChildRecord)rec;
 
                 buf.putInt(flRec.groupId());
                 buf.putLong(flRec.pageId());
@@ -1075,7 +1066,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case BTREE_FIX_COUNT:
-                FixCountRecord fcRec = (FixCountRecord)record;
+                FixCountRecord fcRec = (FixCountRecord)rec;
 
                 buf.putInt(fcRec.groupId());
                 buf.putLong(fcRec.pageId());
@@ -1085,7 +1076,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case BTREE_PAGE_REPLACE:
-                ReplaceRecord<?> rRec = (ReplaceRecord<?>)record;
+                ReplaceRecord<?> rRec = (ReplaceRecord<?>)rec;
 
                 buf.putInt(rRec.groupId());
                 buf.putLong(rRec.pageId());
@@ -1099,7 +1090,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case BTREE_PAGE_REMOVE:
-                RemoveRecord rmRec = (RemoveRecord)record;
+                RemoveRecord rmRec = (RemoveRecord)rec;
 
                 buf.putInt(rmRec.groupId());
                 buf.putLong(rmRec.pageId());
@@ -1110,7 +1101,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case BTREE_PAGE_INNER_REPLACE:
-                InnerReplaceRecord<?> irRec = (InnerReplaceRecord<?>)record;
+                InnerReplaceRecord<?> irRec = (InnerReplaceRecord<?>)rec;
 
                 buf.putInt(irRec.groupId());
                 buf.putLong(irRec.pageId());
@@ -1123,7 +1114,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case BTREE_FORWARD_PAGE_SPLIT:
-                SplitForwardPageRecord sfRec = (SplitForwardPageRecord)record;
+                SplitForwardPageRecord sfRec = (SplitForwardPageRecord)rec;
 
                 buf.putInt(sfRec.groupId());
                 buf.putLong(sfRec.pageId());
@@ -1138,7 +1129,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case BTREE_EXISTING_PAGE_SPLIT:
-                SplitExistingPageRecord seRec = (SplitExistingPageRecord)record;
+                SplitExistingPageRecord seRec = (SplitExistingPageRecord)rec;
 
                 buf.putInt(seRec.groupId());
                 buf.putLong(seRec.pageId());
@@ -1149,7 +1140,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case BTREE_PAGE_MERGE:
-                MergeRecord<?> mRec = (MergeRecord<?>)record;
+                MergeRecord<?> mRec = (MergeRecord<?>)rec;
 
                 buf.putInt(mRec.groupId());
                 buf.putLong(mRec.pageId());
@@ -1162,7 +1153,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case PAGES_LIST_SET_NEXT:
-                PagesListSetNextRecord plNextRec = (PagesListSetNextRecord)record;
+                PagesListSetNextRecord plNextRec = (PagesListSetNextRecord)rec;
 
                 buf.putInt(plNextRec.groupId());
                 buf.putLong(plNextRec.pageId());
@@ -1172,7 +1163,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case PAGES_LIST_SET_PREVIOUS:
-                PagesListSetPreviousRecord plPrevRec = (PagesListSetPreviousRecord)record;
+                PagesListSetPreviousRecord plPrevRec = (PagesListSetPreviousRecord)rec;
 
                 buf.putInt(plPrevRec.groupId());
                 buf.putLong(plPrevRec.pageId());
@@ -1182,7 +1173,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case PAGES_LIST_INIT_NEW_PAGE:
-                PagesListInitNewPageRecord plNewRec = (PagesListInitNewPageRecord)record;
+                PagesListInitNewPageRecord plNewRec = (PagesListInitNewPageRecord)rec;
 
                 buf.putInt(plNewRec.groupId());
                 buf.putLong(plNewRec.pageId());
@@ -1196,7 +1187,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case PAGES_LIST_ADD_PAGE:
-                PagesListAddPageRecord plAddRec = (PagesListAddPageRecord)record;
+                PagesListAddPageRecord plAddRec = (PagesListAddPageRecord)rec;
 
                 buf.putInt(plAddRec.groupId());
                 buf.putLong(plAddRec.pageId());
@@ -1206,7 +1197,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case PAGES_LIST_REMOVE_PAGE:
-                PagesListRemovePageRecord plRmvRec = (PagesListRemovePageRecord)record;
+                PagesListRemovePageRecord plRmvRec = (PagesListRemovePageRecord)rec;
 
                 buf.putInt(plRmvRec.groupId());
                 buf.putLong(plRmvRec.pageId());
@@ -1216,7 +1207,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case BTREE_FIX_REMOVE_ID:
-                FixRemoveId frRec = (FixRemoveId)record;
+                FixRemoveId frRec = (FixRemoveId)rec;
 
                 buf.putInt(frRec.groupId());
                 buf.putLong(frRec.pageId());
@@ -1226,7 +1217,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case TRACKING_PAGE_DELTA:
-                TrackingPageDeltaRecord tpDelta = (TrackingPageDeltaRecord)record;
+                TrackingPageDeltaRecord tpDelta = (TrackingPageDeltaRecord)rec;
 
                 buf.putInt(tpDelta.groupId());
                 buf.putLong(tpDelta.pageId());
@@ -1238,7 +1229,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID:
-                MetaPageUpdateNextSnapshotId mpUpdateNextSnapshotId = (MetaPageUpdateNextSnapshotId)record;
+                MetaPageUpdateNextSnapshotId mpUpdateNextSnapshotId = (MetaPageUpdateNextSnapshotId)rec;
 
                 buf.putInt(mpUpdateNextSnapshotId.groupId());
                 buf.putLong(mpUpdateNextSnapshotId.pageId());
@@ -1249,7 +1240,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
 
             case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID:
                 MetaPageUpdateLastSuccessfulFullSnapshotId mpUpdateLastSuccFullSnapshotId =
-                        (MetaPageUpdateLastSuccessfulFullSnapshotId)record;
+                        (MetaPageUpdateLastSuccessfulFullSnapshotId)rec;
 
                 buf.putInt(mpUpdateLastSuccFullSnapshotId.groupId());
                 buf.putLong(mpUpdateLastSuccFullSnapshotId.pageId());
@@ -1260,7 +1251,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
 
             case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID:
                 MetaPageUpdateLastSuccessfulSnapshotId mpUpdateLastSuccSnapshotId =
-                        (MetaPageUpdateLastSuccessfulSnapshotId)record;
+                        (MetaPageUpdateLastSuccessfulSnapshotId)rec;
 
                 buf.putInt(mpUpdateLastSuccSnapshotId.groupId());
                 buf.putLong(mpUpdateLastSuccSnapshotId.pageId());
@@ -1272,7 +1263,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
 
             case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX:
                 MetaPageUpdateLastAllocatedIndex mpUpdateLastAllocatedIdx =
-                        (MetaPageUpdateLastAllocatedIndex) record;
+                        (MetaPageUpdateLastAllocatedIndex) rec;
 
                 buf.putInt(mpUpdateLastAllocatedIdx.groupId());
                 buf.putLong(mpUpdateLastAllocatedIdx.pageId());
@@ -1282,7 +1273,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case PART_META_UPDATE_STATE:
-                PartitionMetaStateRecord partMetaStateRecord = (PartitionMetaStateRecord) record;
+                PartitionMetaStateRecord partMetaStateRecord = (PartitionMetaStateRecord) rec;
 
                 buf.putInt(partMetaStateRecord.groupId());
 
@@ -1295,7 +1286,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case PAGE_LIST_META_RESET_COUNT_RECORD:
-                PageListMetaResetCountRecord pageListMetaResetCntRecord = (PageListMetaResetCountRecord) record;
+                PageListMetaResetCountRecord pageListMetaResetCntRecord = (PageListMetaResetCountRecord) rec;
 
                 buf.putInt(pageListMetaResetCntRecord.groupId());
                 buf.putLong(pageListMetaResetCntRecord.pageId());
@@ -1303,7 +1294,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case TX_RECORD:
-                txRecordSerializer.writeTxRecord((TxRecord)record, buf);
+                txRecordSerializer.writeTxRecord((TxRecord)rec, buf);
 
                 break;
 
@@ -1311,7 +1302,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             default:
-                throw new UnsupportedOperationException("Type: " + record.type());
+                throw new UnsupportedOperationException("Type: " + rec.type());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
index 16a81a4..0e574f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -33,6 +33,9 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.record.Header
  * Record data V2 serializer.
  */
 public class RecordDataV2Serializer implements RecordDataSerializer {
+    /** Length of HEADER record data. */
+    static final int HEADER_RECORD_DATA_SIZE = /*Magic*/8 + /*Version*/4;
+
     /** V1 data serializer delegate. */
     private final RecordDataV1Serializer delegateSerializer;
 
@@ -47,10 +50,10 @@ public class RecordDataV2Serializer implements RecordDataSerializer {
 
     /** {@inheritDoc} */
     @Override public int size(WALRecord record) throws IgniteCheckedException {
-        if (record instanceof HeaderRecord)
-            throw new UnsupportedOperationException("Getting size of header records is forbidden since version 2 of serializer");
-
         switch (record.type()) {
+            case HEADER_RECORD:
+                return HEADER_RECORD_DATA_SIZE;
+
             case DATA_RECORD:
                 return delegateSerializer.size(record) + 8/*timestamp*/;
 

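For context, HEADER_RECORD_DATA_SIZE above (/*Magic*/8 + /*Version*/4) matches the payload layout that the V1
serializer used to write in the HEADER_RECORD case removed from RecordDataV1Serializer earlier in this commit.
A minimal sketch of that payload, for illustration only (the writeHeaderRecordData helper below is hypothetical
and not part of the commit):

    // Hypothetical helper: the HEADER record payload is an 8-byte magic followed by a 4-byte serializer version.
    static void writeHeaderRecordData(ByteBuffer buf, HeaderRecord rec) {
        buf.putLong(HeaderRecord.REGULAR_MAGIC); // 8 bytes: segment magic.
        buf.putInt(rec.version());               // 4 bytes: serializer version.
    }
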
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
index d460705..11bd16b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
@@ -68,7 +68,7 @@ public class RecordV1Serializer implements RecordSerializer {
     public static final int HEADER_RECORD_SIZE = REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + CRC_SIZE + RecordDataV1Serializer.HEADER_RECORD_DATA_SIZE;
 
     /** Skip CRC calculation/check flag */
-    public static boolean SKIP_CRC = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false);
+    public static boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false);
 
     /** V1 data serializer. */
     private final RecordDataV1Serializer dataSerializer;
@@ -152,15 +152,15 @@ public class RecordV1Serializer implements RecordSerializer {
         }
 
         /** {@inheritDoc} */
-        @Override public void writeWithHeaders(WALRecord record, ByteBuffer buf) throws IgniteCheckedException {
+        @Override public void writeWithHeaders(WALRecord rec, ByteBuffer buf) throws IgniteCheckedException {
             // Write record type.
-            putRecordType(buf, record);
+            putRecordType(buf, rec);
 
             // Write record file position.
-            putPositionOfRecord(buf, record);
+            putPositionOfRecord(buf, rec);
 
             // Write record data.
-            dataSerializer.writeRecord(record, buf);
+            dataSerializer.writeRecord(rec, buf);
         }
     };
 
@@ -193,8 +193,8 @@ public class RecordV1Serializer implements RecordSerializer {
 
     /** {@inheritDoc} */
     @SuppressWarnings("CastConflictsWithInstanceof")
-    @Override public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException {
-        writeWithCrc(record, buf, recordIO);
+    @Override public void writeRecord(WALRecord rec, ByteBuffer buf) throws IgniteCheckedException {
+        writeWithCrc(rec, buf, recordIO);
     }
 
     /** {@inheritDoc} */
@@ -225,29 +225,29 @@ public class RecordV1Serializer implements RecordSerializer {
      */
     public static FileWALPointer readPosition(DataInput in) throws IOException {
         long idx = in.readLong();
-        int fileOffset = in.readInt();
+        int fileOff = in.readInt();
 
-        return new FileWALPointer(idx, fileOffset, 0);
+        return new FileWALPointer(idx, fileOff, 0);
     }
 
     /**
      * Writes record file position to given {@code buf}.
      *
      * @param buf Buffer to write record file position.
-     * @param record WAL record.
+     * @param rec WAL record.
      */
-    public static void putPositionOfRecord(ByteBuffer buf, WALRecord record) {
-        putPosition(buf, (FileWALPointer) record.position());
+    private static void putPositionOfRecord(ByteBuffer buf, WALRecord rec) {
+        putPosition(buf, (FileWALPointer)rec.position());
     }
 
     /**
      * Writes record type to given {@code buf}.
      *
      * @param buf Buffer to write record type.
-     * @param record WAL record.
+     * @param rec WAL record.
      */
-    public static void putRecordType(ByteBuffer buf, WALRecord record) {
-        buf.put((byte)(record.type().ordinal() + 1));
+    static void putRecordType(ByteBuffer buf, WALRecord rec) {
+        buf.put((byte)(rec.type().ordinal() + 1));
     }
 
     /**
@@ -258,7 +258,7 @@ public class RecordV1Serializer implements RecordSerializer {
      * @throws IgniteCheckedException If logical end of segment is reached.
      * @throws IOException In case of I/O problems.
      */
-    public static RecordType readRecordType(DataInput in) throws IgniteCheckedException, IOException {
+    static RecordType readRecordType(DataInput in) throws IgniteCheckedException, IOException {
         int type = in.readUnsignedByte();
 
         if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE)
@@ -282,10 +282,10 @@ public class RecordV1Serializer implements RecordSerializer {
      * @throws EOFException In case of end of file.
      * @throws IgniteCheckedException If it's unable to read record.
      */
-    public static WALRecord readWithCrc(FileInput in0, WALPointer expPtr, RecordIO reader) throws EOFException, IgniteCheckedException {
+    static WALRecord readWithCrc(FileInput in0, WALPointer expPtr, RecordIO reader) throws EOFException, IgniteCheckedException {
         long startPos = -1;
 
-        try (FileInput.Crc32CheckingFileInput in = in0.startRead(SKIP_CRC)) {
+        try (FileInput.Crc32CheckingFileInput in = in0.startRead(skipCrc)) {
             startPos = in0.position();
 
             WALRecord res = reader.readWithHeaders(in, expPtr);
@@ -307,19 +307,19 @@ public class RecordV1Serializer implements RecordSerializer {
     /**
      * Writes record with calculated CRC to buffer {@code buf}.
      *
-     * @param record WAL record.
+     * @param rec WAL record.
      * @param buf Buffer to write.
      * @param writer Record write I/O interface.
      * @throws IgniteCheckedException If it's unable to write record.
      */
-    public static void writeWithCrc(WALRecord record, ByteBuffer buf, RecordIO writer) throws IgniteCheckedException {
-        assert record.size() >= 0 && buf.remaining() >= record.size() : record.size();
+    static void writeWithCrc(WALRecord rec, ByteBuffer buf, RecordIO writer) throws IgniteCheckedException {
+        assert rec.size() >= 0 && buf.remaining() >= rec.size() : rec.size();
 
         int startPos = buf.position();
 
-        writer.writeWithHeaders(record, buf);
+        writer.writeWithHeaders(rec, buf);
 
-        if (!SKIP_CRC) {
+        if (!skipCrc) {
             int curPos = buf.position();
 
             buf.position(startPos);

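The writeWithCrc/readWithCrc changes above only rename parameters and narrow visibility; the framing stays the
same: record type, file WAL pointer and record data are written first, and a 4-byte CRC over those bytes follows
unless skipCrc is set (CRC_SIZE is part of HEADER_RECORD_SIZE above). A minimal sketch of the append-CRC step
under that assumption, using java.util.zip.CRC32 purely for illustration (the commit itself relies on Ignite's
own CRC helpers, which are outside this hunk):

    int curPos = buf.position();

    // Checksum everything written for this record, i.e. the bytes in [startPos, curPos).
    byte[] recBytes = new byte[curPos - startPos];

    ByteBuffer dup = buf.duplicate();
    dup.position(startPos);
    dup.get(recBytes);

    java.util.zip.CRC32 crc = new java.util.zip.CRC32();
    crc.update(recBytes, 0, recBytes.length);

    // Append the 4-byte CRC right after the record body.
    buf.putInt((int)crc.getValue());
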
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
index 05f2a24..adfd701 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
@@ -54,7 +54,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.wal.serial
  */
 public class RecordV2Serializer implements RecordSerializer {
     /** Length of WAL Pointer: Index (8) + File offset (4) + Record length (4) */
-    public static final int FILE_WAL_POINTER_SIZE = 8 + 4 + 4;
+    private static final int FILE_WAL_POINTER_SIZE = 8 + 4 + 4;
 
     /** V2 data serializer. */
     private final RecordDataV2Serializer dataSerializer;
@@ -212,37 +212,38 @@ public class RecordV2Serializer implements RecordSerializer {
      * @return Read file WAL pointer.
      * @throws IOException If failed to write.
      */
-    public static FileWALPointer readPositionAndCheckPoint(
+    @SuppressWarnings("UnusedReturnValue")
+    private static FileWALPointer readPositionAndCheckPoint(
         DataInput in,
         WALPointer expPtr,
         boolean skipPositionCheck
     ) throws IgniteCheckedException, IOException {
         long idx = in.readLong();
-        int fileOffset = in.readInt();
-        int length = in.readInt();
+        int fileOff = in.readInt();
+        int len = in.readInt();
 
         FileWALPointer p = (FileWALPointer)expPtr;
 
-        if (!F.eq(idx, p.index()) || (!skipPositionCheck && !F.eq(fileOffset, p.fileOffset())))
+        if (!F.eq(idx, p.index()) || (!skipPositionCheck && !F.eq(fileOff, p.fileOffset())))
             throw new WalSegmentTailReachedException(
                 "WAL segment tail is reached. [ " +
                         "Expected next state: {Index=" + p.index() + ",Offset=" + p.fileOffset() + "}, " +
-                        "Actual state : {Index=" + idx + ",Offset=" + fileOffset + "} ]", null);
+                        "Actual state : {Index=" + idx + ",Offset=" + fileOff + "} ]", null);
 
-        return new FileWALPointer(idx, fileOffset, length);
+        return new FileWALPointer(idx, fileOff, len);
     }
 
     /**
-     * Writes record file position to given {@code buf}.
+     * Writes rec file position to given {@code buf}.
      *
-     * @param buf Buffer to write record file position.
-     * @param record WAL record.
+     * @param buf Buffer to write rec file position.
+     * @param rec WAL rec.
      */
-    public static void putPositionOfRecord(ByteBuffer buf, WALRecord record) {
-        FileWALPointer p = (FileWALPointer)record.position();
+    private static void putPositionOfRecord(ByteBuffer buf, WALRecord rec) {
+        FileWALPointer p = (FileWALPointer)rec.position();
 
         buf.putLong(p.index());
         buf.putInt(p.fileOffset());
-        buf.putInt(record.size());
+        buf.putInt(rec.size());
     }
 }

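For reference, the V2 on-disk WAL pointer shown above is a fixed 16-byte triple: segment index (8 bytes),
file offset (4 bytes) and record length (4 bytes). A tiny round-trip sketch of that layout with arbitrary
sample values (illustration only; the real read path goes through DataInput and the tail check in
readPositionAndCheckPoint):

    ByteBuffer buf = ByteBuffer.allocate(16); // FILE_WAL_POINTER_SIZE = 8 + 4 + 4.

    // Write side, mirroring putPositionOfRecord().
    buf.putLong(42L);  // Segment index.
    buf.putInt(1024);  // File offset within the segment.
    buf.putInt(128);   // Record length.

    buf.flip();

    // Read side, mirroring readPositionAndCheckPoint() without the tail check.
    FileWALPointer ptr = new FileWALPointer(buf.getLong(), buf.getInt(), buf.getInt());
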
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index 49d8c18..6a265eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -259,9 +259,8 @@ public class PlatformConfigurationUtils {
                 }
             }
 
-            if (ccfg.getPluginConfigurations() != null) {
+            if (ccfg.getPluginConfigurations() != null)
                 Collections.addAll(plugins, ccfg.getPluginConfigurations());
-            }
 
             ccfg.setPluginConfigurations(plugins.toArray(new CachePluginConfiguration[plugins.size()]));
         }
@@ -420,7 +419,8 @@ public class PlatformConfigurationUtils {
             out.writeBoolean(f0.isExcludeNeighbors());
             out.writeByte((byte) 0);  // override flags
             out.writeObject(null);  // user func
-        } else if (f instanceof PlatformAffinityFunction) {
+        }
+        else if (f instanceof PlatformAffinityFunction) {
             PlatformAffinityFunction f0 = (PlatformAffinityFunction) f;
             AffinityFunction baseFunc = f0.getBaseFunc();
 
@@ -430,16 +430,17 @@ public class PlatformConfigurationUtils {
                 out.writeBoolean(((RendezvousAffinityFunction) baseFunc).isExcludeNeighbors());
                 out.writeByte(f0.getOverrideFlags());
                 out.writeObject(f0.getUserFunc());
-            } else {
+            }
+            else {
                 out.writeByte((byte) 3);
                 out.writeInt(f0.partitions());
                 out.writeBoolean(false);  // exclude neighbors
                 out.writeByte(f0.getOverrideFlags());
                 out.writeObject(f0.getUserFunc());
             }
-        } else {
-            out.writeByte((byte) 0);
         }
+        else
+            out.writeByte((byte)0);
     }
 
     /**
@@ -465,9 +466,8 @@ public class PlatformConfigurationUtils {
             out.writeInt(p0.getMaxSize());
             out.writeLong(p0.getMaxMemorySize());
         }
-        else {
+        else
             out.writeByte((byte)0);
-        }
     }
 
     /**
@@ -578,9 +578,9 @@ public class PlatformConfigurationUtils {
     public static void readIgniteConfiguration(BinaryRawReaderEx in, IgniteConfiguration cfg) {
         if (in.readBoolean())
             cfg.setClientMode(in.readBoolean());
-        int[] eventTypes = in.readIntArray();
-        if (eventTypes != null)
-            cfg.setIncludeEventTypes(eventTypes);
+        int[] evtTypes = in.readIntArray();
+        if (evtTypes != null)
+            cfg.setIncludeEventTypes(evtTypes);
         if (in.readBoolean())
             cfg.setMetricsExpireTime(in.readLong());
         if (in.readBoolean())
@@ -598,9 +598,9 @@ public class PlatformConfigurationUtils {
         String workDir = in.readString();
         if (workDir != null)
             cfg.setWorkDirectory(workDir);
-        String localHost = in.readString();
-        if (localHost != null)
-            cfg.setLocalHost(localHost);
+        String locHost = in.readString();
+        if (locHost != null)
+            cfg.setLocalHost(locHost);
         if (in.readBoolean())
             cfg.setDaemon(in.readBoolean());
         if (in.readBoolean())
@@ -787,9 +787,9 @@ public class PlatformConfigurationUtils {
      * @param in Reader.
      */
     private static void readDiscoveryConfiguration(BinaryRawReader in, IgniteConfiguration cfg) {
-        boolean hasConfig = in.readBoolean();
+        boolean hasCfg = in.readBoolean();
 
-        if (!hasConfig)
+        if (!hasCfg)
             return;
 
         TcpDiscoverySpi disco = new TcpDiscoverySpi();
@@ -799,21 +799,20 @@ public class PlatformConfigurationUtils {
         if (hasIpFinder) {
             byte ipFinderType = in.readByte();
 
-            int addrCount = in.readInt();
+            int addrCnt = in.readInt();
 
             ArrayList<String> addrs = null;
 
-            if (addrCount > 0) {
-                addrs = new ArrayList<>(addrCount);
+            if (addrCnt > 0) {
+                addrs = new ArrayList<>(addrCnt);
 
-                for (int i = 0; i < addrCount; i++)
+                for (int i = 0; i < addrCnt; i++)
                     addrs.add(in.readString());
             }
 
             TcpDiscoveryVmIpFinder finder = null;
-            if (ipFinderType == 1) {
+            if (ipFinderType == 1)
                 finder = new TcpDiscoveryVmIpFinder();
-            }
             else if (ipFinderType == 2) {
                 TcpDiscoveryMulticastIpFinder finder0 = new TcpDiscoveryMulticastIpFinder();
 
@@ -830,9 +829,8 @@ public class PlatformConfigurationUtils {
 
                 finder = finder0;
             }
-            else {
+            else
                 assert false;
-            }
 
             finder.setAddresses(addrs);
 
@@ -981,23 +979,23 @@ public class PlatformConfigurationUtils {
      * Write query entity.
      *
      * @param writer Writer.
-     * @param queryEntity Query entity.
+     * @param qryEntity Query entity.
      */
-    public static void writeQueryEntity(BinaryRawWriter writer, QueryEntity queryEntity) {
-        assert queryEntity != null;
+    public static void writeQueryEntity(BinaryRawWriter writer, QueryEntity qryEntity) {
+        assert qryEntity != null;
 
-        writer.writeString(queryEntity.getKeyType());
-        writer.writeString(queryEntity.getValueType());
-        writer.writeString(queryEntity.getTableName());
-        writer.writeString(queryEntity.getKeyFieldName());
-        writer.writeString(queryEntity.getValueFieldName());
+        writer.writeString(qryEntity.getKeyType());
+        writer.writeString(qryEntity.getValueType());
+        writer.writeString(qryEntity.getTableName());
+        writer.writeString(qryEntity.getKeyFieldName());
+        writer.writeString(qryEntity.getValueFieldName());
 
         // Fields
-        LinkedHashMap<String, String> fields = queryEntity.getFields();
+        LinkedHashMap<String, String> fields = qryEntity.getFields();
 
         if (fields != null) {
-            Set<String> keyFields = queryEntity.getKeyFields();
-            Set<String> notNullFields = queryEntity.getNotNullFields();
+            Set<String> keyFields = qryEntity.getKeyFields();
+            Set<String> notNullFields = qryEntity.getNotNullFields();
 
             writer.writeInt(fields.size());
 
@@ -1012,7 +1010,7 @@ public class PlatformConfigurationUtils {
             writer.writeInt(0);
 
         // Aliases
-        Map<String, String> aliases = queryEntity.getAliases();
+        Map<String, String> aliases = qryEntity.getAliases();
 
         if (aliases != null) {
             writer.writeInt(aliases.size());
@@ -1026,7 +1024,7 @@ public class PlatformConfigurationUtils {
             writer.writeInt(0);
 
         // Indexes
-        Collection<QueryIndex> indexes = queryEntity.getIndexes();
+        Collection<QueryIndex> indexes = qryEntity.getIndexes();
 
         if (indexes != null) {
             writer.writeInt(indexes.size());
@@ -1042,16 +1040,16 @@ public class PlatformConfigurationUtils {
      * Writer query index.
      *
      * @param writer Writer.
-     * @param index Index.
+     * @param idx Index.
      */
-    private static void writeQueryIndex(BinaryRawWriter writer, QueryIndex index) {
-        assert index != null;
+    private static void writeQueryIndex(BinaryRawWriter writer, QueryIndex idx) {
+        assert idx != null;
 
-        writer.writeString(index.getName());
-        writeEnumByte(writer, index.getIndexType());
-        writer.writeInt(index.getInlineSize());
+        writer.writeString(idx.getName());
+        writeEnumByte(writer, idx.getIndexType());
+        writer.writeInt(idx.getInlineSize());
 
-        LinkedHashMap<String, Boolean> fields = index.getFields();
+        LinkedHashMap<String, Boolean> fields = idx.getFields();
 
         if (fields != null) {
             writer.writeInt(fields.size());
@@ -1273,9 +1271,9 @@ public class PlatformConfigurationUtils {
         if (finder instanceof TcpDiscoveryVmIpFinder) {
             w.writeBoolean(true);
 
-            boolean isMulticast = finder instanceof TcpDiscoveryMulticastIpFinder;
+            boolean isMcast = finder instanceof TcpDiscoveryMulticastIpFinder;
 
-            w.writeByte((byte)(isMulticast ? 2 : 1));
+            w.writeByte((byte)(isMcast ? 2 : 1));
 
             Collection<InetSocketAddress> addrs = finder.getRegisteredAddresses();
 
@@ -1284,7 +1282,7 @@ public class PlatformConfigurationUtils {
             for (InetSocketAddress a : addrs)
                 w.writeString(a.toString());
 
-            if (isMulticast) {
+            if (isMcast) {
                 TcpDiscoveryMulticastIpFinder multiFinder = (TcpDiscoveryMulticastIpFinder) finder;
 
                 w.writeString(multiFinder.getLocalAddress());
@@ -1529,9 +1527,8 @@ public class PlatformConfigurationUtils {
                 w.writeLong(plc.getRateTimeInterval());
             }
         }
-        else {
+        else
             w.writeInt(0);
-        }
     }
 
     /**
@@ -1573,9 +1570,8 @@ public class PlatformConfigurationUtils {
             w.writeBoolean(cfg.isTcpNoDelay());
             w.writeInt(cfg.getMaxOpenCursorsPerConnection());
             w.writeInt(cfg.getThreadPoolSize());
-        } else {
+        } else
             w.writeBoolean(false);
-        }
     }
 
     /**
@@ -1640,7 +1636,7 @@ public class PlatformConfigurationUtils {
                 .setWalStorePath(in.readString())
                 .setWalArchivePath(in.readString())
                 .setWalMode(WALMode.fromOrdinal(in.readInt()))
-                .setTlbSize(in.readInt())
+                .setWalBufferSize(in.readInt())
                 .setWalFlushFrequency((int) in.readLong())
                 .setWalFsyncDelayNanos(in.readLong())
                 .setWalRecordIteratorBufferSize(in.readInt())
@@ -1729,7 +1725,7 @@ public class PlatformConfigurationUtils {
             w.writeString(cfg.getWalStorePath());
             w.writeString(cfg.getWalArchivePath());
             w.writeInt(cfg.getWalMode().ordinal());
-            w.writeInt(cfg.getTlbSize());
+            w.writeInt(cfg.getWalBufferSize());
             w.writeLong(cfg.getWalFlushFrequency());
             w.writeLong(cfg.getWalFsyncDelayNanos());
             w.writeInt(cfg.getWalRecordIteratorBufferSize());
@@ -1740,9 +1736,8 @@ public class PlatformConfigurationUtils {
             w.writeInt(cfg.getCheckpointWriteOrder().ordinal());
             w.writeBoolean(cfg.isWriteThrottlingEnabled());
 
-        } else {
+        } else
             w.writeBoolean(false);
-        }
     }
 
     /**

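Note: the platform configuration read/write changes above track the rename of the thread-local WAL buffer setting (getTlbSize()/setTlbSize()) to getWalBufferSize()/setWalBufferSize(). A minimal configuration sketch using the renamed setter (illustrative only, not part of this patch; the sizes are arbitrary):

    DataStorageConfiguration dsCfg = new DataStorageConfiguration()
        .setDefaultDataRegionConfiguration(
            new DataRegionConfiguration()
                .setMaxSize(100L * 1024 * 1024)
                .setPersistenceEnabled(true))
        .setWalMode(WALMode.LOG_ONLY)
        .setWalBufferSize(1024 * 1024)        // Replaces setTlbSize()/setWalThreadLocalBufferSize().
        .setWalSegmentSize(64 * 1024 * 1024);

    IgniteConfiguration cfg = new IgniteConfiguration()
        .setDataStorageConfiguration(dsCfg);
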
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
index 15e6f2c..98b5c08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
@@ -1320,6 +1320,15 @@ public abstract class GridUnsafe {
     }
 
     /**
+     * Returns page size.
+     *
+     * @return Page size.
+     */
+    public static int pageSize() {
+        return UNSAFE.pageSize();
+    }
+
+    /**
      * Returns unaligned flag.
      */
     private static boolean unaligned() {

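Note: GridUnsafe.pageSize() simply exposes the memory page size reported by sun.misc.Unsafe. One plausible use (illustrative only) is rounding a buffer length up to a whole number of pages before allocating or mapping it:

    int pageSize = GridUnsafe.pageSize();

    // Round the requested length up to a multiple of the OS page size.
    int len = 100_000;
    int alignedLen = (len + pageSize - 1) / pageSize * pageSize;

    ByteBuffer buf = ByteBuffer.allocateDirect(alignedLen);
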
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 9804608..b3ca6ff 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -9449,11 +9449,9 @@ public abstract class IgniteUtils {
         try {
             Method mtd = cls.getDeclaredMethod(name, paramTypes);
 
-            if (mtd.getReturnType() != void.class) {
-                mtd.setAccessible(true);
+            mtd.setAccessible(true);
 
-                return mtd;
-            }
+            return mtd;
         }
         catch (NoSuchMethodException ignored) {
             // No-op.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorPersistentStoreConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorPersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorPersistentStoreConfiguration.java
index d26ab35..986f0ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorPersistentStoreConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorPersistentStoreConfiguration.java
@@ -111,7 +111,7 @@ public class VisorPersistentStoreConfiguration extends VisorDataTransferObject {
         walArchivePath = cfg.getWalArchivePath();
         metricsEnabled = cfg.isMetricsEnabled();
         walMode = cfg.getWalMode();
-        tlbSize = cfg.getWalThreadLocalBufferSize();
+        tlbSize = cfg.getWalBufferSize();
         walFlushFreq = cfg.getWalFlushFrequency();
         walFsyncDelay = cfg.getWalFsyncDelayNanos();
         walRecordIterBuffSize = cfg.getWalRecordIteratorBufferSize();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java
index 40410cb..2051da3 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java
@@ -41,6 +41,10 @@ public interface DataStorageMetricsMXBean extends DataStorageMetrics {
     @Override float getWalFsyncTimeAverage();
 
     /** {@inheritDoc} */
+    @MXBeanDescription("WAL buffer poll spins number over the last time interval.")
+    @Override long getWalBuffPollSpinsRate();
+
+    /** {@inheritDoc} */
     @MXBeanDescription("Duration of the last checkpoint in milliseconds.")
     @Override long getLastCheckpointDuration();
 

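Note: getWalBuffPollSpinsRate() is declared with @Override, so the same getter is assumed to be added to the DataStorageMetrics interface elsewhere in this commit. A minimal read sketch (illustrative only; storage metrics must be enabled, e.g. via DataStorageConfiguration.setMetricsEnabled(true), for non-zero values):

    Ignite ignite = Ignition.ignite();

    DataStorageMetrics m = ignite.dataStorageMetrics();

    System.out.println("WAL fsync time avg: " + m.getWalFsyncTimeAverage());
    System.out.println("WAL buffer poll spins rate: " + m.getWalBuffPollSpinsRate());
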
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java
index 6245952..15e67b2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java
@@ -214,4 +214,8 @@ public class IgniteClusterActivateDeactivateTestWithPersistence extends IgniteCl
 
         checkNoCaches(SRVS);
     }
+
+    @Override public void testActivateFailover3() throws Exception {
+        super.testActivateFailover3();
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsTransactionsHangTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsTransactionsHangTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsTransactionsHangTest.java
index f3aee08..22d1665 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsTransactionsHangTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsTransactionsHangTest.java
@@ -204,7 +204,7 @@ public class IgnitePdsTransactionsHangTest extends GridCommonAbstractTest {
                             }
                         }
                         catch (Throwable e) {
-                            System.out.println(e.toString());
+                            log.error("Unexpected exception:", e);
 
                             throw new RuntimeException(e);
                         }
@@ -229,14 +229,14 @@ public class IgnitePdsTransactionsHangTest extends GridCommonAbstractTest {
                     max = Math.max(max, sum);
                     min = Math.min(min, sum);
 
-                    System.out.println("Operation count: " + sum + " min=" + min + " max=" + max + " avg=" + totalOperations / (periods - WARM_UP_PERIOD));
+                    log.info("Operation count: " + sum + " min=" + min + " max=" + max + " avg=" + totalOperations / (periods - WARM_UP_PERIOD));
                 }
             }
 
             interrupt.set(true);
 
             threadPool.shutdown();
-            System.out.println("Test complete");
+            log.info("Test complete");
 
             threadPool.awaitTermination(getTestTimeout(), TimeUnit.MILLISECONDS);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsWalTlbTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsWalTlbTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsWalTlbTest.java
deleted file mode 100644
index 3b76b63..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsWalTlbTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.db.wal;
-
-import javax.cache.CacheException;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DFLT_MIN_CHECKPOINTING_PAGE_BUFFER_SIZE;
-import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
-
-/**
- *
- */
-public class IgnitePdsWalTlbTest extends GridCommonAbstractTest {
-    /** Ip finder. */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** Cache name. */
-    private static final String CACHE_NAME = "cache";
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(CACHE_NAME);
-
-        cfg.setCacheConfiguration(ccfg);
-
-        DataStorageConfiguration memCfg = new DataStorageConfiguration()
-            .setDefaultDataRegionConfiguration(
-                new DataRegionConfiguration().setMaxSize(100 * 1024 * 1024)
-                    .setPersistenceEnabled(true)
-                    .setCheckpointPageBufferSize(DFLT_MIN_CHECKPOINTING_PAGE_BUFFER_SIZE + 1))
-            .setWalMode(WALMode.LOG_ONLY)
-            .setWalThreadLocalBufferSize(640000000);
-
-        cfg.setDataStorageConfiguration(memCfg);
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
-        discoSpi.setIpFinder(IP_FINDER);
-
-        if (gridName.endsWith("1"))
-            cfg.setClientMode(true);
-
-        cfg.setDiscoverySpi(discoSpi);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected long getTestTimeout() {
-        return 30_000;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false));
-
-        stopAllGrids();
-
-        startGrids(2);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-
-        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false));
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testWalDirectOutOfMemory() throws Exception {
-        IgniteEx ig = grid(1);
-
-        ig.active(true);
-
-        boolean locked = true;
-
-        try (IgniteDataStreamer<Integer, Integer> streamer = ig.dataStreamer(CACHE_NAME)) {
-            for (int i = 0; i < 100_000; i++) {
-                streamer.addData(i, 1);
-
-                if (i > 0 && i % 10_000 == 0)
-                    info("Done put: " + i);
-            }
-        }
-        catch (CacheException ignore) {
-            // expected
-            locked = false;
-        }
-        finally {
-            assertFalse(locked);
-
-            stopAllGrids();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
index 107b467..2bcc22f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.nio.file.OpenOption;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -31,6 +32,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.GridKernalState;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
@@ -65,6 +67,8 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
         deleteWorkFiles();
     }
 
@@ -83,12 +87,12 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
         cfg.setCacheConfiguration(cacheCfg);
 
         DataStorageConfiguration memCfg = new DataStorageConfiguration()
-            .setDefaultDataRegionConfiguration(
-                new DataRegionConfiguration().setMaxSize(2048L * 1024 * 1024).setPersistenceEnabled(true))
-            .setFileIOFactory(new FailingFileIOFactory())
-            .setWalMode(WALMode.BACKGROUND)
-            // Setting WAL Segment size to high values forces flushing by timeout.
-            .setWalSegmentSize(flushByTimeout ? 500_000 : 50_000);
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration().setMaxSize(2048L * 1024 * 1024).setPersistenceEnabled(true))
+                .setFileIOFactory(new FailingFileIOFactory())
+                .setWalMode(WALMode.BACKGROUND)
+                .setWalBufferSize(128 * 1024)
+                .setWalSegmentSize(flushByTimeout ? 500_000 : 50_000); // Setting WAL segment size to high values forces flushing by timeout.
 
         cfg.setDataStorageConfiguration(memCfg);
 
@@ -121,6 +125,13 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
     private void flushingErrorTest() throws Exception {
         final IgniteEx grid = startGrid(0);
 
+        IgniteWriteAheadLogManager wal = grid.context().cache().context().wal();
+
+        boolean mmap = GridTestUtils.getFieldValue(wal, "mmap");
+
+        if (mmap)
+            return;
+
         try {
             grid.active(true);
 
@@ -152,7 +163,7 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws IgniteCheckedException
+     * @throws IgniteCheckedException If failed.
      */
     private void deleteWorkFiles() throws IgniteCheckedException {
         deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false));
@@ -162,9 +173,10 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
      * Create File I/O which fails after second attempt to write to File
      */
     private static class FailingFileIOFactory implements FileIOFactory {
+        /** Serial version uid. */
         private static final long serialVersionUID = 0L;
 
-        /** */
+        /** Delegate factory. */
         private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory();
 
         /** {@inheritDoc} */
@@ -174,17 +186,23 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public FileIO create(File file, OpenOption... modes) throws IOException {
-            FileIO delegate = delegateFactory.create(file, modes);
+            final FileIO delegate = delegateFactory.create(file, modes);
 
             return new FileIODecorator(delegate) {
                 int writeAttempts = 2;
 
-                @Override public int write(ByteBuffer sourceBuffer) throws IOException {
+                @Override public int write(ByteBuffer srcBuf) throws IOException {
                     if (--writeAttempts == 0)
                         throw new RuntimeException("Test exception. Unable to write to file.");
 
-                    return super.write(sourceBuffer);
+                    return super.write(srcBuf);
                 }
+
+                /** {@inheritDoc} */
+                @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
+                    return delegate.map(maxWalSegmentSize);
+                }
+
             };
         }
     }

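Note: the failover tests above wrap WAL file I/O in a FileIODecorator and now also delegate the new map(int) call; they skip entirely when the WAL manager runs in memory-mapped mode (mmap == true), presumably because mapped segments are flushed through the mapped buffer rather than through write(). A sketch of such a wrapper (illustrative only, not part of this patch; 'delegate' is a FileIO created by the real factory and must be effectively final):

    FileIO failingIo = new FileIODecorator(delegate) {
        /** Remaining write attempts before the simulated failure. */
        private int writeAttempts = 2;

        /** {@inheritDoc} */
        @Override public int write(ByteBuffer srcBuf) throws IOException {
            if (--writeAttempts == 0)
                throw new IOException("Test exception. Unable to write to file.");

            return super.write(srcBuf);
        }

        /** {@inheritDoc} */
        @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
            // Keep the decorator transparent for the memory-mapped code path.
            return delegate.map(maxWalSegmentSize);
        }
    };
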
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
index 057e082..d6c0a9d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import java.nio.MappedByteBuffer;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -30,6 +31,8 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
@@ -139,6 +142,13 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
 
         final Ignite grid = startGridsMultiThreaded(gridCount());
 
+        IgniteWriteAheadLogManager wal = ((IgniteKernal)grid).context().cache().context().wal();
+
+        boolean mmap = GridTestUtils.getFieldValue(wal, "mmap");
+
+        if (mmap)
+            return;
+
         grid.active(true);
 
         IgniteCache<Object, Object> cache = grid.cache(TEST_CACHE);
@@ -177,8 +187,7 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
 
         // We should await successful stop of node.
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override
-            public boolean apply() {
+            @Override public boolean apply() {
                 return grid.cluster().nodes().size() == gridCount();
             }
         }, getTestTimeout());
@@ -228,17 +237,21 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
 
         /** {@inheritDoc} */
         @Override public FileIO create(File file, OpenOption... modes) throws IOException {
-            FileIO delegate = delegateFactory.create(file, modes);
+            final FileIO delegate = delegateFactory.create(file, modes);
 
             return new FileIODecorator(delegate) {
                 int writeAttempts = 2;
 
-                @Override public int write(ByteBuffer sourceBuffer) throws IOException {
-
+                @Override public int write(ByteBuffer srcBuf) throws IOException {
                     if (--writeAttempts == 0 && fail!= null && fail.get())
                         throw new IOException("No space left on device");
 
-                    return super.write(sourceBuffer);
+                    return super.write(srcBuf);
+                }
+
+                /** {@inheritDoc} */
+                @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
+                    return delegate.map(maxWalSegmentSize);
                 }
             };
         }