You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/10/03 14:03:16 UTC
[02/37] ignite git commit: IGNITE-6029 Record serializer refactoring
and initial stuff for Record V2 serialization.
IGNITE-6029 Record serializer refactoring and initial stuff for Record V2 serialization.
Signed-off-by: Andrey Gura <ag...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/00770767
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/00770767
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/00770767
Branch: refs/heads/ignite-2.3
Commit: 007707674e32b4123708e267feec04075a1b4663
Parents: b7bb792
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Thu Sep 28 12:15:19 2017 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Thu Sep 28 12:16:15 2017 +0300
----------------------------------------------------------------------
.../pagemem/wal/record/SwitchSegmentRecord.java | 28 +
.../wal/AbstractWalRecordsIterator.java | 54 +-
.../wal/FileWriteAheadLogManager.java | 193 +-
.../persistence/wal/RecordDataSerializer.java | 41 +
.../wal/WalSegmentTailReachedException.java | 37 +
.../wal/reader/IgniteWalIteratorFactory.java | 5 +-
.../reader/StandaloneWalRecordsIterator.java | 27 +-
.../wal/serializer/RecordDataV1Serializer.java | 1574 ++++++++++++++++
.../wal/serializer/RecordDataV2Serializer.java | 64 +
.../wal/serializer/RecordV1Serializer.java | 1673 ++----------------
.../wal/serializer/RecordV2Serializer.java | 170 ++
.../persistence/wal/serializer/io/RecordIO.java | 60 +
12 files changed, 2305 insertions(+), 1621 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SwitchSegmentRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SwitchSegmentRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SwitchSegmentRecord.java
new file mode 100644
index 0000000..948ec7e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SwitchSegmentRecord.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+/**
+ * Record needed to mark the end of a segment.
+ */
+public class SwitchSegmentRecord extends WALRecord {
+ /** {@inheritDoc} */
+ @Override public RecordType type() {
+ return RecordType.SWITCH_SEGMENT_RECORD;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index d5a2555..5be6e55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -30,7 +30,6 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
-import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.NotNull;
@@ -154,21 +153,31 @@ public abstract class AbstractWalRecordsIterator
*/
protected void advance() throws IgniteCheckedException {
while (true) {
- curRec = advanceRecord(currWalSegment);
-
- if (curRec != null)
- return;
- else {
- currWalSegment = advanceSegment(currWalSegment);
+ try {
+ curRec = advanceRecord(currWalSegment);
- if (currWalSegment == null)
+ if (curRec != null)
return;
+ else {
+ currWalSegment = advanceSegment(currWalSegment);
+
+ if (currWalSegment == null)
+ return;
+ }
+ }
+ catch (WalSegmentTailReachedException e) {
+ log.warning(e.getMessage());
+
+ curRec = null;
+
+ return;
}
}
}
/**
* Closes and returns WAL segment (if any)
+ *
* @return closed handle
* @throws IgniteCheckedException if IO failed
*/
@@ -199,7 +208,8 @@ public abstract class AbstractWalRecordsIterator
* @return next advanced record
*/
private IgniteBiTuple<WALPointer, WALRecord> advanceRecord(
- @Nullable final FileWriteAheadLogManager.ReadFileHandle hnd) {
+ @Nullable final FileWriteAheadLogManager.ReadFileHandle hnd
+ ) throws IgniteCheckedException {
if (hnd == null)
return null;
@@ -217,8 +227,12 @@ public abstract class AbstractWalRecordsIterator
return new IgniteBiTuple<>((WALPointer)ptr, postProcessRecord(rec));
}
catch (IOException | IgniteCheckedException e) {
+ if (e instanceof WalSegmentTailReachedException)
+ throw (WalSegmentTailReachedException)e;
+
if (!(e instanceof SegmentEofException))
handleRecordException(e, ptr);
+
return null;
}
}
@@ -261,24 +275,18 @@ public abstract class AbstractWalRecordsIterator
FileIO fileIO = ioFactory.create(desc.file);
try {
- FileInput in = new FileInput(fileIO, buf);
+ int serVer = FileWriteAheadLogManager.readSerializerVersion(fileIO);
- // Header record must be agnostic to the serializer version.
- WALRecord rec = serializer.readRecord(in,
- new FileWALPointer(desc.idx, (int)fileIO.position(), 0));
+ RecordSerializer ser = FileWriteAheadLogManager.forVersion(sharedCtx, serVer);
- if (rec == null)
- return null;
-
- if (rec.type() != WALRecord.RecordType.HEADER_RECORD)
- throw new IOException("Missing file header record: " + desc.file.getAbsoluteFile());
-
- int ver = ((HeaderRecord)rec).version();
+ FileInput in = new FileInput(fileIO, buf);
- RecordSerializer ser = FileWriteAheadLogManager.forVersion(sharedCtx, ver, serializer.writePointer());
+ if (start != null && desc.idx == start.index()) {
+ // Make sure we skip header with serializer version.
+ long startOffset = Math.max(start.fileOffset(), fileIO.position());
- if (start != null && desc.idx == start.index())
- in.seek(start.fileOffset());
+ in.seek(startOffset);
+ }
return new FileWriteAheadLogManager.ReadFileHandle(fileIO, desc.idx, sharedCtx.igniteInstanceName(), ser, in);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 9b2d948..c4582cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.pagemem.wal.StorageException;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
@@ -62,8 +63,12 @@ import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabase
import org.apache.ignite.internal.processors.cache.persistence.PersistenceMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV1Serializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV2Serializer;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV2Serializer;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridUnsafe;
@@ -116,6 +121,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
};
+ /** Latest serializer version to use. */
+ public static final int LATEST_SERIALIZER_VERSION = 1;
+
/** */
private final boolean alwaysWriteFullPages;
@@ -152,9 +160,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** WAL archive directory (including consistent ID as subfolder) */
private File walArchiveDir;
- /** Serializer of current version, used to read header record and for write records */
+ /** Serializer of latest version. */
private RecordSerializer serializer;
+ /** Serializer latest version to use. */
+ private int serializerVersion = LATEST_SERIALIZER_VERSION;
+
/** */
private volatile long oldestArchiveSegmentIdx;
@@ -266,7 +277,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
"write ahead log archive directory"
);
- serializer = new RecordV1Serializer(cctx);
+ serializer = forVersion(cctx, serializerVersion);
GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.database();
@@ -818,10 +829,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
FileIO fileIO = ioFactory.create(curFile);
try {
- // readSerializerVersion will change the channel position.
- // This is fine because the FileWriteHandle consitructor will move it
- // to offset + len anyways.
- int serVer = readSerializerVersion(fileIO, curFile, absIdx);
+ int serVer = serializerVersion;
+
+ // If we have existing segment, try to read version from it.
+ if (lastReadPtr != null) {
+ try {
+ serVer = readSerializerVersion(fileIO);
+ }
+ catch (SegmentEofException | EOFException ignore) {
+ serVer = serializerVersion;
+ }
+ }
RecordSerializer ser = forVersion(cctx, serVer);
@@ -837,13 +855,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
maxWalSegmentSize,
ser);
- if (lastReadPtr == null) {
- HeaderRecord header = new HeaderRecord(serializer.version());
-
- header.size(serializer.size(header));
-
- hnd.addRecord(header);
- }
+ // For new handle write serializer version to it.
+ if (lastReadPtr == null)
+ hnd.writeSerializerVersion();
archiver.currentWalIndex(absIdx);
@@ -887,11 +901,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
maxWalSegmentSize,
serializer);
- HeaderRecord header = new HeaderRecord(serializer.version());
-
- header.size(serializer.size(header));
-
- hnd.addRecord(header);
+ hnd.writeSerializerVersion();
return hnd;
}
@@ -1010,10 +1020,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
+ * @param cctx Shared context.
* @param ver Serializer version.
* @return Entry serializer.
*/
- static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver) throws IgniteCheckedException {
+ public static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver) throws IgniteCheckedException {
return forVersion(cctx, ver, false);
}
@@ -1027,7 +1038,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
switch (ver) {
case 1:
- return new RecordV1Serializer(cctx, writePointer);
+ return new RecordV1Serializer(new RecordDataV1Serializer(cctx), writePointer);
+
+ case 2:
+ RecordDataV2Serializer dataV2Serializer = new RecordDataV2Serializer(new RecordDataV1Serializer(cctx));
+
+ return new RecordV2Serializer(dataV2Serializer, writePointer);
default:
throw new IgniteCheckedException("Failed to create a serializer with the given version " +
@@ -1434,29 +1450,103 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
+ * Reads record serializer version from provided {@code io}.
+ * NOTE: Method mutates position of {@code io}.
+ *
* @param io I/O interface for file.
- * @param file File object.
- * @param idx File index to read.
* @return Serializer version stored in the file.
- * @throws IOException If failed to read serializer version.
* @throws IgniteCheckedException If failed to read serializer version.
*/
- private int readSerializerVersion(FileIO io, File file, long idx)
- throws IOException, IgniteCheckedException {
- try (ByteBufferExpander buf = new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder())){
+ public static int readSerializerVersion(FileIO io)
+ throws IgniteCheckedException, IOException {
+ try (ByteBufferExpander buf = new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder())) {
FileInput in = new FileInput(io, buf);
- // Header record must be agnostic to the serializer version.
- WALRecord rec = serializer.readRecord(in, new FileWALPointer(idx, 0, 0));
+ in.ensure(RecordV1Serializer.HEADER_RECORD_SIZE);
+
+ int recordType = in.readUnsignedByte();
+
+ if (recordType == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE)
+ throw new SegmentEofException("Reached logical end of the segment", null);
+
+ WALRecord.RecordType type = WALRecord.RecordType.fromOrdinal(recordType - 1);
+
+ if (type != WALRecord.RecordType.HEADER_RECORD)
+ throw new IOException("Can't read serializer version", null);
+
+ // Read file pointer.
+ FileWALPointer ptr = RecordV1Serializer.readPosition(in);
+
+ assert ptr.fileOffset() == 0 : "Header record should be placed at the beginning of file " + ptr;
+
+ long headerMagicNumber = in.readLong();
+
+ if (headerMagicNumber != HeaderRecord.MAGIC)
+ throw new IOException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.MAGIC) +
+ ", actual=" + U.hexLong(headerMagicNumber) + ']');
+
+ // Read serializer version.
+ int version = in.readInt();
+
+ // Read and skip CRC.
+ in.readInt();
+
+ return version;
+ }
+ }
+
+ /**
+ * Writes record serializer version to provided {@code io}.
+ * NOTE: Method mutates position of {@code io}.
+ *
+ * @param io I/O interface for file.
+ * @param idx Segment index.
+ * @param version Serializer version.
+ * @return I/O position after write version.
+ * @throws IOException If failed to write serializer version.
+ */
+ public static long writeSerializerVersion(FileIO io, long idx, int version) throws IOException {
+ ByteBuffer buffer = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE);
+ buffer.order(ByteOrder.nativeOrder());
+
+ // Write record type.
+ buffer.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1));
+
+ // Write position.
+ RecordV1Serializer.putPosition(buffer, new FileWALPointer(idx, 0, 0));
- if (rec.type() != WALRecord.RecordType.HEADER_RECORD)
- throw new IOException("Missing file header record: " + file.getAbsoluteFile());
+ // Place magic number.
+ buffer.putLong(HeaderRecord.MAGIC);
- return ((HeaderRecord)rec).version();
+ // Place serializer version.
+ buffer.putInt(version);
+
+ // Place CRC if needed.
+ if (!RecordV1Serializer.SKIP_CRC) {
+ int curPos = buffer.position();
+
+ buffer.position(0);
+
+ // This call will move buffer position to the end of the record again.
+ int crcVal = PureJavaCrc32.calcCrc32(buffer, curPos);
+
+ buffer.putInt(crcVal);
}
- catch (SegmentEofException | EOFException ignore) {
- return serializer.version();
+ else
+ buffer.putInt(0);
+
+ // Write header record through io.
+ buffer.position(0);
+
+ do {
+ io.write(buffer);
}
+ while (buffer.hasRemaining());
+
+ // Flush
+ io.force();
+
+ return io.position();
}
/**
@@ -1715,6 +1805,27 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
+ * Write serializer version to current handle.
+ * NOTE: Method mutates {@code fileIO} position, written and lastFsyncPos fields.
+ *
+ * @throws IgniteCheckedException If fail to write serializer version.
+ */
+ public void writeSerializerVersion() throws IgniteCheckedException {
+ try {
+ assert fileIO.position() == 0 : "Serializer version can be written only at the begin of file " + fileIO.position();
+
+ long updatedPosition = FileWriteAheadLogManager.writeSerializerVersion(fileIO, idx, serializer.version());
+
+ written = updatedPosition;
+ lastFsyncPos = updatedPosition;
+ head.set(new FakeRecord(new FileWALPointer(idx, (int)updatedPosition, 0), false));
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException("Unable to write serializer version for segment " + idx, e);
+ }
+ }
+
+ /**
* Checks if current head is a close fake record and returns {@code true} if so.
*
* @return {@code true} if current head is close record.
@@ -2080,15 +2191,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
assert stopped() : "Segment is not closed after close flush: " + head.get();
try {
- int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE;
+ int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE + RecordV1Serializer.CRC_SIZE;
+
+ if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) {
+ RecordV1Serializer backwardSerializer =
+ new RecordV1Serializer(new RecordDataV1Serializer(cctx), true);
- if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) {
- //it is expected there is sufficient space for this record because rollover should run early
- final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize);
- buf.put((byte)(WALRecord.RecordType.SWITCH_SEGMENT_RECORD.ordinal() + 1));
+ final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize);
- final FileWALPointer pointer = new FileWALPointer(idx, (int)fileIO.position(), -1);
- RecordV1Serializer.putPosition(buf, pointer);
+ SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();
+ segmentRecord.position( new FileWALPointer(idx, (int)written, -1));
+ backwardSerializer.writeRecord(segmentRecord,buf);
buf.rewind();
http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java
new file mode 100644
index 0000000..242641d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java
@@ -0,0 +1,41 @@
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+
+/**
+ * Interface to provide size, read and write operations with WAL records
+ * <b>without any headers and meta information</b>.
+ */
+public interface RecordDataSerializer {
+ /**
+ * Calculates size of record data.
+ *
+ * @param record WAL record.
+ * @return Size of record in bytes.
+ * @throws IgniteCheckedException If it's unable to calculate record data size.
+ */
+ int size(WALRecord record) throws IgniteCheckedException;
+
+ /**
+ * Reads record data of {@code type} from buffer {@code in}.
+ *
+ * @param type Record type.
+ * @param in Buffer to read.
+ * @return WAL record.
+ * @throws IOException In case of I/O problems.
+ * @throws IgniteCheckedException If it's unable to read record.
+ */
+ WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException;
+
+ /**
+ * Writes record data to buffer {@code buf}.
+ *
+ * @param record WAL record.
+ * @param buf Buffer to write.
+ * @throws IgniteCheckedException If it's unable to write record.
+ */
+ void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalSegmentTailReachedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalSegmentTailReachedException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalSegmentTailReachedException.java
new file mode 100644
index 0000000..36298dc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalSegmentTailReachedException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * An exception thrown when the tail of the WAL segment cyclic buffer
+ * is reached while reading from the WAL.
+ */
+public class WalSegmentTailReachedException extends IgniteCheckedException {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ *
+ */
+ public WalSegmentTailReachedException(String msg, @Nullable Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index 3a34e28..0fb8adf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -133,7 +133,7 @@ public class IgniteWalIteratorFactory {
* @param pageSize Page size which was used in Ignite Persistent Data store to read WAL from, size is validated
* according its boundaries.
*/
- public IgniteWalIteratorFactory(@NotNull final IgniteLogger log, final int pageSize) {
+ public IgniteWalIteratorFactory(@NotNull final IgniteLogger log, int pageSize) {
this(log, new PersistentStoreConfiguration().getFileIOFactory(), pageSize);
}
@@ -148,8 +148,7 @@ public class IgniteWalIteratorFactory {
* @return closable WAL records iterator, should be closed when non needed
* @throws IgniteCheckedException if failed to read folder
*/
- public WALIterator iteratorArchiveDirectory(
- @NotNull final File walDirWithConsistentId) throws IgniteCheckedException {
+ public WALIterator iteratorArchiveDirectory(@NotNull final File walDirWithConsistentId) throws IgniteCheckedException {
return new StandaloneWalRecordsIterator(walDirWithConsistentId, log, prepareSharedCtx(), ioFactory, keepBinary);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index c92d572..f1258a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -100,14 +100,15 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
* (BinaryObjects will be used instead)
*/
StandaloneWalRecordsIterator(
- @NotNull final File walFilesDir,
- @NotNull final IgniteLogger log,
- @NotNull final GridCacheSharedContext sharedCtx,
- @NotNull final FileIOFactory ioFactory,
- final boolean keepBinary) throws IgniteCheckedException {
+ @NotNull File walFilesDir,
+ @NotNull IgniteLogger log,
+ @NotNull GridCacheSharedContext sharedCtx,
+ @NotNull FileIOFactory ioFactory,
+ boolean keepBinary
+ ) throws IgniteCheckedException {
super(log,
sharedCtx,
- new RecordV1Serializer(sharedCtx, true),
+ FileWriteAheadLogManager.forVersion(sharedCtx, FileWriteAheadLogManager.LATEST_SERIALIZER_VERSION),
ioFactory,
BUF_SIZE);
this.keepBinary = keepBinary;
@@ -127,15 +128,15 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
* @param walFiles Wal files.
*/
StandaloneWalRecordsIterator(
- @NotNull final IgniteLogger log,
- @NotNull final GridCacheSharedContext sharedCtx,
- @NotNull final FileIOFactory ioFactory,
- final boolean workDir,
- final boolean keepBinary,
- @NotNull final File... walFiles) throws IgniteCheckedException {
+ @NotNull IgniteLogger log,
+ @NotNull GridCacheSharedContext sharedCtx,
+ @NotNull FileIOFactory ioFactory,
+ boolean workDir,
+ boolean keepBinary,
+ @NotNull File... walFiles) throws IgniteCheckedException {
super(log,
sharedCtx,
- new RecordV1Serializer(sharedCtx, true),
+ FileWriteAheadLogManager.forVersion(sharedCtx, FileWriteAheadLogManager.LATEST_SERIALIZER_VERSION),
ioFactory,
BUF_SIZE);
http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
new file mode 100644
index 0000000..8b5e6ba
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -0,0 +1,1574 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.wal.record.CacheState;
+import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageSetFreeListPageRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageUpdateRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.FixCountRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.FixLeftmostChildRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.FixRemoveId;
+import org.apache.ignite.internal.pagemem.wal.record.delta.InitNewPageRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.InnerReplaceRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.InsertRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MergeRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageAddRootRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageCutRootRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRootInlineRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRootRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastAllocatedIndex;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuccessfulFullSnapshotId;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuccessfulSnapshotId;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSnapshotId;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.NewRootInitRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PageListMetaResetCountRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListAddPageRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListInitNewPageRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListRemovePageRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListSetNextRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListSetPreviousRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.RecycleRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.RemoveRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.ReplaceRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.SplitExistingPageRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.SplitForwardPageRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.TrackingPageDeltaRecord;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.RecordDataSerializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.CRC_SIZE;
+
+/**
+ * Record data V1 serializer.
+ */
+public class RecordDataV1Serializer implements RecordDataSerializer {
+ /** Length of HEADER record data. */
+ static final int HEADER_RECORD_DATA_SIZE = /*Magic*/8 + /*Version*/4;
+
+ /** Cache shared context. */
+ private final GridCacheSharedContext cctx;
+
+ /** Size of page used for PageMemory regions. */
+ private final int pageSize;
+
+ /** Cache object processor used for reading {@link DataEntry DataEntries}. */
+ private final IgniteCacheObjectProcessor co;
+
+ /** Serializer of {@link TxRecord} records. */
+ private TxRecordSerializer txRecordSerializer;
+
+ /**
+ * Creates a record data serializer bound to the given cache shared context.
+ * The cache object processor and the configured page size are captured from
+ * the context once, at construction time.
+ *
+ * @param cctx Cache shared context.
+ */
+ public RecordDataV1Serializer(GridCacheSharedContext cctx) {
+ this.cctx = cctx;
+ this.txRecordSerializer = new TxRecordSerializer(cctx);
+ this.co = cctx.kernalContext().cacheObjects();
+ this.pageSize = cctx.database().pageSize();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Returns the exact on-disk size, in bytes, of the data portion of the
+ * given record. Each case sums the fixed field widths that the matching
+ * branch of {@code writeRecord} emits, so the two switches must be kept in
+ * sync whenever a record layout changes.
+ */
+ @Override public int size(WALRecord record) throws IgniteCheckedException {
+ switch (record.type()) {
+ case PAGE_RECORD:
+ assert record instanceof PageSnapshot;
+
+ PageSnapshot pageRec = (PageSnapshot)record;
+
+ // Full page image plus cache ID (4 bytes) and page ID (8 bytes).
+ return pageRec.pageData().length + 12;
+
+ case CHECKPOINT_RECORD:
+ CheckpointRecord cpRec = (CheckpointRecord)record;
+
+ assert cpRec.checkpointMark() == null || cpRec.checkpointMark() instanceof FileWALPointer :
+ "Invalid WAL record: " + cpRec;
+
+ int cacheStatesSize = cacheStatesSize(cpRec.cacheGroupStates());
+
+ FileWALPointer walPtr = (FileWALPointer)cpRec.checkpointMark();
+
+ // Checkpoint UUID (16) + pointer-present flag (1) + end flag (1) = 18,
+ // plus serialized cache states and, when present, the WAL pointer
+ // (index 8 + file offset 4 + length 4 = 16).
+ return 18 + cacheStatesSize + (walPtr == null ? 0 : 16);
+
+ case META_PAGE_INIT:
+ return /*cache ID*/4 + /*page ID*/8 + /*ioType*/2 + /*ioVer*/2 + /*tree root*/8 + /*reuse root*/8;
+
+ case PARTITION_META_PAGE_UPDATE_COUNTERS:
+ return /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part size*/4 + /*counters page id*/8 + /*state*/ 1
+ + /*allocatedIdxCandidate*/ 4;
+
+ case MEMORY_RECOVERY:
+ // Timestamp only.
+ return 8;
+
+ case PARTITION_DESTROY:
+ return /*cacheId*/4 + /*partId*/4;
+
+ case DATA_RECORD:
+ DataRecord dataRec = (DataRecord)record;
+
+ // Entry count (4) plus the serialized entries themselves.
+ return 4 + dataSize(dataRec);
+
+ case HEADER_RECORD:
+ return HEADER_RECORD_DATA_SIZE;
+
+ case DATA_PAGE_INSERT_RECORD:
+ DataPageInsertRecord diRec = (DataPageInsertRecord)record;
+
+ return 4 + 8 + 2 + diRec.payload().length;
+
+ case DATA_PAGE_UPDATE_RECORD:
+ DataPageUpdateRecord uRec = (DataPageUpdateRecord)record;
+
+ return 4 + 8 + 2 + 4 +
+ uRec.payload().length;
+
+ case DATA_PAGE_INSERT_FRAGMENT_RECORD:
+ final DataPageInsertFragmentRecord difRec = (DataPageInsertFragmentRecord)record;
+
+ return 4 + 8 + 8 + 4 + difRec.payloadSize();
+
+ case DATA_PAGE_REMOVE_RECORD:
+ return 4 + 8 + 1;
+
+ case DATA_PAGE_SET_FREE_LIST_PAGE:
+ return 4 + 8 + 8;
+
+ case INIT_NEW_PAGE_RECORD:
+ return 4 + 8 + 2 + 2 + 8;
+
+ case BTREE_META_PAGE_INIT_ROOT:
+ return 4 + 8 + 8;
+
+ case BTREE_META_PAGE_INIT_ROOT2:
+ return 4 + 8 + 8 + 2;
+
+ case BTREE_META_PAGE_ADD_ROOT:
+ return 4 + 8 + 8;
+
+ case BTREE_META_PAGE_CUT_ROOT:
+ return 4 + 8;
+
+ case BTREE_INIT_NEW_ROOT:
+ NewRootInitRecord<?> riRec = (NewRootInitRecord<?>)record;
+
+ // Fixed header plus one inlined row of the page IO's item size.
+ return 4 + 8 + 8 + 2 + 2 + 8 + 8 + riRec.io().getItemSize();
+
+ case BTREE_PAGE_RECYCLE:
+ return 4 + 8 + 8;
+
+ case BTREE_PAGE_INSERT:
+ InsertRecord<?> inRec = (InsertRecord<?>)record;
+
+ return 4 + 8 + 2 + 2 + 2 + 8 + inRec.io().getItemSize();
+
+ case BTREE_FIX_LEFTMOST_CHILD:
+ return 4 + 8 + 8;
+
+ case BTREE_FIX_COUNT:
+ return 4 + 8 + 2;
+
+ case BTREE_PAGE_REPLACE:
+ ReplaceRecord<?> rRec = (ReplaceRecord<?>)record;
+
+ return 4 + 8 + 2 + 2 + 2 + rRec.io().getItemSize();
+
+ case BTREE_PAGE_REMOVE:
+ return 4 + 8 + 2 + 2;
+
+ case BTREE_PAGE_INNER_REPLACE:
+ return 4 + 8 + 2 + 8 + 2 + 8;
+
+ case BTREE_FORWARD_PAGE_SPLIT:
+ return 4 + 8 + 8 + 2 + 2 + 8 + 2 + 2;
+
+ case BTREE_EXISTING_PAGE_SPLIT:
+ return 4 + 8 + 2 + 8;
+
+ case BTREE_PAGE_MERGE:
+ return 4 + 8 + 8 + 2 + 8 + 1;
+
+ case BTREE_FIX_REMOVE_ID:
+ return 4 + 8 + 8;
+
+ case PAGES_LIST_SET_NEXT:
+ return 4 + 8 + 8;
+
+ case PAGES_LIST_SET_PREVIOUS:
+ return 4 + 8 + 8;
+
+ case PAGES_LIST_INIT_NEW_PAGE:
+ return 4 + 8 + 4 + 4 + 8 + 8 + 8;
+
+ case PAGES_LIST_ADD_PAGE:
+ return 4 + 8 + 8;
+
+ case PAGES_LIST_REMOVE_PAGE:
+ return 4 + 8 + 8;
+
+ case TRACKING_PAGE_DELTA:
+ return 4 + 8 + 8 + 8 + 8;
+
+ case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID:
+ return 4 + 8 + 8 + 8;
+
+ case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID:
+ return 4 + 8 + 8;
+
+ case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID:
+ return 4 + 8 + 8;
+
+ case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX:
+ return 4 + 8 + 4;
+
+ case PART_META_UPDATE_STATE:
+ return /*cacheId*/ 4 + /*partId*/ 4 + /*State*/1 + /*Update Counter*/ 8;
+
+ case PAGE_LIST_META_RESET_COUNT_RECORD:
+ return /*cacheId*/ 4 + /*pageId*/ 8;
+
+ case SWITCH_SEGMENT_RECORD:
+ // CRC is not loaded for switch segment.
+ // Negative size compensates for the CRC the caller would otherwise add.
+ return -CRC_SIZE;
+
+ case TX_RECORD:
+ return txRecordSerializer.sizeOfTxRecord((TxRecord)record);
+
+ default:
+ throw new UnsupportedOperationException("Type: " + record.type());
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Decodes the data portion of a record of the given {@code type} from
+ * {@code in}. Each case reads, field by field, exactly the layout that the
+ * matching branch of {@code writeRecord} produces.
+ *
+ * <p>NOTE: locals such as {@code cacheId}, {@code pageId}, {@code ioType}
+ * and {@code io} are declared in the first case that needs them and are
+ * intentionally reused by later cases — all cases of a Java switch share
+ * one scope.
+ */
+ @Override public WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException {
+ WALRecord res;
+
+ switch (type) {
+ case PAGE_RECORD:
+ // Full page snapshot: fixed-size image of pageSize bytes.
+ byte[] arr = new byte[pageSize];
+
+ int cacheId = in.readInt();
+ long pageId = in.readLong();
+
+ in.readFully(arr);
+
+ res = new PageSnapshot(new FullPageId(pageId, cacheId), arr);
+
+ break;
+
+ case CHECKPOINT_RECORD:
+ long msb = in.readLong();
+ long lsb = in.readLong();
+ // One marker byte tells whether a WAL pointer (idx/offset/len) follows.
+ boolean hasPtr = in.readByte() != 0;
+ int idx = hasPtr ? in.readInt() : 0;
+ int offset = hasPtr ? in.readInt() : 0;
+ int len = hasPtr ? in.readInt() : 0;
+
+ Map<Integer, CacheState> states = readPartitionStates(in);
+
+ boolean end = in.readByte() != 0;
+
+ FileWALPointer walPtr = hasPtr ? new FileWALPointer(idx, offset, len) : null;
+
+ CheckpointRecord cpRec = new CheckpointRecord(new UUID(msb, lsb), walPtr, end);
+
+ cpRec.cacheGroupStates(states);
+
+ res = cpRec;
+
+ break;
+
+ case META_PAGE_INIT:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ int ioType = in.readUnsignedShort();
+ int ioVer = in.readUnsignedShort();
+ long treeRoot = in.readLong();
+ long reuseListRoot = in.readLong();
+
+ res = new MetaPageInitRecord(cacheId, pageId, ioType, ioVer, treeRoot, reuseListRoot);
+
+ break;
+
+ case PARTITION_META_PAGE_UPDATE_COUNTERS:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ long updCntr = in.readLong();
+ long rmvId = in.readLong();
+ int partSize = in.readInt();
+ long countersPageId = in.readLong();
+ byte state = in.readByte();
+ int allocatedIdxCandidate = in.readInt();
+
+ res = new MetaPageUpdatePartitionDataRecord(cacheId, pageId, updCntr, rmvId, partSize, countersPageId, state, allocatedIdxCandidate);
+
+ break;
+
+ case MEMORY_RECOVERY:
+ long ts = in.readLong();
+
+ res = new MemoryRecoveryRecord(ts);
+
+ break;
+
+ case PARTITION_DESTROY:
+ cacheId = in.readInt();
+ int partId = in.readInt();
+
+ res = new PartitionDestroyRecord(cacheId, partId);
+
+ break;
+
+ case DATA_RECORD:
+ int entryCnt = in.readInt();
+
+ List<DataEntry> entries = new ArrayList<>(entryCnt);
+
+ for (int i = 0; i < entryCnt; i++)
+ entries.add(readDataEntry(in));
+
+ res = new DataRecord(entries);
+
+ break;
+
+ case HEADER_RECORD:
+ // Sanity check: the magic must match before the version is trusted.
+ long magic = in.readLong();
+
+ if (magic != HeaderRecord.MAGIC)
+ throw new EOFException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.MAGIC) +
+ ", actual=" + U.hexLong(magic) + ']');
+
+ int ver = in.readInt();
+
+ res = new HeaderRecord(ver);
+
+ break;
+
+ case DATA_PAGE_INSERT_RECORD: {
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ int size = in.readUnsignedShort();
+
+ in.ensure(size);
+
+ byte[] payload = new byte[size];
+
+ in.readFully(payload);
+
+ res = new DataPageInsertRecord(cacheId, pageId, payload);
+
+ break;
+ }
+
+ case DATA_PAGE_UPDATE_RECORD: {
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ int itemId = in.readInt();
+
+ int size = in.readUnsignedShort();
+
+ in.ensure(size);
+
+ byte[] payload = new byte[size];
+
+ in.readFully(payload);
+
+ res = new DataPageUpdateRecord(cacheId, pageId, itemId, payload);
+
+ break;
+ }
+
+ case DATA_PAGE_INSERT_FRAGMENT_RECORD: {
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ final long lastLink = in.readLong();
+ final int payloadSize = in.readInt();
+
+ final byte[] payload = new byte[payloadSize];
+
+ in.readFully(payload);
+
+ res = new DataPageInsertFragmentRecord(cacheId, pageId, payload, lastLink);
+
+ break;
+ }
+
+ case DATA_PAGE_REMOVE_RECORD:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ int itemId = in.readUnsignedByte();
+
+ res = new DataPageRemoveRecord(cacheId, pageId, itemId);
+
+ break;
+
+ case DATA_PAGE_SET_FREE_LIST_PAGE:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ long freeListPage = in.readLong();
+
+ res = new DataPageSetFreeListPageRecord(cacheId, pageId, freeListPage);
+
+ break;
+
+ case INIT_NEW_PAGE_RECORD:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ ioType = in.readUnsignedShort();
+ ioVer = in.readUnsignedShort();
+ long virtualPageId = in.readLong();
+
+ res = new InitNewPageRecord(cacheId, pageId, ioType, ioVer, virtualPageId);
+
+ break;
+
+ case BTREE_META_PAGE_INIT_ROOT:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ long rootId = in.readLong();
+
+ res = new MetaPageInitRootRecord(cacheId, pageId, rootId);
+
+ break;
+
+ case BTREE_META_PAGE_INIT_ROOT2:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ long rootId2 = in.readLong();
+ int inlineSize = in.readShort();
+
+ res = new MetaPageInitRootInlineRecord(cacheId, pageId, rootId2, inlineSize);
+
+ break;
+
+ case BTREE_META_PAGE_ADD_ROOT:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ rootId = in.readLong();
+
+ res = new MetaPageAddRootRecord(cacheId, pageId, rootId);
+
+ break;
+
+ case BTREE_META_PAGE_CUT_ROOT:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ res = new MetaPageCutRootRecord(cacheId, pageId);
+
+ break;
+
+ case BTREE_INIT_NEW_ROOT:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ rootId = in.readLong();
+ ioType = in.readUnsignedShort();
+ ioVer = in.readUnsignedShort();
+ long leftId = in.readLong();
+ long rightId = in.readLong();
+
+ // The row payload length is fixed by the resolved page IO's item size.
+ BPlusIO<?> io = BPlusIO.getBPlusIO(ioType, ioVer);
+
+ byte[] rowBytes = new byte[io.getItemSize()];
+
+ in.readFully(rowBytes);
+
+ res = new NewRootInitRecord<>(cacheId, pageId, rootId, (BPlusInnerIO<?>)io, leftId, rowBytes, rightId);
+
+ break;
+
+ case BTREE_PAGE_RECYCLE:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ long newPageId = in.readLong();
+
+ res = new RecycleRecord(cacheId, pageId, newPageId);
+
+ break;
+
+ case BTREE_PAGE_INSERT:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ ioType = in.readUnsignedShort();
+ ioVer = in.readUnsignedShort();
+ int itemIdx = in.readUnsignedShort();
+ rightId = in.readLong();
+
+ io = BPlusIO.getBPlusIO(ioType, ioVer);
+
+ rowBytes = new byte[io.getItemSize()];
+
+ in.readFully(rowBytes);
+
+ res = new InsertRecord<>(cacheId, pageId, io, itemIdx, rowBytes, rightId);
+
+ break;
+
+ case BTREE_FIX_LEFTMOST_CHILD:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ rightId = in.readLong();
+
+ res = new FixLeftmostChildRecord(cacheId, pageId, rightId);
+
+ break;
+
+ case BTREE_FIX_COUNT:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ int cnt = in.readUnsignedShort();
+
+ res = new FixCountRecord(cacheId, pageId, cnt);
+
+ break;
+
+ case BTREE_PAGE_REPLACE:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ ioType = in.readUnsignedShort();
+ ioVer = in.readUnsignedShort();
+ itemIdx = in.readUnsignedShort();
+
+ io = BPlusIO.getBPlusIO(ioType, ioVer);
+
+ rowBytes = new byte[io.getItemSize()];
+
+ in.readFully(rowBytes);
+
+ res = new ReplaceRecord<>(cacheId, pageId, io, rowBytes, itemIdx);
+
+ break;
+
+ case BTREE_PAGE_REMOVE:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ itemIdx = in.readUnsignedShort();
+ cnt = in.readUnsignedShort();
+
+ res = new RemoveRecord(cacheId, pageId, itemIdx, cnt);
+
+ break;
+
+ case BTREE_PAGE_INNER_REPLACE:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ int dstIdx = in.readUnsignedShort();
+ long srcPageId = in.readLong();
+ int srcIdx = in.readUnsignedShort();
+ rmvId = in.readLong();
+
+ res = new InnerReplaceRecord<>(cacheId, pageId, dstIdx, srcPageId, srcIdx, rmvId);
+
+ break;
+
+ case BTREE_FORWARD_PAGE_SPLIT:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ long fwdId = in.readLong();
+ ioType = in.readUnsignedShort();
+ ioVer = in.readUnsignedShort();
+ srcPageId = in.readLong();
+ int mid = in.readUnsignedShort();
+ cnt = in.readUnsignedShort();
+
+ res = new SplitForwardPageRecord(cacheId, pageId, fwdId, ioType, ioVer, srcPageId, mid, cnt);
+
+ break;
+
+ case BTREE_EXISTING_PAGE_SPLIT:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ mid = in.readUnsignedShort();
+ fwdId = in.readLong();
+
+ res = new SplitExistingPageRecord(cacheId, pageId, mid, fwdId);
+
+ break;
+
+ case BTREE_PAGE_MERGE:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ long prntId = in.readLong();
+ int prntIdx = in.readUnsignedShort();
+ rightId = in.readLong();
+ boolean emptyBranch = in.readBoolean();
+
+ res = new MergeRecord<>(cacheId, pageId, prntId, prntIdx, rightId, emptyBranch);
+
+ break;
+
+ case BTREE_FIX_REMOVE_ID:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ rmvId = in.readLong();
+
+ res = new FixRemoveId(cacheId, pageId, rmvId);
+
+ break;
+
+ case PAGES_LIST_SET_NEXT:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+ long nextPageId = in.readLong();
+
+ res = new PagesListSetNextRecord(cacheId, pageId, nextPageId);
+
+ break;
+
+ case PAGES_LIST_SET_PREVIOUS:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+ long prevPageId = in.readLong();
+
+ res = new PagesListSetPreviousRecord(cacheId, pageId, prevPageId);
+
+ break;
+
+ case PAGES_LIST_INIT_NEW_PAGE:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+ // NOTE: ioType/ioVer are full ints here, unlike the unsigned shorts
+ // used by the B+tree cases — mirrors writeRecord's putInt calls.
+ ioType = in.readInt();
+ ioVer = in.readInt();
+ newPageId = in.readLong();
+ prevPageId = in.readLong();
+ long addDataPageId = in.readLong();
+
+ res = new PagesListInitNewPageRecord(cacheId, pageId, ioType, ioVer, newPageId, prevPageId, addDataPageId);
+
+ break;
+
+ case PAGES_LIST_ADD_PAGE:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+ long dataPageId = in.readLong();
+
+ res = new PagesListAddPageRecord(cacheId, pageId, dataPageId);
+
+ break;
+
+ case PAGES_LIST_REMOVE_PAGE:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+ long rmvdPageId = in.readLong();
+
+ res = new PagesListRemovePageRecord(cacheId, pageId, rmvdPageId);
+
+ break;
+
+ case TRACKING_PAGE_DELTA:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ long pageIdToMark = in.readLong();
+ long nextSnapshotId0 = in.readLong();
+ long lastSuccessfulSnapshotId0 = in.readLong();
+
+ res = new TrackingPageDeltaRecord(cacheId, pageId, pageIdToMark, nextSnapshotId0, lastSuccessfulSnapshotId0);
+
+ break;
+
+ case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ long nextSnapshotId = in.readLong();
+
+ res = new MetaPageUpdateNextSnapshotId(cacheId, pageId, nextSnapshotId);
+
+ break;
+
+ case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ long lastSuccessfulFullSnapshotId = in.readLong();
+
+ res = new MetaPageUpdateLastSuccessfulFullSnapshotId(cacheId, pageId, lastSuccessfulFullSnapshotId);
+
+ break;
+
+ case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ long lastSuccessfulSnapshotId = in.readLong();
+ long lastSuccessfulSnapshotTag = in.readLong();
+
+ res = new MetaPageUpdateLastSuccessfulSnapshotId(cacheId, pageId, lastSuccessfulSnapshotId, lastSuccessfulSnapshotTag);
+
+ break;
+
+ case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ int lastAllocatedIdx = in.readInt();
+
+ res = new MetaPageUpdateLastAllocatedIndex(cacheId, pageId, lastAllocatedIdx);
+
+ break;
+
+ case PART_META_UPDATE_STATE:
+ cacheId = in.readInt();
+ partId = in.readInt();
+
+ state = in.readByte();
+
+ long updateCounter = in.readLong();
+
+ res = new PartitionMetaStateRecord(cacheId, partId, GridDhtPartitionState.fromOrdinal(state), updateCounter);
+
+ break;
+
+ case PAGE_LIST_META_RESET_COUNT_RECORD:
+ cacheId = in.readInt();
+ pageId = in.readLong();
+
+ res = new PageListMetaResetCountRecord(cacheId, pageId);
+ break;
+
+ case SWITCH_SEGMENT_RECORD:
+ // Logical end of the WAL segment is reported to the caller as EOF.
+ throw new EOFException("END OF SEGMENT");
+
+ case TX_RECORD:
+ res = txRecordSerializer.readTxRecord(in);
+
+ break;
+
+ default:
+ throw new UnsupportedOperationException("Type: " + type);
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException {
+ switch (record.type()) {
+ case PAGE_RECORD:
+ PageSnapshot snap = (PageSnapshot)record;
+
+ buf.putInt(snap.fullPageId().groupId());
+ buf.putLong(snap.fullPageId().pageId());
+ buf.put(snap.pageData());
+
+ break;
+
+ case MEMORY_RECOVERY:
+ MemoryRecoveryRecord memoryRecoveryRecord = (MemoryRecoveryRecord)record;
+
+ buf.putLong(memoryRecoveryRecord.time());
+
+ break;
+
+ case PARTITION_DESTROY:
+ PartitionDestroyRecord partDestroy = (PartitionDestroyRecord)record;
+
+ buf.putInt(partDestroy.groupId());
+ buf.putInt(partDestroy.partitionId());
+
+ break;
+
+ case META_PAGE_INIT:
+ MetaPageInitRecord updRootsRec = (MetaPageInitRecord)record;
+
+ buf.putInt(updRootsRec.groupId());
+ buf.putLong(updRootsRec.pageId());
+
+ buf.putShort((short)updRootsRec.ioType());
+ buf.putShort((short)updRootsRec.ioVersion());
+ buf.putLong(updRootsRec.treeRoot());
+ buf.putLong(updRootsRec.reuseListRoot());
+
+ break;
+
+ case PARTITION_META_PAGE_UPDATE_COUNTERS:
+ MetaPageUpdatePartitionDataRecord partDataRec = (MetaPageUpdatePartitionDataRecord)record;
+
+ buf.putInt(partDataRec.groupId());
+ buf.putLong(partDataRec.pageId());
+
+ buf.putLong(partDataRec.updateCounter());
+ buf.putLong(partDataRec.globalRemoveId());
+ buf.putInt(partDataRec.partitionSize());
+ buf.putLong(partDataRec.countersPageId());
+ buf.put(partDataRec.state());
+ buf.putInt(partDataRec.allocatedIndexCandidate());
+
+ break;
+
+ case CHECKPOINT_RECORD:
+ CheckpointRecord cpRec = (CheckpointRecord)record;
+
+ assert cpRec.checkpointMark() == null || cpRec.checkpointMark() instanceof FileWALPointer :
+ "Invalid WAL record: " + cpRec;
+
+ FileWALPointer walPtr = (FileWALPointer)cpRec.checkpointMark();
+ UUID cpId = cpRec.checkpointId();
+
+ buf.putLong(cpId.getMostSignificantBits());
+ buf.putLong(cpId.getLeastSignificantBits());
+
+ buf.put(walPtr == null ? (byte)0 : 1);
+
+ if (walPtr != null) {
+ buf.putLong(walPtr.index());
+ buf.putInt(walPtr.fileOffset());
+ buf.putInt(walPtr.length());
+ }
+
+ putCacheStates(buf, cpRec.cacheGroupStates());
+
+ buf.put(cpRec.end() ? (byte)1 : 0);
+
+ break;
+
+ case DATA_RECORD:
+ DataRecord dataRec = (DataRecord)record;
+
+ buf.putInt(dataRec.writeEntries().size());
+
+ for (DataEntry dataEntry : dataRec.writeEntries())
+ putDataEntry(buf, dataEntry);
+
+ break;
+
+ case HEADER_RECORD:
+ buf.putLong(HeaderRecord.MAGIC);
+
+ buf.putInt(((HeaderRecord)record).version());
+
+ break;
+
+ case DATA_PAGE_INSERT_RECORD:
+ DataPageInsertRecord diRec = (DataPageInsertRecord)record;
+
+ buf.putInt(diRec.groupId());
+ buf.putLong(diRec.pageId());
+
+ buf.putShort((short)diRec.payload().length);
+
+ buf.put(diRec.payload());
+
+ break;
+
+ case DATA_PAGE_UPDATE_RECORD:
+ DataPageUpdateRecord uRec = (DataPageUpdateRecord)record;
+
+ buf.putInt(uRec.groupId());
+ buf.putLong(uRec.pageId());
+ buf.putInt(uRec.itemId());
+
+ buf.putShort((short)uRec.payload().length);
+
+ buf.put(uRec.payload());
+
+ break;
+
+ case DATA_PAGE_INSERT_FRAGMENT_RECORD:
+ final DataPageInsertFragmentRecord difRec = (DataPageInsertFragmentRecord)record;
+
+ buf.putInt(difRec.groupId());
+ buf.putLong(difRec.pageId());
+
+ buf.putLong(difRec.lastLink());
+ buf.putInt(difRec.payloadSize());
+ buf.put(difRec.payload());
+
+ break;
+
+ case DATA_PAGE_REMOVE_RECORD:
+ DataPageRemoveRecord drRec = (DataPageRemoveRecord)record;
+
+ buf.putInt(drRec.groupId());
+ buf.putLong(drRec.pageId());
+
+ buf.put((byte)drRec.itemId());
+
+ break;
+
+ case DATA_PAGE_SET_FREE_LIST_PAGE:
+ DataPageSetFreeListPageRecord freeListRec = (DataPageSetFreeListPageRecord)record;
+
+ buf.putInt(freeListRec.groupId());
+ buf.putLong(freeListRec.pageId());
+
+ buf.putLong(freeListRec.freeListPage());
+
+ break;
+
+ case INIT_NEW_PAGE_RECORD:
+ InitNewPageRecord inpRec = (InitNewPageRecord)record;
+
+ buf.putInt(inpRec.groupId());
+ buf.putLong(inpRec.pageId());
+
+ buf.putShort((short)inpRec.ioType());
+ buf.putShort((short)inpRec.ioVersion());
+ buf.putLong(inpRec.newPageId());
+
+ break;
+
+ case BTREE_META_PAGE_INIT_ROOT:
+ MetaPageInitRootRecord imRec = (MetaPageInitRootRecord)record;
+
+ buf.putInt(imRec.groupId());
+ buf.putLong(imRec.pageId());
+
+ buf.putLong(imRec.rootId());
+
+ break;
+
+ case BTREE_META_PAGE_INIT_ROOT2:
+ MetaPageInitRootInlineRecord imRec2 = (MetaPageInitRootInlineRecord)record;
+
+ buf.putInt(imRec2.groupId());
+ buf.putLong(imRec2.pageId());
+
+ buf.putLong(imRec2.rootId());
+
+ buf.putShort((short)imRec2.inlineSize());
+ break;
+
+ case BTREE_META_PAGE_ADD_ROOT:
+ MetaPageAddRootRecord arRec = (MetaPageAddRootRecord)record;
+
+ buf.putInt(arRec.groupId());
+ buf.putLong(arRec.pageId());
+
+ buf.putLong(arRec.rootId());
+
+ break;
+
+ case BTREE_META_PAGE_CUT_ROOT:
+ MetaPageCutRootRecord crRec = (MetaPageCutRootRecord)record;
+
+ buf.putInt(crRec.groupId());
+ buf.putLong(crRec.pageId());
+
+ break;
+
+ case BTREE_INIT_NEW_ROOT:
+ NewRootInitRecord<?> riRec = (NewRootInitRecord<?>)record;
+
+ buf.putInt(riRec.groupId());
+ buf.putLong(riRec.pageId());
+
+ buf.putLong(riRec.rootId());
+ buf.putShort((short)riRec.io().getType());
+ buf.putShort((short)riRec.io().getVersion());
+ buf.putLong(riRec.leftId());
+ buf.putLong(riRec.rightId());
+
+ putRow(buf, riRec.rowBytes());
+
+ break;
+
+ case BTREE_PAGE_RECYCLE:
+ RecycleRecord recRec = (RecycleRecord)record;
+
+ buf.putInt(recRec.groupId());
+ buf.putLong(recRec.pageId());
+
+ buf.putLong(recRec.newPageId());
+
+ break;
+
+ case BTREE_PAGE_INSERT:
+ InsertRecord<?> inRec = (InsertRecord<?>)record;
+
+ buf.putInt(inRec.groupId());
+ buf.putLong(inRec.pageId());
+
+ buf.putShort((short)inRec.io().getType());
+ buf.putShort((short)inRec.io().getVersion());
+ buf.putShort((short)inRec.index());
+ buf.putLong(inRec.rightId());
+
+ putRow(buf, inRec.rowBytes());
+
+ break;
+
+ case BTREE_FIX_LEFTMOST_CHILD:
+ FixLeftmostChildRecord flRec = (FixLeftmostChildRecord)record;
+
+ buf.putInt(flRec.groupId());
+ buf.putLong(flRec.pageId());
+
+ buf.putLong(flRec.rightId());
+
+ break;
+
+ case BTREE_FIX_COUNT:
+ FixCountRecord fcRec = (FixCountRecord)record;
+
+ buf.putInt(fcRec.groupId());
+ buf.putLong(fcRec.pageId());
+
+ buf.putShort((short)fcRec.count());
+
+ break;
+
+ case BTREE_PAGE_REPLACE:
+ ReplaceRecord<?> rRec = (ReplaceRecord<?>)record;
+
+ buf.putInt(rRec.groupId());
+ buf.putLong(rRec.pageId());
+
+ buf.putShort((short)rRec.io().getType());
+ buf.putShort((short)rRec.io().getVersion());
+ buf.putShort((short)rRec.index());
+
+ putRow(buf, rRec.rowBytes());
+
+ break;
+
+ case BTREE_PAGE_REMOVE:
+ RemoveRecord rmRec = (RemoveRecord)record;
+
+ buf.putInt(rmRec.groupId());
+ buf.putLong(rmRec.pageId());
+
+ buf.putShort((short)rmRec.index());
+ buf.putShort((short)rmRec.count());
+
+ break;
+
+ case BTREE_PAGE_INNER_REPLACE:
+ InnerReplaceRecord<?> irRec = (InnerReplaceRecord<?>)record;
+
+ buf.putInt(irRec.groupId());
+ buf.putLong(irRec.pageId());
+
+ buf.putShort((short)irRec.destinationIndex());
+ buf.putLong(irRec.sourcePageId());
+ buf.putShort((short)irRec.sourceIndex());
+ buf.putLong(irRec.removeId());
+
+ break;
+
+ case BTREE_FORWARD_PAGE_SPLIT:
+ SplitForwardPageRecord sfRec = (SplitForwardPageRecord)record;
+
+ buf.putInt(sfRec.groupId());
+ buf.putLong(sfRec.pageId());
+
+ buf.putLong(sfRec.forwardId());
+ buf.putShort((short)sfRec.ioType());
+ buf.putShort((short)sfRec.ioVersion());
+ buf.putLong(sfRec.sourcePageId());
+ buf.putShort((short)sfRec.middleIndex());
+ buf.putShort((short)sfRec.count());
+
+ break;
+
+ case BTREE_EXISTING_PAGE_SPLIT:
+ SplitExistingPageRecord seRec = (SplitExistingPageRecord)record;
+
+ buf.putInt(seRec.groupId());
+ buf.putLong(seRec.pageId());
+
+ buf.putShort((short)seRec.middleIndex());
+ buf.putLong(seRec.forwardId());
+
+ break;
+
+ case BTREE_PAGE_MERGE:
+ MergeRecord<?> mRec = (MergeRecord<?>)record;
+
+ buf.putInt(mRec.groupId());
+ buf.putLong(mRec.pageId());
+
+ buf.putLong(mRec.parentId());
+ buf.putShort((short)mRec.parentIndex());
+ buf.putLong(mRec.rightId());
+ buf.put((byte)(mRec.isEmptyBranch() ? 1 : 0));
+
+ break;
+
+ case PAGES_LIST_SET_NEXT:
+ PagesListSetNextRecord plNextRec = (PagesListSetNextRecord)record;
+
+ buf.putInt(plNextRec.groupId());
+ buf.putLong(plNextRec.pageId());
+
+ buf.putLong(plNextRec.nextPageId());
+
+ break;
+
+ case PAGES_LIST_SET_PREVIOUS:
+ PagesListSetPreviousRecord plPrevRec = (PagesListSetPreviousRecord)record;
+
+ buf.putInt(plPrevRec.groupId());
+ buf.putLong(plPrevRec.pageId());
+
+ buf.putLong(plPrevRec.previousPageId());
+
+ break;
+
+ case PAGES_LIST_INIT_NEW_PAGE:
+ PagesListInitNewPageRecord plNewRec = (PagesListInitNewPageRecord)record;
+
+ buf.putInt(plNewRec.groupId());
+ buf.putLong(plNewRec.pageId());
+ buf.putInt(plNewRec.ioType());
+ buf.putInt(plNewRec.ioVersion());
+ buf.putLong(plNewRec.newPageId());
+
+ buf.putLong(plNewRec.previousPageId());
+ buf.putLong(plNewRec.dataPageId());
+
+ break;
+
+ case PAGES_LIST_ADD_PAGE:
+ PagesListAddPageRecord plAddRec = (PagesListAddPageRecord)record;
+
+ buf.putInt(plAddRec.groupId());
+ buf.putLong(plAddRec.pageId());
+
+ buf.putLong(plAddRec.dataPageId());
+
+ break;
+
+ case PAGES_LIST_REMOVE_PAGE:
+ PagesListRemovePageRecord plRmvRec = (PagesListRemovePageRecord)record;
+
+ buf.putInt(plRmvRec.groupId());
+ buf.putLong(plRmvRec.pageId());
+
+ buf.putLong(plRmvRec.removedPageId());
+
+ break;
+
+ case BTREE_FIX_REMOVE_ID:
+ FixRemoveId frRec = (FixRemoveId)record;
+
+ buf.putInt(frRec.groupId());
+ buf.putLong(frRec.pageId());
+
+ buf.putLong(frRec.removeId());
+
+ break;
+
+ case TRACKING_PAGE_DELTA:
+ TrackingPageDeltaRecord tpDelta = (TrackingPageDeltaRecord)record;
+
+ buf.putInt(tpDelta.groupId());
+ buf.putLong(tpDelta.pageId());
+
+ buf.putLong(tpDelta.pageIdToMark());
+ buf.putLong(tpDelta.nextSnapshotId());
+ buf.putLong(tpDelta.lastSuccessfulSnapshotId());
+
+ break;
+
+ case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID:
+ MetaPageUpdateNextSnapshotId mpUpdateNextSnapshotId = (MetaPageUpdateNextSnapshotId)record;
+
+ buf.putInt(mpUpdateNextSnapshotId.groupId());
+ buf.putLong(mpUpdateNextSnapshotId.pageId());
+
+ buf.putLong(mpUpdateNextSnapshotId.nextSnapshotId());
+
+ break;
+
+ case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID:
+ MetaPageUpdateLastSuccessfulFullSnapshotId mpUpdateLastSuccFullSnapshotId =
+ (MetaPageUpdateLastSuccessfulFullSnapshotId)record;
+
+ buf.putInt(mpUpdateLastSuccFullSnapshotId.groupId());
+ buf.putLong(mpUpdateLastSuccFullSnapshotId.pageId());
+
+ buf.putLong(mpUpdateLastSuccFullSnapshotId.lastSuccessfulFullSnapshotId());
+
+ break;
+
+ case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID:
+ MetaPageUpdateLastSuccessfulSnapshotId mpUpdateLastSuccSnapshotId =
+ (MetaPageUpdateLastSuccessfulSnapshotId)record;
+
+ buf.putInt(mpUpdateLastSuccSnapshotId.groupId());
+ buf.putLong(mpUpdateLastSuccSnapshotId.pageId());
+
+ buf.putLong(mpUpdateLastSuccSnapshotId.lastSuccessfulSnapshotId());
+ buf.putLong(mpUpdateLastSuccSnapshotId.lastSuccessfulSnapshotTag());
+
+ break;
+
+ case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX:
+ MetaPageUpdateLastAllocatedIndex mpUpdateLastAllocatedIdx =
+ (MetaPageUpdateLastAllocatedIndex) record;
+
+ buf.putInt(mpUpdateLastAllocatedIdx.groupId());
+ buf.putLong(mpUpdateLastAllocatedIdx.pageId());
+
+ buf.putInt(mpUpdateLastAllocatedIdx.lastAllocatedIndex());
+
+ break;
+
+ case PART_META_UPDATE_STATE:
+ PartitionMetaStateRecord partMetaStateRecord = (PartitionMetaStateRecord) record;
+
+ buf.putInt(partMetaStateRecord.groupId());
+
+ buf.putInt(partMetaStateRecord.partitionId());
+
+ buf.put(partMetaStateRecord.state());
+
+ buf.putLong(partMetaStateRecord.updateCounter());
+
+ break;
+
+ case PAGE_LIST_META_RESET_COUNT_RECORD:
+ PageListMetaResetCountRecord pageListMetaResetCntRecord = (PageListMetaResetCountRecord) record;
+
+ buf.putInt(pageListMetaResetCntRecord.groupId());
+ buf.putLong(pageListMetaResetCntRecord.pageId());
+
+ break;
+
+ case TX_RECORD:
+ txRecordSerializer.writeTxRecord((TxRecord)record, buf);
+
+ break;
+
+ case SWITCH_SEGMENT_RECORD:
+ break;
+
+ default:
+ throw new UnsupportedOperationException("Type: " + record.type());
+ }
+ }
+
    /**
     * Writes a single data entry in the V1 wire format: cache ID, key, optional value,
     * operation ordinal, near/write versions, partition ID, partition counter and expire time.
     * Must stay byte-for-byte symmetric with {@code readDataEntry}.
     *
     * @param buf Buffer to write to.
     * @param entry Data entry.
     * @throws IgniteCheckedException If key or value serialization fails.
     */
    private static void putDataEntry(ByteBuffer buf, DataEntry entry) throws IgniteCheckedException {
        buf.putInt(entry.cacheId());

        // Key is mandatory; putValue() is expected to emit the [size][type][bytes] layout
        // that readDataEntry() consumes, and to succeed for a sized buffer.
        if (!entry.key().putValue(buf))
            throw new AssertionError();

        // Null value is encoded as size -1 (readDataEntry() treats negative size as null).
        if (entry.value() == null)
            buf.putInt(-1);
        else if (!entry.value().putValue(buf))
            throw new AssertionError();

        buf.put((byte)entry.op().ordinal());

        putVersion(buf, entry.nearXidVersion(), true);
        putVersion(buf, entry.writeVersion(), false);

        buf.putInt(entry.partitionId());
        buf.putLong(entry.partitionCounter());
        buf.putLong(entry.expireTime());
    }
+
+ /**
+ * @param states Cache states.
+ */
+ private static void putCacheStates(ByteBuffer buf, Map<Integer, CacheState> states) {
+ buf.putShort((short)states.size());
+
+ for (Map.Entry<Integer, CacheState> entry : states.entrySet()) {
+ buf.putInt(entry.getKey());
+
+ CacheState state = entry.getValue();
+
+ // Need 2 bytes for the number of partitions.
+ buf.putShort((short)state.size());
+
+ for (int i = 0; i < state.size(); i++) {
+ buf.putShort((short)state.partitionByIndex(i));
+
+ buf.putLong(state.partitionSizeByIndex(i));
+ buf.putLong(state.partitionCounterByIndex(i));
+ }
+ }
+ }
+
    /**
     * Writes a cache version to the buffer, delegating the wire format entirely to
     * {@code CacheVersionIO} (symmetric with {@code readVersion}).
     *
     * @param buf Buffer.
     * @param ver Version to write.
     * @param allowNull Is {@code null} version allowed.
     */
    private static void putVersion(ByteBuffer buf, GridCacheVersion ver, boolean allowNull) {
        CacheVersionIO.write(buf, ver, allowNull);
    }
+
+ /**
+ * @param buf Buffer.
+ * @param rowBytes Row bytes.
+ */
+ @SuppressWarnings("unchecked")
+ private static void putRow(ByteBuffer buf, byte[] rowBytes) {
+ assert rowBytes.length > 0;
+
+ buf.put(rowBytes);
+ }
+
    /**
     * Reads a single data entry written by {@code putDataEntry}: cache ID, key, optional
     * value, operation, versions, partition ID/counter and expire time, in exactly that order.
     *
     * @param in Input to read from.
     * @return Read entry; a {@link LazyDataEntry} (raw bytes, deferred deserialization) when
     *      no cache context is available for the cache ID.
     * @throws IOException If failed to read data from the input.
     * @throws IgniteCheckedException If failed to deserialize the key or value.
     */
    private DataEntry readDataEntry(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException {
        int cacheId = in.readInt();

        // Key layout: [size][type][bytes] — mirrors what key().putValue() produced.
        int keySize = in.readInt();
        byte keyType = in.readByte();
        byte[] keyBytes = new byte[keySize];
        in.readFully(keyBytes);

        int valSize = in.readInt();

        byte valType = 0;
        byte[] valBytes = null;

        // Negative size (-1) marks a null value, see putDataEntry().
        if (valSize >= 0) {
            valType = in.readByte();
            valBytes = new byte[valSize];
            in.readFully(valBytes);
        }

        byte ord = in.readByte();

        // Mask to treat the ordinal byte as unsigned.
        GridCacheOperation op = GridCacheOperation.fromOrdinal(ord & 0xFF);

        GridCacheVersion nearXidVer = readVersion(in, true);
        GridCacheVersion writeVer = readVersion(in, false);

        int partId = in.readInt();
        long partCntr = in.readLong();
        long expireTime = in.readLong();

        GridCacheContext cacheCtx = cctx.cacheContext(cacheId);

        // With a live cache context the key/value can be deserialized eagerly;
        // otherwise raw bytes are kept and deserialization is deferred (LazyDataEntry).
        if (cacheCtx != null) {
            CacheObjectContext coCtx = cacheCtx.cacheObjectContext();

            KeyCacheObject key = co.toKeyCacheObject(coCtx, keyType, keyBytes);
            CacheObject val = valBytes != null ? co.toCacheObject(coCtx, valType, valBytes) : null;

            return new DataEntry(
                cacheId,
                key,
                val,
                op,
                nearXidVer,
                writeVer,
                expireTime,
                partId,
                partCntr
            );
        }
        else
            return new LazyDataEntry(
                cctx,
                cacheId,
                keyType,
                keyBytes,
                valType,
                valBytes,
                op,
                nearXidVer,
                writeVer,
                expireTime,
                partId,
                partCntr);
    }
+
+ /**
+ * @param buf Buffer to read from.
+ * @return Read map.
+ */
+ private Map<Integer, CacheState> readPartitionStates(DataInput buf) throws IOException {
+ int caches = buf.readShort() & 0xFFFF;
+
+ if (caches == 0)
+ return Collections.emptyMap();
+
+ Map<Integer, CacheState> states = new HashMap<>(caches, 1.0f);
+
+ for (int i = 0; i < caches; i++) {
+ int cacheId = buf.readInt();
+
+ int parts = buf.readShort() & 0xFFFF;
+
+ CacheState state = new CacheState(parts);
+
+ for (int p = 0; p < parts; p++) {
+ int partId = buf.readShort() & 0xFFFF;
+ long size = buf.readLong();
+ long partCntr = buf.readLong();
+
+ state.addPartitionState(partId, size, partCntr);
+ }
+
+ states.put(cacheId, state);
+ }
+
+ return states;
+ }
+
    /**
     * Reads a cache version via {@code CacheVersionIO}, ensuring enough bytes are buffered
     * first. Changes the buffer position by the number of read bytes.
     *
     * @param in Data input to read from.
     * @param allowNull Is {@code null} version allowed.
     * @return Read cache version.
     * @throws IOException If failed to read data (wraps {@link IgniteCheckedException}).
     */
    private GridCacheVersion readVersion(ByteBufferBackedDataInput in, boolean allowNull) throws IOException {
        // To be able to read serialization protocol version.
        in.ensure(1);

        try {
            // Size is determined from the already-buffered first byte(s), then the
            // remainder of the version is pulled in before the actual read.
            int size = CacheVersionIO.readSize(in.buffer(), allowNull);

            in.ensure(size);

            return CacheVersionIO.read(in.buffer(), allowNull);
        }
        catch (IgniteCheckedException e) {
            // Callers of this reader expect IOException; preserve the cause.
            throw new IOException(e);
        }
    }
+
+ /**
+ * @param dataRec Data record to serialize.
+ * @return Full data record size.
+ * @throws IgniteCheckedException If failed to obtain the length of one of the entries.
+ */
+ private int dataSize(DataRecord dataRec) throws IgniteCheckedException {
+ int sz = 0;
+
+ for (DataEntry entry : dataRec.writeEntries())
+ sz += entrySize(entry);
+
+ return sz;
+ }
+
+ /**
+ * @param entry Entry to get size for.
+ * @return Entry size.
+ * @throws IgniteCheckedException If failed to get key or value bytes length.
+ */
+ private int entrySize(DataEntry entry) throws IgniteCheckedException {
+ GridCacheContext cctx = this.cctx.cacheContext(entry.cacheId());
+ CacheObjectContext coCtx = cctx.cacheObjectContext();
+
+ return
+ /*cache ID*/4 +
+ /*key*/entry.key().valueBytesLength(coCtx) +
+ /*value*/(entry.value() == null ? 4 : entry.value().valueBytesLength(coCtx)) +
+ /*op*/1 +
+ /*near xid ver*/CacheVersionIO.size(entry.nearXidVersion(), true) +
+ /*write ver*/CacheVersionIO.size(entry.writeVersion(), false) +
+ /*part ID*/4 +
+ /*expire Time*/8 +
+ /*part cnt*/8;
+ }
+
    /**
     * Calculates the size required to serialize the given partition states map;
     * must agree exactly with the layout produced by {@code putCacheStates}.
     *
     * @param states Partition states.
     * @return Size required to write partition states.
     */
    private int cacheStatesSize(Map<Integer, CacheState> states) {
        // Need 2 bytes for the number of caches (putCacheStates() writes it as a short).
        int size = 2;

        for (Map.Entry<Integer, CacheState> entry : states.entrySet()) {
            // Cache ID.
            size += 4;

            // Need 2 bytes for the number of partitions.
            size += 2;

            CacheState state = entry.getValue();

            // Per partition: 2 bytes partition ID + 8 bytes size + 8 bytes counter = 18.
            size += 18 * state.size();
        }

        return size;
    }
+
+}