You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/10/09 14:48:20 UTC
[25/50] [abbrv] ignite git commit: IGNITE-6547 Support logging
timestamp for WAL tx and data records - Fixes #2792.
IGNITE-6547 Support logging timestamp for WAL tx and data records - Fixes #2792.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e6e1ca9a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e6e1ca9a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e6e1ca9a
Branch: refs/heads/ignite-6305
Commit: e6e1ca9a5a9155a550258b112415b65845d6bcef
Parents: 78f77b1
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Wed Oct 4 18:54:49 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Oct 4 18:54:49 2017 +0300
----------------------------------------------------------------------
.../internal/pagemem/wal/record/DataRecord.java | 20 +-
.../pagemem/wal/record/TimeStampRecord.java | 57 ++++++
.../internal/pagemem/wal/record/TxRecord.java | 52 +++--
.../reader/StandaloneWalRecordsIterator.java | 2 +-
.../wal/serializer/RecordDataV1Serializer.java | 6 +-
.../wal/serializer/RecordDataV2Serializer.java | 49 ++++-
.../wal/serializer/TxRecordSerializer.java | 3 +-
.../cache/transactions/IgniteTxAdapter.java | 3 +-
.../db/wal/IgniteWalSerializerVersionTest.java | 205 ++++++++++++++++++-
9 files changed, 365 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e6e1ca9a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
index 0e92383..ac569bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
* This record contains information about operation we want to do.
* Contains operation type (put, remove) and (Key, Value, Version) for each {@link DataEntry}
*/
-public class DataRecord extends WALRecord {
+public class DataRecord extends TimeStampRecord {
/** */
@GridToStringInclude
private List<DataEntry> writeEntries;
@@ -59,6 +59,24 @@ public class DataRecord extends WALRecord {
}
/**
+ * @param writeEntry Write entry.
+ * @param timestamp TimeStamp.
+ */
+ public DataRecord(DataEntry writeEntry, long timestamp) {
+ this(Collections.singletonList(writeEntry), timestamp);
+ }
+
+ /**
+ * @param writeEntries Write entries.
+ * @param timestamp TimeStamp.
+ */
+ public DataRecord(List<DataEntry> writeEntries, long timestamp) {
+ super(timestamp);
+
+ this.writeEntries = writeEntries;
+ }
+
+ /**
* @return Collection of write entries.
*/
public List<DataEntry> writeEntries() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e6e1ca9a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TimeStampRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TimeStampRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TimeStampRecord.java
new file mode 100644
index 0000000..3f29dfd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TimeStampRecord.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Base class for records with timeStamp.
+ * All records which support timeStamp should be inherited from {@code TimeStampRecord}.
+ */
+public abstract class TimeStampRecord extends WALRecord {
+ /** Timestamp. */
+ protected long timestamp;
+
+ /**
+ *
+ */
+ protected TimeStampRecord() {
+ timestamp = U.currentTimeMillis();
+ }
+
+ /**
+ * @param timestamp TimeStamp.
+ */
+ protected TimeStampRecord(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ /**
+ * @param timestamp TimeStamp.
+ */
+ public void timestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ /**
+ * @return TimeStamp.
+ */
+ public long timestamp() {
+ return timestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e6e1ca9a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
index ce1e28e..f933fa9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
@@ -28,7 +28,7 @@ import org.jetbrains.annotations.Nullable;
* Logical data record indented for transaction (tx) related actions.<br>
* This record is marker of begin, prepare, commit, and rollback transactions.
*/
-public class TxRecord extends WALRecord {
+public class TxRecord extends TimeStampRecord {
/** Transaction state. */
private TransactionState state;
@@ -49,28 +49,51 @@ public class TxRecord extends WALRecord {
/** If transaction is remote, primary node for this backup node. */
@Nullable private Object primaryNode;
- /** Timestamp of Tx state change. */
- private long timestamp;
-
/**
*
* @param state Transaction state.
* @param nearXidVer Transaction id.
* @param writeVer Transaction entries write topology version.
* @param participatingNodes Primary -> Backup nodes participating in transaction.
+ * @param primaryNode Primary node.
*/
- public TxRecord(TransactionState state,
- GridCacheVersion nearXidVer,
- GridCacheVersion writeVer,
- @Nullable Map<Object, Collection<Object>> participatingNodes,
- @Nullable Object primaryNode,
- long timestamp) {
+ public TxRecord(
+ TransactionState state,
+ GridCacheVersion nearXidVer,
+ GridCacheVersion writeVer,
+ @Nullable Map<Object, Collection<Object>> participatingNodes,
+ @Nullable Object primaryNode
+ ) {
+ this.state = state;
+ this.nearXidVer = nearXidVer;
+ this.writeVer = writeVer;
+ this.participatingNodes = participatingNodes;
+ this.primaryNode = primaryNode;
+ }
+
+ /**
+ * @param state Transaction state.
+ * @param nearXidVer Transaction id.
+ * @param writeVer Transaction entries write topology version.
+ * @param participatingNodes Primary -> Backup nodes participating in transaction.
+ * @param primaryNode Primary node.
+ * @param timestamp TimeStamp.
+ */
+ public TxRecord(
+ TransactionState state,
+ GridCacheVersion nearXidVer,
+ GridCacheVersion writeVer,
+ @Nullable Map<Object, Collection<Object>> participatingNodes,
+ @Nullable Object primaryNode,
+ long timestamp
+ ) {
+ super(timestamp);
+
this.state = state;
this.nearXidVer = nearXidVer;
this.writeVer = writeVer;
this.participatingNodes = participatingNodes;
this.primaryNode = primaryNode;
- this.timestamp = timestamp;
}
/** {@inheritDoc} */
@@ -148,13 +171,6 @@ public class TxRecord extends WALRecord {
return primaryNode;
}
- /**
- * @return Timestamp of Tx state change in millis.
- */
- public long timestamp() {
- return timestamp;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TxRecord.class, this, "super", super.toString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/e6e1ca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index f1258a0..24b2148 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -305,7 +305,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
postProcessedEntries.add(postProcessedEntry);
}
- return new DataRecord(postProcessedEntries);
+ return new DataRecord(postProcessedEntries, dataRec.timestamp());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/e6e1ca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index 8b5e6ba..e583df3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -384,7 +384,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
for (int i = 0; i < entryCnt; i++)
entries.add(readDataEntry(in));
- res = new DataRecord(entries);
+ res = new DataRecord(entries, 0L);
break;
@@ -1322,7 +1322,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
* @param buf Buffer to write to.
* @param entry Data entry.
*/
- private static void putDataEntry(ByteBuffer buf, DataEntry entry) throws IgniteCheckedException {
+ static void putDataEntry(ByteBuffer buf, DataEntry entry) throws IgniteCheckedException {
buf.putInt(entry.cacheId());
if (!entry.key().putValue(buf))
@@ -1390,7 +1390,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
* @param in Input to read from.
* @return Read entry.
*/
- private DataEntry readDataEntry(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException {
+ DataEntry readDataEntry(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException {
int cacheId = in.readInt();
int keySize = in.readInt();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e6e1ca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
index 2b55c5f..c02f36e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -19,7 +19,11 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.RecordDataSerializer;
@@ -46,12 +50,35 @@ public class RecordDataV2Serializer implements RecordDataSerializer {
if (record instanceof HeaderRecord)
throw new UnsupportedOperationException("Getting size of header records is forbidden since version 2 of serializer");
- return delegateSerializer.size(record);
+ switch (record.type()) {
+ case DATA_RECORD:
+ return delegateSerializer.size(record) + 8/*timestamp*/;
+
+ default:
+ return delegateSerializer.size(record);
+ }
}
/** {@inheritDoc} */
- @Override public WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException {
- return delegateSerializer.readRecord(type, in);
+ @Override public WALRecord readRecord(
+ WALRecord.RecordType type,
+ ByteBufferBackedDataInput in
+ ) throws IOException, IgniteCheckedException {
+ switch (type) {
+ case DATA_RECORD:
+ int entryCnt = in.readInt();
+ long timeStamp = in.readLong();
+
+ List<DataEntry> entries = new ArrayList<>(entryCnt);
+
+ for (int i = 0; i < entryCnt; i++)
+ entries.add(delegateSerializer.readDataEntry(in));
+
+ return new DataRecord(entries, timeStamp);
+
+ default:
+ return delegateSerializer.readRecord(type, in);
+ }
}
/** {@inheritDoc} */
@@ -59,6 +86,20 @@ public class RecordDataV2Serializer implements RecordDataSerializer {
if (record instanceof HeaderRecord)
throw new UnsupportedOperationException("Writing header records is forbidden since version 2 of serializer");
- delegateSerializer.writeRecord(record, buf);
+ switch (record.type()) {
+ case DATA_RECORD:
+ DataRecord dataRec = (DataRecord)record;
+
+ buf.putInt(dataRec.writeEntries().size());
+ buf.putLong(dataRec.timestamp());
+
+ for (DataEntry dataEntry : dataRec.writeEntries())
+ RecordDataV1Serializer.putDataEntry(buf, dataEntry);
+
+ break;
+
+ default:
+ delegateSerializer.writeRecord(record, buf);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e6e1ca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/TxRecordSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/TxRecordSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/TxRecordSerializer.java
index 448bdbc..e8b324d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/TxRecordSerializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/TxRecordSerializer.java
@@ -75,9 +75,8 @@ public class TxRecordSerializer {
buf.putInt(backupNodes.size());
- for (Object backupNode : backupNodes) {
+ for (Object backupNode : backupNodes)
writeConsistentId(backupNode, buf);
- }
}
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e6e1ca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index b5178b5..00c637e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1116,8 +1116,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
nearXidVersion(),
writeVersion(),
participatingNodes,
- remote() ? nodeId() : null,
- U.currentTimeMillis()
+ remote() ? nodeId() : null
);
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e6e1ca9a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java
index f31d0f9..ddf74c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java
@@ -17,23 +17,38 @@
package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.PersistentStoreConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.TimeStampRecord;
+import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.persistence.wal.RecordSerializer;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV2Serializer;
import org.apache.ignite.internal.util.typedef.internal.GPC;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.TransactionState;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
/**
*
@@ -46,7 +61,7 @@ public class IgniteWalSerializerVersionTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(name);
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration());
@@ -101,4 +116,192 @@ public class IgniteWalSerializerVersionTest extends GridCommonAbstractTest {
stopGrid();
}
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCheckDifferentSerializerVersionsAndLogTimestamp() throws Exception {
+ IgniteCallable<List<WALRecord>> recordsFactory = new IgniteCallable<List<WALRecord>>() {
+ @Override public List<WALRecord> call() throws Exception {
+ WALRecord rec0 = new DataRecord(Collections.<DataEntry>emptyList());
+
+ WALRecord rec1 = new TxRecord(
+ TransactionState.PREPARED,
+ null,
+ null,
+ null,
+ null
+ );
+
+ return Arrays.asList(rec0, rec1);
+ }
+ };
+
+ long time0 = U.currentTimeMillis();
+
+ check(new Checker(
+ 1,
+ RecordV1Serializer.class,
+ recordsFactory,
+ Arrays.asList(0L, time0)
+ ));
+
+ long time1 = U.currentTimeMillis();
+
+ check(new Checker(
+ 2,
+ RecordV2Serializer.class,
+ recordsFactory,
+ Arrays.asList(time1, time1)
+ ));
+ }
+
+ /**
+ *
+ */
+ public static class Checker {
+ /** */
+ private final int serializerVer;
+
+ /** */
+ private final Class serializer;
+
+ /** */
+ private final List<Long> timeStamps;
+
+ /** */
+ private final IgniteCallable<List<WALRecord>> recordsToWrite;
+
+ /**
+ *
+ */
+ public Checker(
+ int serializerVer,
+ Class serializer,
+ IgniteCallable<List<WALRecord>> recordsToWrite,
+ List<Long> timeStamps) {
+ this.serializerVer = serializerVer;
+ this.serializer = serializer;
+ this.timeStamps = timeStamps;
+ this.recordsToWrite = recordsToWrite;
+ }
+
+ /**
+ *
+ */
+ public int serializerVersion() {
+ return serializerVer;
+ }
+
+ /**
+ *
+ */
+ public Class serializer() {
+ return serializer;
+ }
+
+ /**
+ *
+ */
+ public List<Long> getTimeStamps() {
+ return timeStamps;
+ }
+
+ /**
+ *
+ */
+ public List<WALRecord> recordsToWrite() throws Exception {
+ return recordsToWrite.call();
+ }
+
+ /**
+ *
+ */
+ public void assertRecords(long exp, WALRecord act) {
+ if (act instanceof TimeStampRecord) {
+ TimeStampRecord act0 = (TimeStampRecord)act;
+
+ if (exp == 0L)
+ assertTrue(act0.timestamp() == 0L);
+ else{
+ long diff = Math.abs(exp - act0.timestamp());
+
+ assertTrue(String.valueOf(diff), diff < 10_000);
+ }
+ }
+ else
+ fail();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void check(Checker checker) throws Exception {
+ System.setProperty(IGNITE_WAL_SERIALIZER_VERSION, Integer.toString(checker.serializerVersion()));
+
+ IgniteEx ig0 = (IgniteEx)startGrid();
+
+ ig0.active(true);
+
+ IgniteWriteAheadLogManager wal = ig0.context().cache().context().wal();
+
+ RecordSerializer ser0 = U.field(wal, "serializer");
+
+ assertTrue(ser0.getClass().getName().equals(checker.serializer().getName()));
+
+ List<WALRecord> recs = checker.recordsToWrite();
+
+ assertTrue(!recs.isEmpty());
+
+ WALPointer p = null;
+
+ for (WALRecord rec : recs) {
+ WALPointer p0 = wal.log(rec);
+
+ if (p == null)
+ p = p0;
+ }
+
+ wal.fsync(null);
+
+ Iterator<Long> itToCheck = checker.getTimeStamps().iterator();
+
+ try (WALIterator it = wal.replay(p)) {
+ while (it.hasNext()) {
+ IgniteBiTuple<WALPointer, WALRecord> tup0 = it.next();
+
+ checker.assertRecords(itToCheck.next(), tup0.get2());
+ }
+ }
+
+ stopGrid();
+
+ System.clearProperty(IGNITE_WAL_SERIALIZER_VERSION);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+
+ deleteWorkFiles();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+
+ deleteWorkFiles();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void deleteWorkFiles() throws IgniteCheckedException {
+ deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false));
+ }
}