You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2018/08/20 11:23:20 UTC

ignite git commit: IGNITE-9294 StandaloneWalRecordsIterator: support iteration from custom pointer - Fixes #4563.

Repository: ignite
Updated Branches:
  refs/heads/master 3b17bb18d -> 0a19d010f


IGNITE-9294 StandaloneWalRecordsIterator: support iteration from custom pointer - Fixes #4563.

Signed-off-by: Ivan Rakov <ir...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0a19d010
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0a19d010
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0a19d010

Branch: refs/heads/master
Commit: 0a19d010fddae9a48b8a954b46c9864f1d5ceba8
Parents: 3b17bb1
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Mon Aug 20 12:45:57 2018 +0300
Committer: Ivan Rakov <ir...@apache.org>
Committed: Mon Aug 20 14:19:03 2018 +0300

----------------------------------------------------------------------
 .../wal/AbstractWalRecordsIterator.java         |   2 +-
 .../wal/reader/IgniteWalIteratorFactory.java    |  70 ++++++++
 .../reader/StandaloneWalRecordsIterator.java    |  80 ++++++++-
 .../db/wal/reader/IgniteWalReaderTest.java      | 176 ++++++++++++++++++-
 4 files changed, 313 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0a19d010/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index 27f0102..9fbb535 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -228,7 +228,7 @@ public abstract class AbstractWalRecordsIterator
      * @param hnd currently opened read handle.
      * @return next advanced record.
      */
