You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/05/30 09:09:16 UTC

ignite git commit: IGNITE-5322 - WAL iterator improvements

Repository: ignite
Updated Branches:
  refs/heads/ignite-5322 [created] 3cd2cec57


IGNITE-5322 - WAL iterator improvements


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3cd2cec5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3cd2cec5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3cd2cec5

Branch: refs/heads/ignite-5322
Commit: 3cd2cec57cfbdaee8551939aec05b7e173926f93
Parents: c6313b7
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue May 30 12:09:01 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue May 30 12:09:01 2017 +0300

----------------------------------------------------------------------
 .../wal/record/StoreOperationRecord.java        | 118 --------------
 .../internal/pagemem/wal/record/WALRecord.java  |  12 +-
 .../GridCacheDatabaseSharedManager.java         |   6 +-
 .../cache/database/wal/FileWALPointer.java      |  17 +-
 .../database/wal/FileWriteAheadLogManager.java  | 130 ++++++++-------
 .../cache/database/wal/RecordSerializer.java    |   3 +-
 .../wal/serializer/RecordV1Serializer.java      | 159 ++++++++++---------
 .../db/file/IgniteWalRecoverySelfTest.java      |   2 +-
 8 files changed, 182 insertions(+), 265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd2cec5/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/StoreOperationRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/StoreOperationRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/StoreOperationRecord.java
