You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2018/08/20 11:23:20 UTC
ignite git commit: IGNITE-9294 StandaloneWalRecordsIterator: support
iteration from custom pointer - Fixes #4563.
Repository: ignite
Updated Branches:
refs/heads/master 3b17bb18d -> 0a19d010f
IGNITE-9294 StandaloneWalRecordsIterator: support iteration from custom pointer - Fixes #4563.
Signed-off-by: Ivan Rakov <ir...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0a19d010
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0a19d010
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0a19d010
Branch: refs/heads/master
Commit: 0a19d010fddae9a48b8a954b46c9864f1d5ceba8
Parents: 3b17bb1
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Mon Aug 20 12:45:57 2018 +0300
Committer: Ivan Rakov <ir...@apache.org>
Committed: Mon Aug 20 14:19:03 2018 +0300
----------------------------------------------------------------------
.../wal/AbstractWalRecordsIterator.java | 2 +-
.../wal/reader/IgniteWalIteratorFactory.java | 70 ++++++++
.../reader/StandaloneWalRecordsIterator.java | 80 ++++++++-
.../db/wal/reader/IgniteWalReaderTest.java | 176 ++++++++++++++++++-
4 files changed, 313 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0a19d010/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index 27f0102..9fbb535 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -228,7 +228,7 @@ public abstract class AbstractWalRecordsIterator
* @param hnd currently opened read handle.
* @return next advanced record.
*/
- private IgniteBiTuple<WALPointer, WALRecord> advanceRecord(
+ protected IgniteBiTuple<WALPointer, WALRecord> advanceRecord(
@Nullable final AbstractReadFileHandle hnd
) throws IgniteCheckedException {
if (hnd == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/0a19d010/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index aae9775..cda8d90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -108,6 +108,26 @@ public class IgniteWalIteratorFactory {
* This method may be used for work folder, file indexes are scanned from the file context.
* In this mode only provided WAL segments will be scanned. New WAL files created during iteration will be ignored.
*
+ * @param replayFrom WAL pointer to start the replay from (inclusive).
+ * @param filesOrDirs files to scan. A file can be the path to '.wal' file, or directory with '.wal' files.
+ * Order is not important, but it is significant to provide all segments without omissions.
+ * Path should not contain special symbols. Special symbols should be already masked.
+ * @return Closable WAL records iterator; it should be closed when no longer needed.
+ * @throws IgniteCheckedException if failed to read files
+ * @throws IllegalArgumentException If parameter validation failed.
+ */
+ public WALIterator iterator(
+ @NotNull FileWALPointer replayFrom,
+ @NotNull File... filesOrDirs
+ ) throws IgniteCheckedException, IllegalArgumentException {
+ return iterator(new IteratorParametersBuilder().from(replayFrom).filesOrDirs(filesOrDirs));
+ }
+
+ /**
+ * Creates iterator for file by file scan mode.
+ * This method may be used for work folder, file indexes are scanned from the file context.
+ * In this mode only provided WAL segments will be scanned. New WAL files created during iteration will be ignored.
+ *
* @param filesOrDirs paths to scan. A path can be direct to '.wal' file, or directory with '.wal' files.
* Order is not important, but it is significant to provide all segments without omissions.
* Path should not contain special symbols. Special symbols should be already masked.
@@ -122,6 +142,26 @@ public class IgniteWalIteratorFactory {
}
/**
+ * Creates iterator for file by file scan mode.
+ * This method may be used for work folder, file indexes are scanned from the file context.
+ * In this mode only provided WAL segments will be scanned. New WAL files created during iteration will be ignored.
+ *
+ * @param replayFrom WAL pointer to start the replay from (inclusive).
+ * @param filesOrDirs paths to scan. A path can be direct to '.wal' file, or directory with '.wal' files.
+ * Order is not important, but it is significant to provide all segments without omissions.
+ * Path should not contain special symbols. Special symbols should be already masked.
+ * @return Closable WAL records iterator; it should be closed when no longer needed.
+ * @throws IgniteCheckedException If failed to read files.
+ * @throws IllegalArgumentException If parameter validation failed.
+ */
+ public WALIterator iterator(
+ @NotNull FileWALPointer replayFrom,
+ @NotNull String... filesOrDirs
+ ) throws IgniteCheckedException, IllegalArgumentException {
+ return iterator(new IteratorParametersBuilder().from(replayFrom).filesOrDirs(filesOrDirs));
+ }
+
+ /**
* @param iteratorParametersBuilder Iterator parameters builder.
* @return closable WAL records iterator, should be closed when non needed
*/
@@ -135,6 +175,8 @@ public class IgniteWalIteratorFactory {
iteratorParametersBuilder.ioFactory,
resolveWalFiles(iteratorParametersBuilder),
iteratorParametersBuilder.filter,
+ iteratorParametersBuilder.lowBound,
+ iteratorParametersBuilder.highBound,
iteratorParametersBuilder.keepBinary,
iteratorParametersBuilder.bufferSize
);
@@ -361,6 +403,12 @@ public class IgniteWalIteratorFactory {
/** */
@Nullable private IgniteBiPredicate<RecordType, WALPointer> filter;
+ /** */
+ private FileWALPointer lowBound = new FileWALPointer(Long.MIN_VALUE, 0, 0);
+
+ /** */
+ private FileWALPointer highBound = new FileWALPointer(Long.MAX_VALUE, Integer.MAX_VALUE, 0);
+
/**
* @param filesOrDirs Paths to files or directories.
* @return IteratorParametersBuilder Self reference.
@@ -458,6 +506,26 @@ public class IgniteWalIteratorFactory {
}
/**
+ * @param lowBound WAL pointer to start from.
+ * @return IteratorParametersBuilder Self reference.
+ */
+ public IteratorParametersBuilder from(FileWALPointer lowBound) {
+ this.lowBound = lowBound;
+
+ return this;
+ }
+
+ /**
+ * @param highBound WAL pointer to stop at (inclusive).
+ * @return IteratorParametersBuilder Self reference.
+ */
+ public IteratorParametersBuilder to(FileWALPointer highBound) {
+ this.highBound = highBound;
+
+ return this;
+ }
+
+ /**
* Copy current state of builder to new instance.
*
* @return IteratorParametersBuilder Self reference.
@@ -471,6 +539,8 @@ public class IgniteWalIteratorFactory {
.ioFactory(ioFactory)
.binaryMetadataFileStoreDir(binaryMetadataFileStoreDir)
.marshallerMappingFileStoreDir(marshallerMappingFileStoreDir)
+ .from(lowBound)
+ .to(highBound)
.filter(filter);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0a19d010/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index d92c0e4..e934f33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.FilteredRecord;
import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.UnwrapDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
@@ -48,7 +49,9 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTai
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -77,6 +80,12 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
/** Keep binary. This flag disables converting of non primitive types (BinaryObjects) */
private boolean keepBinary;
+ /** Lower bound of the replay range (inclusive). */
+ private final FileWALPointer lowBound;
+
+ /** Upper bound of the replay range (inclusive). */
+ private final FileWALPointer highBound;
+
/**
* Creates iterator in file-by-file iteration mode. Directory
* @param log Logger.
@@ -93,6 +102,8 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
@NotNull FileIOFactory ioFactory,
@NotNull List<FileDescriptor> walFiles,
IgniteBiPredicate<RecordType, WALPointer> readTypeFilter,
+ FileWALPointer lowBound,
+ FileWALPointer highBound,
boolean keepBinary,
int initialReadBufferSize
) throws IgniteCheckedException {
@@ -104,6 +115,9 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
initialReadBufferSize
);
+ this.lowBound = lowBound;
+ this.highBound = highBound;
+
this.keepBinary = keepBinary;
walFileDescriptors = walFiles;
@@ -156,14 +170,19 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
if (curWalSegment != null)
curWalSegment.close();
- curWalSegmIdx++;
+ FileDescriptor fd;
- curIdx++;
+ do {
+ curWalSegmIdx++;
- if (curIdx >= walFileDescriptors.size())
- return null;
+ curIdx++;
- FileDescriptor fd = walFileDescriptors.get(curIdx);
+ if (curIdx >= walFileDescriptors.size())
+ return null;
+
+ fd = walFileDescriptors.get(curIdx);
+ }
+ while (!checkBounds(fd.idx()));
if (log.isDebugEnabled())
log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" + fd.file().getAbsolutePath() + ']');
@@ -173,7 +192,12 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
curRec = null;
try {
- return initReadHandle(fd, null);
+ FileWALPointer initPtr = null;
+
+ if (lowBound.index() == fd.idx())
+ initPtr = lowBound;
+
+ return initReadHandle(fd, initPtr);
}
catch (FileNotFoundException e) {
if (log.isInfoEnabled())
@@ -184,6 +208,50 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
}
/** {@inheritDoc} */
+ @Override protected IgniteBiTuple<WALPointer, WALRecord> advanceRecord(
+ @Nullable AbstractReadFileHandle hnd
+ ) throws IgniteCheckedException {
+ IgniteBiTuple<WALPointer, WALRecord> tup = super.advanceRecord(hnd);
+
+ if (tup == null)
+ return tup;
+
+ if (!checkBounds(tup.get1())) {
+ if (curRec != null) {
+ FileWALPointer prevRecPtr = (FileWALPointer)curRec.get1();
+
+ // Fast stop condition, after high bound reached.
+ if (prevRecPtr != null && prevRecPtr.compareTo(highBound) > 0)
+ return null;
+ }
+
+ return new T2<>(tup.get1(), FilteredRecord.INSTANCE); // FilteredRecord for mark as filtered.
+ }
+
+ return tup;
+ }
+
+ /**
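+ * Checks whether the given WAL pointer lies within the configured replay bounds.
+ * @param ptr WAL pointer.
+ * @return {@code True} if the pointer lies between the low and high bounds (both inclusive), {@code false} otherwise.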
+ */
+ private boolean checkBounds(WALPointer ptr) {
+ FileWALPointer ptr0 = (FileWALPointer)ptr;
+
+ return ptr0.compareTo(lowBound) >= 0 && ptr0.compareTo(highBound) <= 0;
+ }
+
+ /**
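+ * Checks whether the given WAL segment index lies within the segment indexes of the replay bounds.
+ * @param idx WAL segment index.
+ * @return {@code True} if the index lies between the segment indexes of the low and high bounds (both inclusive), {@code false} otherwise.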
+ */
+ private boolean checkBounds(long idx) {
+ return idx >= lowBound.index() && idx <= highBound.index();
+ }
+
+ /** {@inheritDoc} */
@Override protected AbstractReadFileHandle initReadHandle(
@NotNull AbstractFileDescriptor desc,
@Nullable FileWALPointer start
http://git-wip-us.apache.org/repos/asf/ignite/blob/0a19d010/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
index c93f8bf..beab138 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
@@ -31,6 +32,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Random;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -40,6 +42,7 @@ import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.binary.BinaryObject;
@@ -63,9 +66,11 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -149,7 +154,6 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
new DataRegionConfiguration()
.setMaxSize(1024L * 1024 * 1024)
.setPersistenceEnabled(true))
- .setWalHistorySize(1)
.setWalSegmentSize(1024 * 1024)
.setWalSegments(WAL_SEGMENTS)
.setWalMode(customWalMode != null ? customWalMode : WALMode.BACKGROUND)
@@ -167,7 +171,8 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
dsCfg.setWalPath(walAbsPath);
dsCfg.setWalArchivePath(walAbsPath);
- } else {
+ }
+ else {
dsCfg.setWalPath(wal.getAbsolutePath());
dsCfg.setWalArchivePath(new File(wal, "archive").getAbsolutePath());
}
@@ -336,7 +341,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
return true;
}, evtType);
- putDummyRecords(ignite, 500);
+ putDummyRecords(ignite, 5_000);
stopGrid();
@@ -499,7 +504,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
String workDir = U.defaultWorkDirectory();
- IteratorParametersBuilder params = createIteratorParametersBuilder(workDir,subfolderName);
+ IteratorParametersBuilder params = createIteratorParametersBuilder(workDir, subfolderName);
params.filesOrDirs(workDir);
@@ -765,7 +770,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
ctrlMapForBinaryObjects, ctrlMapForBinaryObjects.isEmpty());
assertTrue(" Control Map for strings in entries is not empty after" +
- " reading records: " + ctrlStringsForBinaryObjSearch, ctrlStringsForBinaryObjSearch.isEmpty());
+ " reading records: " + ctrlStringsForBinaryObjSearch, ctrlStringsForBinaryObjSearch.isEmpty());
}
/**
@@ -792,7 +797,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
IteratorParametersBuilder iterParametersBuilder =
createIteratorParametersBuilder(workDir, subfolderName)
- .filesOrDirs(workDir);
+ .filesOrDirs(workDir);
scanIterateAndCount(
factory,
@@ -853,12 +858,11 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
runRemoveOperationTest(CacheAtomicityMode.ATOMIC);
}
-
/**
* Test if DELETE operation can be found after mixed cache operations including remove().
*
- * @throws Exception if failed.
* @param mode Cache Atomicity Mode.
+ * @throws Exception if failed.
*/
private void runRemoveOperationTest(CacheAtomicityMode mode) throws Exception {
Ignite ignite = startGrid();
@@ -1075,6 +1079,162 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testCheckBoundsIterator() throws Exception {
+ Ignite ignite = startGrid("node0");
+
+ ignite.cluster().active(true);
+
+ try (IgniteDataStreamer<Integer, IndexedObject> st = ignite.dataStreamer(CACHE_NAME)) {
+ st.allowOverwrite(true);
+
+ for (int i = 0; i < 10_000; i++)
+ st.addData(i, new IndexedObject(i));
+ }
+
+ stopAllGrids();
+
+ List<FileWALPointer> wal = new ArrayList<>();
+
+ String workDir = U.defaultWorkDirectory();
+
+ IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory();
+
+ try (WALIterator it = factory.iterator(workDir)) {
+ while (it.hasNext()) {
+ IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
+
+ wal.add((FileWALPointer)tup.get1());
+ }
+ }
+
+ Random rnd = new Random();
+
+ int from0 = rnd.nextInt(wal.size() - 2) + 1;
+ int to0 = wal.size() - 1;
+
+ // from0 is at least 1 (note the "+ 1" above), so the very first record is always skipped.
+ FileWALPointer exp0First = wal.get(from0);
+ FileWALPointer exp0Last = wal.get(to0);
+
+ T2<FileWALPointer, WALRecord> actl0First = null;
+ T2<FileWALPointer, WALRecord> actl0Last = null;
+
+ int records0 = 0;
+
+ try (WALIterator it = factory.iterator(exp0First, workDir)) {
+ while (it.hasNext()) {
+ IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
+
+ if (actl0First == null)
+ actl0First = new T2<>((FileWALPointer)tup.get1(), tup.get2());
+
+ actl0Last = new T2<>((FileWALPointer)tup.get1(), tup.get2());
+
+ records0++;
+ }
+ }
+
+ log.info("Check REPLAY FROM:" + exp0First + "\n" +
+ "expFirst=" + exp0First + " actlFirst=" + actl0First + ", " +
+ "expLast=" + exp0Last + " actlLast=" + actl0Last);
+
+ // +1 because bound include.
+ Assert.assertEquals(to0 - from0 + 1, records0);
+
+ Assert.assertNotNull(actl0First);
+ Assert.assertNotNull(actl0Last);
+
+ Assert.assertEquals(exp0First, actl0First.get1());
+ Assert.assertEquals(exp0Last, actl0Last.get1());
+
+ int from1 = 0;
+ int to1 = rnd.nextInt(wal.size() - 3) + 1;
+
+ // to1 is at most wal.size() - 3 (see nextInt bound above), so the last records are always skipped.
+ FileWALPointer exp1First = wal.get(from1);
+ FileWALPointer exp1Last = wal.get(to1);
+
+ T2<FileWALPointer, WALRecord> actl1First = null;
+ T2<FileWALPointer, WALRecord> actl1Last = null;
+
+ int records1 = 0;
+
+ try (WALIterator it = factory.iterator(
+ new IteratorParametersBuilder()
+ .filesOrDirs(workDir)
+ .to(exp1Last)
+ )) {
+ while (it.hasNext()) {
+ IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
+
+ if (actl1First == null)
+ actl1First = new T2<>((FileWALPointer)tup.get1(), tup.get2());
+
+ actl1Last = new T2<>((FileWALPointer)tup.get1(), tup.get2());
+
+ records1++;
+ }
+ }
+
+ log.info("Check REPLAY TO:" + exp1Last + "\n" +
+ "expFirst=" + exp1First + " actlFirst=" + actl1First + ", " +
+ "expLast=" + exp1Last + " actlLast=" + actl1Last);
+
+ // +1 because bound include.
+ Assert.assertEquals(to1 - from1 + 1, records1);
+
+ Assert.assertNotNull(actl1First);
+ Assert.assertNotNull(actl1Last);
+
+ Assert.assertEquals(exp1First, actl1First.get1());
+ Assert.assertEquals(exp1Last, actl1Last.get1());
+
+ int from2 = rnd.nextInt(wal.size() - 2);
+ int to2 = rnd.nextInt((wal.size() - 1) - from2) + from2;
+
+ FileWALPointer exp2First = wal.get(from2);
+ FileWALPointer exp2Last = wal.get(to2);
+
+ T2<FileWALPointer, WALRecord> actl2First = null;
+ T2<FileWALPointer, WALRecord> actl2Last = null;
+
+ int records2 = 0;
+
+ try (WALIterator it = factory.iterator(
+ new IteratorParametersBuilder()
+ .filesOrDirs(workDir)
+ .from(exp2First)
+ .to(exp2Last)
+ )) {
+ while (it.hasNext()) {
+ IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
+
+ if (actl2First == null)
+ actl2First = new T2<>((FileWALPointer)tup.get1(), tup.get2());
+
+ actl2Last = new T2<>((FileWALPointer)tup.get1(), tup.get2());
+
+ records2++;
+ }
+ }
+
+ log.info("Check REPLAY BETWEEN:" + exp2First + " " + exp2Last+ "\n" +
+ "expFirst=" + exp2First + " actlFirst=" + actl2First + ", " +
+ "expLast=" + exp2Last + " actlLast=" + actl2Last);
+
+ // +1 because bound include.
+ Assert.assertEquals(to2 - from2 + 1, records2);
+
+ Assert.assertNotNull(actl2First);
+ Assert.assertNotNull(actl2Last);
+
+ Assert.assertEquals(exp2First, actl2First.get1());
+ Assert.assertEquals(exp2Last, actl2Last.get1());
+ }
+
+ /**
* @param workDir Work directory.
* @param subfolderName Subfolder name.
* @return WAL iterator factory.