-    private IgniteBiTuple<WALPointer, WALRecord> advanceRecord(
+    protected IgniteBiTuple<WALPointer, WALRecord> advanceRecord(
         @Nullable final AbstractReadFileHandle hnd
     ) throws IgniteCheckedException {
         if (hnd == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a19d010/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index aae9775..cda8d90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -108,6 +108,26 @@ public class IgniteWalIteratorFactory {
      * This method may be used for work folder, file indexes are scanned from the file context.
      * In this mode only provided WAL segments will be scanned. New WAL files created during iteration will be ignored.
      *
+     * @param replayFrom WAL pointer to start the replay from (inclusive).
+     * @param filesOrDirs files to scan. A file can be the path to '.wal' file, or directory with '.wal' files.
+     * Order is not important, but it is significant to provide all segments without omissions.
+     * Path should not contain special symbols. Special symbols should be already masked.
+     * @return closable WAL records iterator, should be closed when no longer needed.
+     * @throws IgniteCheckedException if failed to read files
+     * @throws IllegalArgumentException If parameter validation failed.
+     */
+    public WALIterator iterator(
+        @NotNull FileWALPointer replayFrom,
+        @NotNull File... filesOrDirs
+    ) throws IgniteCheckedException, IllegalArgumentException {
+        return iterator(new IteratorParametersBuilder().from(replayFrom).filesOrDirs(filesOrDirs));
+    }
+
+    /**
+     * Creates iterator for file by file scan mode.
+     * This method may be used for work folder, file indexes are scanned from the file context.
+     * In this mode only provided WAL segments will be scanned. New WAL files created during iteration will be ignored.
+     *
      * @param filesOrDirs paths to scan. A path can be direct to '.wal' file, or directory with '.wal' files.
      * Order is not important, but it is significant to provide all segments without omissions.
      * Path should not contain special symbols. Special symbols should be already masked.
@@ -122,6 +142,26 @@ public class IgniteWalIteratorFactory {
     }
 
     /**
+     * Creates iterator for file by file scan mode.
+     * This method may be used for work folder, file indexes are scanned from the file context.
+     * In this mode only provided WAL segments will be scanned. New WAL files created during iteration will be ignored.
+     *
+     * @param replayFrom WAL pointer to start the replay from (inclusive).
+     * @param filesOrDirs paths to scan. A path can be direct to '.wal' file, or directory with '.wal' files.
+     * Order is not important, but it is significant to provide all segments without omissions.
+     * Path should not contain special symbols. Special symbols should be already masked.
+     * @return closable WAL records iterator, should be closed when no longer needed.
+     * @throws IgniteCheckedException If failed to read files.
+     * @throws IllegalArgumentException If parameter validation failed.
+     */
+    public WALIterator iterator(
+        @NotNull FileWALPointer replayFrom,
+        @NotNull String... filesOrDirs
+    ) throws IgniteCheckedException, IllegalArgumentException {
+        return iterator(new IteratorParametersBuilder().from(replayFrom).filesOrDirs(filesOrDirs));
+    }
+
+    /**
      * @param iteratorParametersBuilder Iterator parameters builder.
      * @return closable WAL records iterator, should be closed when non needed
      */
@@ -135,6 +175,8 @@ public class IgniteWalIteratorFactory {
             iteratorParametersBuilder.ioFactory,
             resolveWalFiles(iteratorParametersBuilder),
             iteratorParametersBuilder.filter,
+            iteratorParametersBuilder.lowBound,
+            iteratorParametersBuilder.highBound,
             iteratorParametersBuilder.keepBinary,
             iteratorParametersBuilder.bufferSize
         );
@@ -361,6 +403,12 @@ public class IgniteWalIteratorFactory {
         /** */
         @Nullable private IgniteBiPredicate<RecordType, WALPointer> filter;
 
+        /** */
+        private FileWALPointer lowBound = new FileWALPointer(Long.MIN_VALUE, 0, 0);
+
+        /** */
+        private FileWALPointer highBound = new FileWALPointer(Long.MAX_VALUE, Integer.MAX_VALUE, 0);
+
         /**
          * @param filesOrDirs Paths to files or directories.
          * @return IteratorParametersBuilder Self reference.
@@ -458,6 +506,26 @@ public class IgniteWalIteratorFactory {
         }
 
         /**
+         * @param lowBound WAL pointer to start from.
+         * @return IteratorParametersBuilder Self reference.
+         */
+        public IteratorParametersBuilder from(FileWALPointer lowBound) {
+            this.lowBound = lowBound;
+
+            return this;
+        }
+
+        /**
+         * @param highBound WAL pointer to stop the iteration at (inclusive).
+         * @return IteratorParametersBuilder Self reference.
+         */
+        public IteratorParametersBuilder to(FileWALPointer highBound) {
+            this.highBound = highBound;
+
+            return this;
+        }
+
+        /**
          * Copy current state of builder to new instance.
          *
          * @return IteratorParametersBuilder Self reference.
@@ -471,6 +539,8 @@ public class IgniteWalIteratorFactory {
                 .ioFactory(ioFactory)
                 .binaryMetadataFileStoreDir(binaryMetadataFileStoreDir)
                 .marshallerMappingFileStoreDir(marshallerMappingFileStoreDir)
+                .from(lowBound)
+                .to(highBound)
                 .filter(filter);
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a19d010/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index d92c0e4..e934f33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.FilteredRecord;
 import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.UnwrapDataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
@@ -48,7 +49,9 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTai
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -77,6 +80,12 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
     /** Keep binary. This flag disables converting of non primitive types (BinaryObjects) */
     private boolean keepBinary;
 
+    /** Lower bound of the replay range (inclusive). */
+    private final FileWALPointer lowBound;
+
+    /** Upper bound of the replay range (inclusive). */
+    private final FileWALPointer highBound;
+
     /**
      * Creates iterator in file-by-file iteration mode. Directory
      * @param log Logger.
@@ -93,6 +102,8 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
         @NotNull FileIOFactory ioFactory,
         @NotNull List<FileDescriptor> walFiles,
         IgniteBiPredicate<RecordType, WALPointer> readTypeFilter,
+        FileWALPointer lowBound,
+        FileWALPointer highBound,
         boolean keepBinary,
         int initialReadBufferSize
     ) throws IgniteCheckedException {
@@ -104,6 +115,9 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
             initialReadBufferSize
         );
 
+        this.lowBound = lowBound;
+        this.highBound = highBound;
+
         this.keepBinary = keepBinary;
 
         walFileDescriptors = walFiles;
@@ -156,14 +170,19 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
         if (curWalSegment != null)
             curWalSegment.close();
 
-        curWalSegmIdx++;
+        FileDescriptor fd;
 
-        curIdx++;
+        do {
+            curWalSegmIdx++;
 
-        if (curIdx >= walFileDescriptors.size())
-            return null;
+            curIdx++;
 
-        FileDescriptor fd = walFileDescriptors.get(curIdx);
+            if (curIdx >= walFileDescriptors.size())
+                return null;
+
+            fd = walFileDescriptors.get(curIdx);
+        }
+        while (!checkBounds(fd.idx()));
 
         if (log.isDebugEnabled())
             log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" + fd.file().getAbsolutePath() + ']');
@@ -173,7 +192,12 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
         curRec = null;
 
         try {
-            return initReadHandle(fd, null);
+            FileWALPointer initPtr = null;
+
+            if (lowBound.index() == fd.idx())
+                initPtr = lowBound;
+
+            return initReadHandle(fd, initPtr);
         }
         catch (FileNotFoundException e) {
             if (log.isInfoEnabled())
@@ -184,6 +208,50 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
     }
 
     /** {@inheritDoc} */
+    @Override protected IgniteBiTuple<WALPointer, WALRecord> advanceRecord(
+        @Nullable AbstractReadFileHandle hnd
+    ) throws IgniteCheckedException {
+        IgniteBiTuple<WALPointer, WALRecord> tup = super.advanceRecord(hnd);
+
+        if (tup == null)
+            return tup;
+
+        if (!checkBounds(tup.get1())) {
+            if (curRec != null) {
+                FileWALPointer prevRecPtr = (FileWALPointer)curRec.get1();
+
+                // Fast stop: the previously read record is already beyond the high bound.
+                if (prevRecPtr != null && prevRecPtr.compareTo(highBound) > 0)
+                    return null;
+            }
+
+            return new T2<>(tup.get1(), FilteredRecord.INSTANCE); // FilteredRecord for mark as filtered.
+        }
+
+        return tup;
+    }
+
+    /**
+     *
+     * @param ptr WAL pointer.
+     * @return {@code True} if the pointer is between the low and high bounds (both inclusive), {@code false} if not.
+     */
+    private boolean checkBounds(WALPointer ptr) {
+        FileWALPointer ptr0 = (FileWALPointer)ptr;
+
+        return ptr0.compareTo(lowBound) >= 0 && ptr0.compareTo(highBound) <= 0;
+    }
+
+    /**
+     *
+     * @param idx WAL segment index.
+     * @return {@code True} if the index is between the low and high bound segment indexes, {@code false} if not.
+     */
+    private boolean checkBounds(long idx) {
+        return idx >= lowBound.index() && idx <= highBound.index();
+    }
+
+    /** {@inheritDoc} */
     @Override protected AbstractReadFileHandle initReadHandle(
         @NotNull AbstractFileDescriptor desc,
         @Nullable FileWALPointer start

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a19d010/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
index c93f8bf..beab138 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumMap;
 import java.util.HashMap;
@@ -31,6 +32,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Random;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -40,6 +42,7 @@ import javax.cache.Cache;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteEvents;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.binary.BinaryObject;
@@ -63,9 +66,11 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -149,7 +154,6 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
                 new DataRegionConfiguration()
                     .setMaxSize(1024L * 1024 * 1024)
                     .setPersistenceEnabled(true))
-            .setWalHistorySize(1)
             .setWalSegmentSize(1024 * 1024)
             .setWalSegments(WAL_SEGMENTS)
             .setWalMode(customWalMode != null ? customWalMode : WALMode.BACKGROUND)
@@ -167,7 +171,8 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
 
             dsCfg.setWalPath(walAbsPath);
             dsCfg.setWalArchivePath(walAbsPath);
-        } else {
+        }
+        else {
             dsCfg.setWalPath(wal.getAbsolutePath());
             dsCfg.setWalArchivePath(new File(wal, "archive").getAbsolutePath());
         }
@@ -336,7 +341,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
             return true;
         }, evtType);
 
-        putDummyRecords(ignite, 500);
+        putDummyRecords(ignite, 5_000);
 
         stopGrid();
 
@@ -499,7 +504,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
 
         String workDir = U.defaultWorkDirectory();
 
-        IteratorParametersBuilder params = createIteratorParametersBuilder(workDir,subfolderName);
+        IteratorParametersBuilder params = createIteratorParametersBuilder(workDir, subfolderName);
 
         params.filesOrDirs(workDir);
 
@@ -765,7 +770,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
             ctrlMapForBinaryObjects, ctrlMapForBinaryObjects.isEmpty());
 
         assertTrue(" Control Map for strings in entries is not empty after" +
-                " reading records: " + ctrlStringsForBinaryObjSearch, ctrlStringsForBinaryObjSearch.isEmpty());
+            " reading records: " + ctrlStringsForBinaryObjSearch, ctrlStringsForBinaryObjSearch.isEmpty());
     }
 
     /**
@@ -792,7 +797,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
 
         IteratorParametersBuilder iterParametersBuilder =
             createIteratorParametersBuilder(workDir, subfolderName)
-            .filesOrDirs(workDir);
+                .filesOrDirs(workDir);
 
         scanIterateAndCount(
             factory,
@@ -853,12 +858,11 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
         runRemoveOperationTest(CacheAtomicityMode.ATOMIC);
     }
 
-
     /**
      * Test if DELETE operation can be found after mixed cache operations including remove().
      *
-     * @throws Exception if failed.
      * @param mode Cache Atomicity Mode.
+     * @throws Exception if failed.
      */
     private void runRemoveOperationTest(CacheAtomicityMode mode) throws Exception {
         Ignite ignite = startGrid();
@@ -1075,6 +1079,162 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testCheckBoundsIterator() throws Exception {
+        Ignite ignite = startGrid("node0");
+
+        ignite.cluster().active(true);
+
+        try (IgniteDataStreamer<Integer, IndexedObject> st = ignite.dataStreamer(CACHE_NAME)) {
+            st.allowOverwrite(true);
+
+            for (int i = 0; i < 10_000; i++)
+                st.addData(i, new IndexedObject(i));
+        }
+
+        stopAllGrids();
+
+        List<FileWALPointer> wal = new ArrayList<>();
+
+        String workDir = U.defaultWorkDirectory();
+
+        IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory();
+
+        try (WALIterator it = factory.iterator(workDir)) {
+            while (it.hasNext()) {
+                IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
+
+                wal.add((FileWALPointer)tup.get1());
+            }
+        }
+
+        Random rnd = new Random();
+
+        int from0 = rnd.nextInt(wal.size() - 2) + 1;
+        int to0 = wal.size() - 1;
+
+        // from0 is at least 1 (see the +1 above), so the first record is always skipped.
+        FileWALPointer exp0First = wal.get(from0);
+        FileWALPointer exp0Last = wal.get(to0);
+
+        T2<FileWALPointer, WALRecord> actl0First = null;
+        T2<FileWALPointer, WALRecord> actl0Last = null;
+
+        int records0 = 0;
+
+        try (WALIterator it = factory.iterator(exp0First, workDir)) {
+            while (it.hasNext()) {
+                IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
+
+                if (actl0First == null)
+                    actl0First = new T2<>((FileWALPointer)tup.get1(), tup.get2());
+
+                actl0Last = new T2<>((FileWALPointer)tup.get1(), tup.get2());
+
+                records0++;
+            }
+        }
+
+        log.info("Check REPLAY FROM:" + exp0First + "\n" +
+            "expFirst=" + exp0First + " actlFirst=" + actl0First + ", " +
+            "expLast=" + exp0Last + " actlLast=" + actl0Last);
+
+        // +1 because both bounds are inclusive.
+        Assert.assertEquals(to0 - from0 + 1, records0);
+
+        Assert.assertNotNull(actl0First);
+        Assert.assertNotNull(actl0Last);
+
+        Assert.assertEquals(exp0First, actl0First.get1());
+        Assert.assertEquals(exp0Last, actl0Last.get1());
+
+        int from1 = 0;
+        int to1 = rnd.nextInt(wal.size() - 3) + 1;
+
+        // to1 is at most wal.size() - 3 (see above), so the last records are always skipped.
+        FileWALPointer exp1First = wal.get(from1);
+        FileWALPointer exp1Last = wal.get(to1);
+
+        T2<FileWALPointer, WALRecord> actl1First = null;
+        T2<FileWALPointer, WALRecord> actl1Last = null;
+
+        int records1 = 0;
+
+        try (WALIterator it = factory.iterator(
+            new IteratorParametersBuilder()
+                .filesOrDirs(workDir)
+                .to(exp1Last)
+        )) {
+            while (it.hasNext()) {
+                IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
+
+                if (actl1First == null)
+                    actl1First = new T2<>((FileWALPointer)tup.get1(), tup.get2());
+
+                actl1Last = new T2<>((FileWALPointer)tup.get1(), tup.get2());
+
+                records1++;
+            }
+        }
+
+        log.info("Check REPLAY TO:" + exp1Last + "\n" +
+            "expFirst=" + exp1First + " actlFirst=" + actl1First + ", " +
+            "expLast=" + exp1Last + " actlLast=" + actl1Last);
+
+        // +1 because both bounds are inclusive.
+        Assert.assertEquals(to1 - from1 + 1, records1);
+
+        Assert.assertNotNull(actl1First);
+        Assert.assertNotNull(actl1Last);
+
+        Assert.assertEquals(exp1First, actl1First.get1());
+        Assert.assertEquals(exp1Last, actl1Last.get1());
+
+        int from2 = rnd.nextInt(wal.size() - 2);
+        int to2 = rnd.nextInt((wal.size() - 1) - from2) + from2;
+
+        FileWALPointer exp2First = wal.get(from2);
+        FileWALPointer exp2Last = wal.get(to2);
+
+        T2<FileWALPointer, WALRecord> actl2First = null;
+        T2<FileWALPointer, WALRecord> actl2Last = null;
+
+        int records2 = 0;
+
+        try (WALIterator it = factory.iterator(
+            new IteratorParametersBuilder()
+                .filesOrDirs(workDir)
+                .from(exp2First)
+                .to(exp2Last)
+        )) {
+            while (it.hasNext()) {
+                IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
+
+                if (actl2First == null)
+                    actl2First = new T2<>((FileWALPointer)tup.get1(), tup.get2());
+
+                actl2Last = new T2<>((FileWALPointer)tup.get1(), tup.get2());
+
+                records2++;
+            }
+        }
+
+        log.info("Check REPLAY BETWEEN:" + exp2First + " " + exp2Last+ "\n" +
+            "expFirst=" + exp2First + " actlFirst=" + actl2First + ", " +
+            "expLast=" + exp2Last + " actlLast=" + actl2Last);
+
+        // +1 because both bounds are inclusive.
+        Assert.assertEquals(to2 - from2 + 1, records2);
+
+        Assert.assertNotNull(actl2First);
+        Assert.assertNotNull(actl2Last);
+
+        Assert.assertEquals(exp2First, actl2First.get1());
+        Assert.assertEquals(exp2Last, actl2Last.get1());
+    }
+
+    /**
      * @param workDir Work directory.
      * @param subfolderName Subfolder name.
      * @return WAL iterator factory.