deleted file mode 100644
index a82f604..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/StoreOperationRecord.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagemem.wal.record;
-
-/**
- *
- */
-public class StoreOperationRecord extends WALRecord {
-    /**
-     * Store operation type.
-     */
-    public enum StoreOperationType {
-        /** */
-        ENTRY_CREATE,
-
-        /** */
-        INDEX_PUT,
-
-        /** */
-        INDEX_REMOVE;
-
-        /** */
-        private static final StoreOperationType[] VALS = StoreOperationType.values();
-
-        /** */
-        public static StoreOperationType fromOrdinal(int ord) {
-            return ord < 0 || ord >= VALS.length ? null : VALS[ord];
-        }
-    }
-
-    /** */
-    private StoreOperationType opType;
-
-    /** */
-    private int cacheId;
-
-    /** */
-    private long link;
-
-    /** */
-    private int idxId;
-
-    /** {@inheritDoc} */
-    @Override public RecordType type() {
-        return RecordType.STORE_OPERATION_RECORD;
-    }
-
-    /**
-     * @return Cache ID.
-     */
-    public int cacheId() {
-        return cacheId;
-    }
-
-    /**
-     * @return Link to data.
-     */
-    public long link() {
-        return link;
-    }
-
-    /**
-     * @return Index ID.
-     */
-    public int indexId() {
-        return idxId;
-    }
-
-    /**
-     * @return Operation type.
-     */
-    public StoreOperationType operationType() {
-        return opType;
-    }
-
-    /**
-     * @param opType Operation type.
-     */
-    public void operationType(StoreOperationType opType) {
-        this.opType = opType;
-    }
-
-    /**
-     * @param cacheId Cache ID.
-     */
-    public void cacheId(int cacheId) {
-        this.cacheId = cacheId;
-    }
-
-    /**
-     * @param link Link.
-     */
-    public void link(long link) {
-        this.link = link;
-    }
-
-    /**
-     * @param idxId Index ID.
-     */
-    public void indexId(int idxId) {
-        this.idxId = idxId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd2cec5/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 142f0ee..b76bcc6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.pagemem.wal.record;
 
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
@@ -38,9 +39,6 @@ public abstract class WALRecord {
         DATA_RECORD,
 
         /** */
-        STORE_OPERATION_RECORD,
-
-        /** */
         CHECKPOINT_RECORD,
 
         /** */
@@ -186,7 +184,7 @@ public abstract class WALRecord {
     private WALRecord prev;
 
     /** */
-    private long pos;
+    private WALPointer pos;
 
     /**
      * @param chainSize Chain size in bytes.
@@ -219,15 +217,15 @@ public abstract class WALRecord {
     /**
      * @return Position in file.
      */
-    public long position() {
+    public WALPointer position() {
         return pos;
     }
 
     /**
      * @param pos Position in file.
      */
-    public void position(long pos) {
-        assert pos >= 0: pos;
+    public void position(WALPointer pos) {
+        assert pos != null;
 
         this.pos = pos;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd2cec5/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
index a78ba27..c57f9cb 100755
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
@@ -1232,7 +1232,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             }
         }
 
-        ByteBuffer buf = ByteBuffer.allocate(16);
+        ByteBuffer buf = ByteBuffer.allocate(20);
         buf.order(ByteOrder.nativeOrder());
 
         if (startFile != null)
@@ -1260,7 +1260,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             buf.flip();
 
-            return new FileWALPointer(buf.getInt(), buf.getInt(), buf.getInt());
+            return new FileWALPointer(buf.getLong(), buf.getInt(), buf.getInt());
         }
         catch (IOException e) {
             throw new IgniteCheckedException("Failed to read checkpoint pointer from marker file: " +
@@ -1728,7 +1728,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             tmpWriteBuf.rewind();
 
-            tmpWriteBuf.putInt(filePtr.index());
+            tmpWriteBuf.putLong(filePtr.index());
 
             tmpWriteBuf.putInt(filePtr.fileOffset());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd2cec5/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java
index 1102054..36df2e7 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java
@@ -25,7 +25,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
  */
 public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> {
     /** */
-    private final int idx;
+    private final long idx;
 
     /** */
     private final int fileOffset;
@@ -37,7 +37,7 @@ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> {
      * @param idx File timestamp index.
      * @param fileOffset Offset in file, from the beginning.
      */
-    public FileWALPointer(int idx, int fileOffset, int len) {
+    public FileWALPointer(long idx, int fileOffset, int len) {
         this.idx = idx;
         this.fileOffset = fileOffset;
         this.len = len;
@@ -46,7 +46,7 @@ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> {
     /**
      * @return Timestamp index.
      */
-    public int index() {
+    public long index() {
         return idx;
     }
 
@@ -64,6 +64,13 @@ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> {
         return len;
     }
 
+    /**
+     * @param len Record length.
+     */
+    public void length(int len) {
+        this.len = len;
+    }
+
     /** {@inheritDoc} */
     @Override public WALPointer next() {
         if (len == 0)
@@ -89,7 +96,7 @@ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> {
 
     /** {@inheritDoc} */
     @Override public int hashCode() {
-        int result = idx;
+        int result = (int)(idx ^ (idx >>> 32));
 
         result = 31 * result + fileOffset;
 
@@ -98,7 +105,7 @@ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> {
 
     /** {@inheritDoc} */
     @Override public int compareTo(FileWALPointer o) {
-        int res = Integer.compare(idx, o.idx);
+        int res = Long.compare(idx, o.idx);
 
         return res == 0 ? Integer.compare(fileOffset, o.fileOffset) : res;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd2cec5/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
index f8b18ef..4b79308 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
@@ -192,7 +192,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         assert dbCfg != null : "WAL should not be created if persistence is disabled.";
 
         this.dbCfg = dbCfg;
-        this.igCfg = igCfg;
 
         maxWalSegmentSize = dbCfg.getWalSegmentSize();
 
@@ -443,7 +442,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         archiver0.release(((FileWALPointer)start).index());
     }
 
-    private boolean hasIndex(int absIdx) {
+    private boolean hasIndex(long absIdx) {
         String name = FileDescriptor.fileName(absIdx, serializer.version());
 
         boolean inArchive = new File(walArchiveDir, name).exists();
@@ -509,7 +508,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      * @param consId Local node consistent ID.
      * @param msg File description to print out on successful initialization.
      * @return Initialized directory.
-     * @throws IgniteCheckedException
+     * @throws IgniteCheckedException If failed to initialize directory.
      */
     private File initDirectory(String cfg, String defDir, String consId, String msg) throws IgniteCheckedException {
         File dir;
@@ -568,11 +567,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      * @throws IgniteCheckedException If failed to initialize WAL write handle.
      */
     private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws IgniteCheckedException {
-        int absIdx = lastReadPtr == null ? 0 : lastReadPtr.index();
+        long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index();
 
         archiver.currentWalIndex(absIdx);
 
-        int segNo = absIdx % dbCfg.getWalSegments();
+        long segNo = absIdx % dbCfg.getWalSegments();
 
         File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo, serializer.version()));
 
@@ -625,7 +624,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      * @throws StorageException If IO exception occurred.
      * @throws IgniteCheckedException If failed.
      */
-    private FileWriteHandle initNextWriteHandle(int curIdx) throws StorageException, IgniteCheckedException {
+    private FileWriteHandle initNextWriteHandle(long curIdx) throws StorageException, IgniteCheckedException {
         try {
             File nextFile = pollNextFile(curIdx);
 
@@ -756,11 +755,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      * @return File ready for use as new WAL segment.
      * @throws IgniteCheckedException If failed.
      */
-    private File pollNextFile(int curIdx) throws IgniteCheckedException {
+    private File pollNextFile(long curIdx) throws IgniteCheckedException {
         // Signal to archiver that we are done with the segment and it can be archived.
-        int absNextIdx = archiver.nextAbsoluteSegmentIndex(curIdx);
+        long absNextIdx = archiver.nextAbsoluteSegmentIndex(curIdx);
 
-        int segmentIdx = absNextIdx % dbCfg.getWalSegments();
+        long segmentIdx = absNextIdx % dbCfg.getWalSegments();
 
         return new File(walWorkDir, FileDescriptor.fileName(segmentIdx, serializer.version()));
     }
@@ -827,22 +826,22 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * Absolute current segment index WAL Manger writes to. Guarded by <code>this</code>.
          * Incremented during rollover. Also may be directly set if WAL is resuming logging after start.
          */
-        private int curAbsWalIdx = -1;
+        private long curAbsWalIdx = -1;
 
         /** Last archived file index (absolute, 0-based). Guarded by <code>this</code>. */
-        private int lastAbsArchivedIdx = -1;
+        private long lastAbsArchivedIdx = -1;
 
         /** current thread stopping advice */
         private volatile boolean stopped;
 
         /** */
-        private NavigableMap<Integer, Integer> reserved = new TreeMap<>();
+        private NavigableMap<Long, Integer> reserved = new TreeMap<>();
 
         /**
          * Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may
          * come from {@link RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>.
          */
-        private Map<Integer, Integer> locked = new HashMap<>();
+        private Map<Long, Integer> locked = new HashMap<>();
 
         /**
          *
@@ -869,7 +868,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         /**
          * @param curAbsWalIdx Current absolute WAL segment index.
          */
-        private void currentWalIndex(int curAbsWalIdx) {
+        private void currentWalIndex(long curAbsWalIdx) {
             synchronized (this) {
                 this.curAbsWalIdx = curAbsWalIdx;
 
@@ -880,7 +879,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         /**
          * @param absIdx Index for reservation.
          */
-        private synchronized void reserve(int absIdx) {
+        private synchronized void reserve(long absIdx) {
             Integer cur = reserved.get(absIdx);
 
             if (cur == null)
@@ -893,14 +892,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @param absIdx Index for reservation.
          * @return {@code True} if index is reserved.
          */
-        private synchronized boolean reserved(int absIdx) {
+        private synchronized boolean reserved(long absIdx) {
             return locked.containsKey(absIdx) || reserved.floorKey(absIdx) != null;
         }
 
         /**
          * @param absIdx Reserved index.
          */
-        private synchronized void release(int absIdx) {
+        private synchronized void release(long absIdx) {
             Integer cur = reserved.get(absIdx);
 
             assert cur != null && cur >= 1 : cur;
@@ -937,7 +936,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 }
 
                 while (!Thread.currentThread().isInterrupted() && !stopped) {
-                    int toArchive;
+                    long toArchive;
 
                     synchronized (this) {
                         assert lastAbsArchivedIdx <= curAbsWalIdx : "lastArchived=" + lastAbsArchivedIdx +
@@ -991,7 +990,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @return Next index (curIdx+1) when it is ready to be written.
          * @throws IgniteCheckedException If failed (if interrupted or if exception occurred in the archiver thread).
          */
-        private int nextAbsoluteSegmentIndex(int curIdx) throws IgniteCheckedException {
+        private long nextAbsoluteSegmentIndex(long curIdx) throws IgniteCheckedException {
             try {
                 synchronized (this) {
                     if (cleanException != null)
@@ -1022,7 +1021,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @return {@code True} if can read, {@code false} if work segment
          */
         @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
-        private boolean checkCanReadArchiveOrReserveWorkSegment(int absIdx) {
+        private boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) {
             synchronized (this) {
                 if (lastAbsArchivedIdx >= absIdx)
                     return true;
@@ -1044,7 +1043,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @param absIdx Segment absolute index.
          */
         @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
-        private void releaseWorkSegment(int absIdx) {
+        private void releaseWorkSegment(long absIdx) {
             synchronized (this) {
                 Integer cur = locked.get(absIdx);
 
@@ -1070,8 +1069,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         /**
          * @param absIdx Absolute index to archive.
          */
-        private File archiveSegment(int absIdx) throws IgniteCheckedException {
-            int segIdx = absIdx % dbCfg.getWalSegments();
+        private File archiveSegment(long absIdx) throws IgniteCheckedException {
+            long segIdx = absIdx % dbCfg.getWalSegments();
 
             File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx, serializer.version()));
 
@@ -1187,7 +1186,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         protected final File file;
 
         /** Absolute WAL segment file index */
-        protected final int idx;
+        protected final long idx;
 
         /** */
         protected final int ver;
@@ -1203,7 +1202,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @param file File.
          * @param idx Absolute WAL segment file index.
          */
-        private FileDescriptor(File file, Integer idx) {
+        private FileDescriptor(File file, Long idx) {
             this.file = file;
 
             String fileName = file.getName();
@@ -1218,7 +1217,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             int end = fileName.length() - WAL_SEGMENT_FILE_EXT.length();
 
             if (idx == null)
-                this.idx = Integer.parseInt(fileName.substring(0, v));
+                this.idx = Long.parseLong(fileName.substring(0, v));
             else
                 this.idx = idx;
 
@@ -1280,7 +1279,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
         /** {@inheritDoc} */
         @Override public int hashCode() {
-            return idx;
+            return (int)(idx ^ (idx >>> 32));
         }
     }
 
@@ -1295,7 +1294,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         protected FileChannel ch;
 
         /** */
-        protected final int idx;
+        protected final long idx;
 
         /** */
         protected String gridName;
@@ -1304,7 +1303,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @param file File.
          * @param idx Index.
          */
-        private FileHandle(RandomAccessFile file, int idx, String gridName) {
+        private FileHandle(RandomAccessFile file, long idx, String gridName) {
             this.file = file;
             this.idx = idx;
             this.gridName = gridName;
@@ -1333,7 +1332,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          */
         private ReadFileHandle(
             RandomAccessFile file,
-            int idx,
+            long idx,
             String gridName,
             RecordSerializer ser,
             FileInput in
@@ -1413,7 +1412,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          */
         private FileWriteHandle(
             RandomAccessFile file,
-            int idx,
+            long idx,
             String gridName,
             long pos,
             long maxSegmentSize,
@@ -1428,7 +1427,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             this.maxSegmentSize = maxSegmentSize;
             this.serializer = serializer;
 
-            head.set(new FakeRecord(pos));
+            head.set(new FakeRecord(new FileWALPointer(idx, (int)pos, 0)));
             written = pos;
             lastFsyncPos = pos;
         }
@@ -1469,10 +1468,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
                 rec.chainSize(newChainSize);
                 rec.previous(h);
-                rec.position(nextPos);
+
+                FileWALPointer ptr = new FileWALPointer(idx, (int)nextPos, rec.size());
+
+                rec.position(ptr);
 
                 if (head.compareAndSet(h, rec))
-                    return new FileWALPointer(idx, (int)rec.position(), rec.size());
+                    return ptr;
             }
         }
 
@@ -1481,7 +1483,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @return Position for the next record.
          */
         private long nextPosition(WALRecord rec) {
-            return rec.position() + rec.size();
+            return recordOffset(rec) + rec.size();
         }
 
         /**
@@ -1501,7 +1503,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 expWritten = ptr.fileOffset();
             }
             else // We read head position before the flush because otherwise we can get wrong position.
-                expWritten = head.get().position();
+                expWritten = recordOffset(head.get());
 
             if (flush(ptr))
                 return;
@@ -1565,7 +1567,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @return Chain begin position.
          */
         private long chainBeginPosition(WALRecord h) {
-            return h.position() + h.size() - h.chainSize();
+            return recordOffset(h) + h.size() - h.chainSize();
         }
 
         /**
@@ -1583,7 +1585,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             // Fail-fast before CAS.
             checkEnvironment();
 
-            if (!head.compareAndSet(expHead, new FakeRecord(nextPosition(expHead))))
+            if (!head.compareAndSet(expHead, new FakeRecord(new FileWALPointer(idx, (int)nextPosition(expHead), 0))))
                 return false;
 
             // At this point we grabbed the piece of WAL chain.
@@ -1658,7 +1660,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             buf.rewind();
             buf.limit(limit);
 
-            return head.position();
+            return recordOffset(head);
         }
 
         /**
@@ -1951,6 +1953,20 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /**
+     * Gets WAL record offset relative to the WAL segment file beginning.
+     *
+     * @param rec WAL record.
+     * @return File offset.
+     */
+    private static int recordOffset(WALRecord rec) {
+        FileWALPointer ptr = (FileWALPointer)rec.position();
+
+        assert ptr != null;
+
+        return ptr.fileOffset();
+    }
+
+    /**
      * Fake record is zero-sized record, which is not stored into file.
      * Fake record is used for storing position in file {@link WALRecord#position()}.
      * Fake record is allowed to have no previous record.
@@ -1959,7 +1975,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         /**
          * @param pos Position.
          */
-        FakeRecord(long pos) {
+        FakeRecord(FileWALPointer pos) {
             position(pos);
         }
 
@@ -2009,7 +2025,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         private IgniteBiTuple<WALPointer, WALRecord> curRec;
 
         /** */
-        private int curIdx = -1;
+        private long curIdx = -1;
 
         /** */
         private ReadFileHandle curHandle;
@@ -2115,7 +2131,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     }
 
                     if (curIdx == -1) {
-                        int lastArchived = descs[descs.length - 1].idx;
+                        long lastArchived = descs[descs.length - 1].idx;
 
                         if (lastArchived > start.index())
                             throw new IgniteCheckedException("WAL history is corrupted (segment is missing): " + start);
@@ -2159,9 +2175,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
 
         /**
-         * @throws IgniteCheckedException If failed.
+         *
          */
-        private void advanceRecord() throws IgniteCheckedException {
+        private void advanceRecord() {
             try {
                 ReadFileHandle hnd = curHandle;
 
@@ -2170,15 +2186,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
                     int pos = (int)hnd.in.position();
 
-                    WALRecord rec = ser.readRecord(hnd.in);
+                    FileWALPointer ptr = new FileWALPointer(hnd.idx, pos, 0);
 
-                    WALPointer ptr = new FileWALPointer(hnd.idx, pos, rec.size());
+                    WALRecord rec = ser.readRecord(hnd.in, ptr);
 
-                    curRec = new IgniteBiTuple<>(ptr, rec);
+                    ptr.length(rec.size());
+
+                    curRec = new IgniteBiTuple<WALPointer, WALRecord>(ptr, rec);
                 }
             }
             catch (IOException | IgniteCheckedException e) {
-                // TODO: verify that wrapped IntegrityException is acceptable in this case.
+                if (!(e instanceof SegmentEofException)) {
+                    if (log.isInfoEnabled())
+                        log.info("Stopping WAL iteration due to an exception: " + e.getMessage());
+                }
+
                 curRec = null;
             }
         }
@@ -2213,7 +2235,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     FileDescriptor.fileName(curIdx, serializer.version())));
             }
             else {
-                int workIdx = curIdx % dbCfg.getWalSegments();
+                long workIdx = curIdx % dbCfg.getWalSegments();
 
                 fd = new FileDescriptor(
                     new File(walWorkDir, FileDescriptor.fileName(workIdx, serializer.version())),
@@ -2257,9 +2279,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
                 try {
                     RecordSerializer ser = forVersion(cctx, desc.ver);
-                    FileInput in = new FileInput(rf.getChannel(), buf);
+                    FileChannel channel = rf.getChannel();
+                    FileInput in = new FileInput(channel, buf);
 
-                    WALRecord rec = ser.readRecord(in);
+                    WALRecord rec = ser.readRecord(in,
+                        new FileWALPointer(desc.idx, (int)channel.position(), 0));
 
                     if (rec == null)
                         return null;
@@ -2314,14 +2338,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          *      archived yet. In this case the corresponding work segment is reserved (will not be deleted until
          *      release).
          */
-        private boolean canReadArchiveOrReserveWork(int absIdx) {
+        private boolean canReadArchiveOrReserveWork(long absIdx) {
             return archiver != null && archiver.checkCanReadArchiveOrReserveWorkSegment(absIdx);
         }
 
         /**
          * @param absIdx Absolute index to release.
          */
-        private void releaseWorkSegment(int absIdx) {
+        private void releaseWorkSegment(long absIdx) {
             if (archiver != null)
                 archiver.releaseWorkSegment(absIdx);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd2cec5/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java
index e3a972a..c929789 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.database.wal;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 
 /**
@@ -47,5 +48,5 @@ public interface RecordSerializer {
      * @param in Data input to read data from.
      * @return Read entry.
      */
-    public WALRecord readRecord(FileInput in) throws IOException, IgniteCheckedException;
+    public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd2cec5/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
index f67f617..442c08d 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
@@ -30,6 +30,7 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.CacheState;
 import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
@@ -37,8 +38,6 @@ import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
 import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
-import org.apache.ignite.internal.pagemem.wal.record.StoreOperationRecord;
-import org.apache.ignite.internal.pagemem.wal.record.StoreOperationRecord.StoreOperationType;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
 import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord;
@@ -97,6 +96,7 @@ import org.apache.ignite.internal.processors.cache.database.wal.record.HeaderRec
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.util.typedef.F;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
 
@@ -140,6 +140,8 @@ public class RecordV1Serializer implements RecordSerializer {
 
         buf.put((byte)(record.type().ordinal() + 1));
 
+        putPosition(buf, (FileWALPointer)record.position());
+
         switch (record.type()) {
             case PAGE_RECORD:
                 PageSnapshot snap = (PageSnapshot)record;
@@ -150,16 +152,6 @@ public class RecordV1Serializer implements RecordSerializer {
 
                 break;
 
-            case STORE_OPERATION_RECORD:
-                StoreOperationRecord storeRec = (StoreOperationRecord)record;
-
-                buf.put((byte)storeRec.operationType().ordinal());
-                buf.putInt(storeRec.cacheId());
-                buf.putLong(storeRec.link());
-                buf.putInt(storeRec.indexId());
-
-                break;
-
             case MEMORY_RECOVERY:
                 MemoryRecoveryRecord memoryRecoveryRecord = (MemoryRecoveryRecord)record;
 
@@ -217,7 +209,7 @@ public class RecordV1Serializer implements RecordSerializer {
                 buf.put(walPtr == null ? (byte)0 : 1);
 
                 if (walPtr != null) {
-                    buf.putInt(walPtr.index());
+                    buf.putLong(walPtr.index());
                     buf.putInt(walPtr.fileOffset());
                     buf.putInt(walPtr.length());
                 }
@@ -651,13 +643,13 @@ public class RecordV1Serializer implements RecordSerializer {
     }
 
     /** {@inheritDoc} */
-    @Override public WALRecord readRecord(FileInput in0) throws  IOException, IgniteCheckedException {
+    @Override public WALRecord readRecord(FileInput in0, WALPointer expPtr) throws  IOException, IgniteCheckedException {
         long startPos = -1;
 
         try (FileInput.Crc32CheckingFileInput in = in0.startRead(skipCrc)) {
             startPos = in0.position();
 
-            WALRecord res = readRecord(in);
+            WALRecord res = readRecord(in, expPtr);
 
             assert res != null;
 
@@ -676,12 +668,18 @@ public class RecordV1Serializer implements RecordSerializer {
     /**
      * @param in In.
      */
-    private WALRecord readRecord(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException {
+    private WALRecord readRecord(ByteBufferBackedDataInput in, WALPointer expPtr) throws IOException, IgniteCheckedException {
         int type = in.readUnsignedByte();
 
         if (type == 0)
             throw new SegmentEofException("Reached logical end of the segment", null);
 
+        FileWALPointer ptr = readPosition(in);
+
+        if (!F.eq(ptr, expPtr))
+            throw new SegmentEofException("WAL segment rollover detected (will end iteration) [expPtr=" + expPtr +
+                ", readPtr=" + ptr + ']', null);
+
         RecordType recType = RecordType.fromOrdinal(type - 1);
 
         if (recType == null)
@@ -702,18 +700,6 @@ public class RecordV1Serializer implements RecordSerializer {
 
                 break;
 
-            case STORE_OPERATION_RECORD:
-                StoreOperationRecord storeRec = new StoreOperationRecord();
-
-                storeRec.operationType(StoreOperationType.fromOrdinal(in.readByte() & 0xFF));
-                storeRec.cacheId(in.readInt());
-                storeRec.link(in.readLong());
-                storeRec.indexId(in.readInt());
-
-                res = storeRec;
-
-                break;
-
             case CHECKPOINT_RECORD:
                 long msb = in.readLong();
                 long lsb = in.readLong();
@@ -1217,16 +1203,15 @@ public class RecordV1Serializer implements RecordSerializer {
     /** {@inheritDoc} */
     @SuppressWarnings("CastConflictsWithInstanceof")
     @Override public int size(WALRecord record) throws IgniteCheckedException {
+        int commonFields = /* Type */1 + /* Pointer */12 + /*CRC*/4;
+
         switch (record.type()) {
             case PAGE_RECORD:
                 assert record instanceof PageSnapshot;
 
                 PageSnapshot pageRec = (PageSnapshot)record;
 
-                return pageRec.pageData().length + 12 + 1 + 4;
-
-            case STORE_OPERATION_RECORD:
-                return 18 + 4;
+                return commonFields + pageRec.pageData().length + 12;
 
             case CHECKPOINT_RECORD:
                 CheckpointRecord cpRec = (CheckpointRecord)record;
@@ -1238,147 +1223,146 @@ public class RecordV1Serializer implements RecordSerializer {
 
                 FileWALPointer walPtr = (FileWALPointer)cpRec.checkpointMark();
 
-                return 19 + cacheStatesSize + (walPtr == null ? 0 : 12) + 4;
+                return commonFields + 18 + cacheStatesSize + (walPtr == null ? 0 : 16);
 
             case META_PAGE_INIT:
-                return 1 + /*cache ID*/4 + /*page ID*/8 + /*ioType*/2  + /*ioVer*/2 +  /*tree root*/8 + /*reuse root*/8 +  /*CRC*/4;
+                return commonFields + /*cache ID*/4 + /*page ID*/8 + /*ioType*/2  + /*ioVer*/2 +  /*tree root*/8 + /*reuse root*/8;
 
             case PARTITION_META_PAGE_UPDATE_COUNTERS:
-                return 1 + /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part size*/4 + /*state*/ 1
-                    + /*allocatedIdxCandidate*/ 4 + /*CRC*/4;
+                return commonFields + /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part size*/4 + /*state*/ 1
+                    + /*allocatedIdxCandidate*/ 4;
 
             case MEMORY_RECOVERY:
-                return 1 + 8 + 4;
+                return commonFields + 8;
 
             case PARTITION_DESTROY:
-                return 1 + /*cacheId*/4 + /*partId*/4 + /*CRC*/4;
+                return commonFields + /*cacheId*/4 + /*partId*/4;
 
             case DATA_RECORD:
                 DataRecord dataRec = (DataRecord)record;
 
-                return 5 + dataSize(dataRec) + 4;
+                return commonFields + 4 + dataSize(dataRec);
 
             case HEADER_RECORD:
-                return 13 + 4;
+                return commonFields + 12;
 
             case DATA_PAGE_INSERT_RECORD:
                 DataPageInsertRecord diRec = (DataPageInsertRecord)record;
 
-                return 1 + 4 + 8 + 2 +
-                    diRec.payload().length + 4;
+                return commonFields + 4 + 8 + 2 + diRec.payload().length;
 
             case DATA_PAGE_UPDATE_RECORD:
                 DataPageUpdateRecord uRec = (DataPageUpdateRecord)record;
 
-                return 1 + 4 + 8 + 2 + 4 +
-                    uRec.payload().length + 4;
+                return commonFields + 4 + 8 + 2 + 4 +
+                    uRec.payload().length;
 
             case DATA_PAGE_INSERT_FRAGMENT_RECORD:
                 final DataPageInsertFragmentRecord difRec = (DataPageInsertFragmentRecord)record;
 
-                return 1 + 4 + 8 + 8 + 4 + difRec.payloadSize() + 4;
+                return commonFields + 4 + 8 + 8 + 4 + difRec.payloadSize();
 
             case DATA_PAGE_REMOVE_RECORD:
-                return 1 + 4 + 8 + 1 + 4;
+                return commonFields + 4 + 8 + 1;
 
             case DATA_PAGE_SET_FREE_LIST_PAGE:
-                return 1 + 4 + 8 + 8 + 4;
+                return commonFields + 4 + 8 + 8;
 
             case INIT_NEW_PAGE_RECORD:
-                return 1 + 4 + 8 + 2 + 2 + 8 + 4;
+                return commonFields + 4 + 8 + 2 + 2 + 8;
 
             case BTREE_META_PAGE_INIT_ROOT:
-                return 1 + 4 + 8 + 8 + 4;
+                return commonFields + 4 + 8 + 8;
 
             case BTREE_META_PAGE_INIT_ROOT2:
-                return 1 + 4 + 8 + 8 + 4 + 2;
+                return commonFields + 4 + 8 + 8 + 2;
 
             case BTREE_META_PAGE_ADD_ROOT:
-                return 1 + 4 + 8 + 8 + 4;
+                return commonFields + 4 + 8 + 8;
 
             case BTREE_META_PAGE_CUT_ROOT:
-                return 1 + 4 + 8 + 4;
+                return commonFields + 4 + 8;
 
             case BTREE_INIT_NEW_ROOT:
                 NewRootInitRecord<?> riRec = (NewRootInitRecord<?>)record;
 
-                return 1 + 4 + 8 + 8 + 2 + 2 + 8 + 8 + riRec.io().getItemSize() + 4;
+                return commonFields + 4 + 8 + 8 + 2 + 2 + 8 + 8 + riRec.io().getItemSize();
 
             case BTREE_PAGE_RECYCLE:
-                return 1 + 4 + 8 + 8 + 4;
+                return commonFields + 4 + 8 + 8;
 
             case BTREE_PAGE_INSERT:
                 InsertRecord<?> inRec = (InsertRecord<?>)record;
 
-                return 1 + 4 + 8 + 2 + 2 + 2 + 8 + inRec.io().getItemSize() + 4;
+                return commonFields + 4 + 8 + 2 + 2 + 2 + 8 + inRec.io().getItemSize();
 
             case BTREE_FIX_LEFTMOST_CHILD:
-                return 1 + 4 + 8 + 8 + 4;
+                return commonFields + 4 + 8 + 8;
 
             case BTREE_FIX_COUNT:
-                return 1 + 4 + 8 + 2 + 4;
+                return commonFields + 4 + 8 + 2;
 
             case BTREE_PAGE_REPLACE:
                 ReplaceRecord<?> rRec = (ReplaceRecord<?>)record;
 
-                return 1 + 4 + 8 + 2 + 2 + 2 + rRec.io().getItemSize() + 4;
+                return commonFields + 4 + 8 + 2 + 2 + 2 + rRec.io().getItemSize();
 
             case BTREE_PAGE_REMOVE:
-                return 1 + 4 + 8 + 2 + 2 + 4;
+                return commonFields + 4 + 8 + 2 + 2;
 
             case BTREE_PAGE_INNER_REPLACE:
-                return 1 + 4 + 8 + 2 + 8 + 2 + 8 + 4;
+                return commonFields + 4 + 8 + 2 + 8 + 2 + 8;
 
             case BTREE_FORWARD_PAGE_SPLIT:
-                return 1 + 4 + 8 + 8 + 2 + 2 + 8 + 2 + 2 + 4;
+                return commonFields + 4 + 8 + 8 + 2 + 2 + 8 + 2 + 2;
 
             case BTREE_EXISTING_PAGE_SPLIT:
-                return 1 + 4 + 8 + 2 + 8 + 4;
+                return commonFields + 4 + 8 + 2 + 8;
 
             case BTREE_PAGE_MERGE:
-                return 1 + 4 + 8 + 8 + 2 + 8 + 1 + 4;
+                return commonFields + 4 + 8 + 8 + 2 + 8 + 1;
 
             case BTREE_FIX_REMOVE_ID:
-                return 1 + 4 + 8 + 8 + 4;
+                return commonFields + 4 + 8 + 8;
 
             case PAGES_LIST_SET_NEXT:
-                return 1 + 4 + 8 + 8 + 4;
+                return commonFields + 4 + 8 + 8;
 
             case PAGES_LIST_SET_PREVIOUS:
-                return 1 + 4 + 8 + 8 + 4;
+                return commonFields + 4 + 8 + 8;
 
             case PAGES_LIST_INIT_NEW_PAGE:
-                return 1 + 4 + 8 + 4 + 4 + 8 + 8 + 8 + 4;
+                return commonFields + 4 + 8 + 4 + 4 + 8 + 8 + 8;
 
             case PAGES_LIST_ADD_PAGE:
-                return 1 + 4 + 8 + 8 + 4;
+                return commonFields + 4 + 8 + 8;
 
             case PAGES_LIST_REMOVE_PAGE:
-                return 1 + 4 + 8 + 8 + 4;
+                return commonFields + 4 + 8 + 8;
 
             case TRACKING_PAGE_DELTA:
-                return 1 + 4 + 8 + 8 + 8 + 8 + 4;
+                return commonFields + 4 + 8 + 8 + 8 + 8;
 
             case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID:
-                return 1 + 4 + 8 + 8 + 8 + 4;
+                return commonFields + 4 + 8 + 8 + 8;
 
             case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID:
-                return 1 + 4 + 8 + 8 + 4;
+                return commonFields + 4 + 8 + 8;
 
             case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID:
-                return 1 + 4 + 8 + 8 + 4;
+                return commonFields + 4 + 8 + 8;
 
             case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX:
-                return 1 + 4 + 8 + 4 + 4;
+                return commonFields + 4 + 8 + 4;
 
             case PART_META_UPDATE_STATE:
-                return /*Type*/ 1 + /*cacheId*/ 4 + /*partId*/ 4 + /*State*/1 + /*Update Counter*/ 8 + /*CRC*/4;
+                return commonFields + /*cacheId*/ 4 + /*partId*/ 4 + /*State*/1 + /*Update Counter*/ 8;
 
             case PAGE_LIST_META_RESET_COUNT_RECORD:
-                return /*Type*/ 1 + /*cacheId*/ 4 + /*pageId*/ 8 + /*CRC*/4;
+                return commonFields + /*cacheId*/ 4 + /*pageId*/ 8;
 
             case SWITCH_SEGMENT_RECORD:
-                return  /*Type*/ 1 + /*CRC*/4;
+                return commonFields;
 
             default:
                 throw new UnsupportedOperationException("Type: " + record.type());
@@ -1386,6 +1370,27 @@ public class RecordV1Serializer implements RecordSerializer {
     }
 
     /**
+     * @param buf Byte buffer to serialize the pointer to.
+     * @param ptr File WAL pointer to write.
+     */
+    private void putPosition(ByteBuffer buf, FileWALPointer ptr) {
+        buf.putLong(ptr.index());
+        buf.putInt(ptr.fileOffset());
+    }
+
+    /**
+     * @param in Data input to read pointer from.
+     * @return Read file WAL pointer.
+     * @throws IOException If failed to read.
+     */
+    private FileWALPointer readPosition(DataInput in) throws IOException {
+        long idx = in.readLong();
+        int fileOffset = in.readInt();
+
+        return new FileWALPointer(idx, fileOffset, 0);
+    }
+
+    /**
      * @param dataRec Data record to serialize.
      * @return Full data record size.
      * @throws IgniteCheckedException If failed to obtain the length of one of the entries.

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd2cec5/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java
index bdf333c..225a9d2 100644
--- a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java
+++ b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java
@@ -464,7 +464,7 @@ public class IgniteWalRecoverySelfTest extends GridCommonAbstractTest {
 
         walSegmentSize = 2 * 1024 * 1024;
 
-        final long endTime = System.currentTimeMillis() + 3 * 60 * 1000;
+        final long endTime = System.currentTimeMillis() + 2 * 60 * 1000;
 
         try {
             IgniteEx ignite = startGrid(1);