You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/26 07:33:39 UTC
[06/12] ignite git commit: ignite-6339 WAL write operations are
optimized and file IO operations are non-interruptible from user thread now
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBuffer.java
new file mode 100644
index 0000000..a0209f9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBuffer.java
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.MappedByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import sun.nio.ch.DirectBuffer;
+
+import static java.nio.ByteBuffer.allocate;
+import static java.nio.ByteBuffer.allocateDirect;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.MAPPED;
+
+/**
+ * Segmented ring byte buffer that represents multi producer/single consumer queue that can be used by multiple writer
+ * threads and one reader thread.
+ */
+public class SegmentedRingByteBuffer {
+ /** Open mask. */
+ private static final long OPEN_MASK = 0x7FFFFFFFFFFFFFFFL;
+
+ /** Close mask. */
+ private static final long CLOSE_MASK = 0x8000000000000000L;
+
+ /** Tail field atomic updater. */
+ private static final AtomicLongFieldUpdater<SegmentedRingByteBuffer> TAIL_UPD =
+ AtomicLongFieldUpdater.newUpdater(SegmentedRingByteBuffer.class, "tail");
+
+ /** Producers count field atomic updater. */
+ private static final AtomicIntegerFieldUpdater<SegmentedRingByteBuffer> PRODUCERS_CNT_UPD =
+ AtomicIntegerFieldUpdater.newUpdater(SegmentedRingByteBuffer.class, "producersCnt");
+
+ /** Capacity. */
+ private final int cap;
+
+ /** Direct. */
+ private final BufferMode mode;
+
+ /** Buffer. */
+ public final ByteBuffer buf;
+
+ /** Max segment size. */
+ private final long maxSegmentSize;
+
+ /** Head. */
+ private volatile long head;
+
+ /** Tail. */
+ private volatile long tail;
+
+ /**
+ * Producers count. Uses by consumer in order to wait for ending of data writing by all producers.
+ */
+ private volatile int producersCnt;
+
+ /**
+ * Wait for consumer flag. Prevents producers from writing data to the ring buffer while consumer waiting for finish
+ * of all already writing producers.
+ */
+ private volatile boolean waitForConsumer;
+
+ /** Metrics. */
+ private final DataStorageMetricsImpl metrics;
+
+ /**
+ * Creates ring buffer with given capacity.
+ *
+ * @param cap Buffer's capacity.
+ * @param maxSegmentSize Max segment size.
+ * @param mode Buffer mode.
+ */
+ public SegmentedRingByteBuffer(int cap, long maxSegmentSize, BufferMode mode) {
+ this(cap, maxSegmentSize, mode == DIRECT ? allocateDirect(cap) : allocate(cap), mode, null);
+ }
+
+ /**
+ * Creates ring buffer with given capacity.
+ *
+ * @param cap Buffer's capacity.
+ * @param maxSegmentSize Max segment size.
+ * @param mode Buffer mode.
+ * @param metrics Metrics.
+ */
+ public SegmentedRingByteBuffer(int cap, long maxSegmentSize, BufferMode mode, DataStorageMetricsImpl metrics) {
+ this(cap, maxSegmentSize, mode == DIRECT ? allocateDirect(cap) : allocate(cap), mode, metrics);
+ }
+
+ /**
+ * Creates ring buffer with given capacity which mapped to file.
+ *
+ * @param buf {@link MappedByteBuffer} instance.
+ * @param metrics Metrics.
+ */
+ public SegmentedRingByteBuffer(MappedByteBuffer buf, DataStorageMetricsImpl metrics) {
+ this(buf.capacity(), buf.capacity(), buf, MAPPED, metrics);
+ }
+
+ /**
+ * @param cap Capacity.
+ * @param maxSegmentSize Max segment size.
+ * @param buf Buffer.
+ * @param mode Mode.
+ * @param metrics Metrics.
+ */
+ private SegmentedRingByteBuffer(
+ int cap,
+ long maxSegmentSize,
+ ByteBuffer buf,
+ BufferMode mode,
+ DataStorageMetricsImpl metrics
+ ) {
+ this.cap = cap;
+ this.mode = mode;
+ this.buf = buf;
+ this.buf.order(ByteOrder.nativeOrder());
+ this.maxSegmentSize = maxSegmentSize;
+ this.metrics = metrics;
+ }
+
+ /**
+ * Performs initialization of ring buffer state.
+ *
+ * @param pos Position.
+ */
+ public void init(long pos) {
+ head = pos;
+ tail = pos;
+ }
+
+ /**
+ * Returns buffer mode.
+ *
+ * @return Buffer mode.
+ */
+ public BufferMode mode() {
+ return mode;
+ }
+
+ /**
+ * Returns actual buffer tail.
+ *
+ * @return Buffer tail.
+ */
+ public long tail() {
+ return tail & SegmentedRingByteBuffer.OPEN_MASK;
+ }
+
+ /**
+ * Reserves {@code size} bytes in {@code SegmentedRingByteBuffer} and returns instance of {@link WriteSegment}
+ * class that points to wrapped {@link ByteBuffer} instance with corresponding capacity. This {@link ByteBuffer}
+ * instance should be used only for data writing to {@link SegmentedRingByteBuffer}.
+ * <p>
+ * Returned result can be {@code null} in case of requested amount of bytes greater then available space
+ * in {@code SegmentedRingByteBuffer}. Also {@link WriteSegment#buffer()} can return {@code null} in case of
+ * {@link #maxSegmentSize} value is exceeded. In this case buffer will be closed in order to prevent any
+ * concurrent threads from trying of reserve new segment.
+ * <p>
+ * This method can be invoked by many producer threads and each producer will get own {@link ByteBuffer} instance
+ * that mapped to own {@link SegmentedRingByteBuffer} slice.
+ * <p>
+ * Once the data has been written into the {@link ByteBuffer} client code must notify
+ * {@code SegmentedRingByteBuffer} instance using {@link WriteSegment#release()} method in order to provide
+ * possibility to consumer get data for reading.
+ *
+ * @param size Amount of bytes for reserve.
+ * @return {@link WriteSegment} instance that point to {@link ByteBuffer} instance with given {@code size}.
+ * {@code null} if buffer space is not enough.
+ */
+ public WriteSegment offer(int size) {
+ return offer0(size, false);
+ }
+
+ /**
+ * Behaves like {@link #offer(int)} but in safe manner: there are no any concurrent threads and buffer in
+ * closed state.
+ *
+ * @param size Amount of bytes for reserve.
+ * @return {@link WriteSegment} instance that point to {@link ByteBuffer} instance with given {@code size}.
+ * {@code null} if buffer space is not enough.
+ */
+ public WriteSegment offerSafe(int size) {
+ return offer0(size, true);
+ }
+
+ /**
+ * @param size Amount of bytes for reserve.
+ * @param safe Safe ьщву.
+ */
+ private WriteSegment offer0(int size, boolean safe) {
+ if (size > cap)
+ throw new IllegalArgumentException("Record is too long [capacity=" + cap + ", size=" + size + ']');
+
+ for (;;) {
+ if (!waitForConsumer) {
+ int cur = producersCnt;
+
+ if (cur >= 0 && PRODUCERS_CNT_UPD.compareAndSet(this, cur, cur + 1))
+ break;
+ }
+ }
+
+ for (;;) {
+ long currTail = tail;
+
+ assert !safe || currTail < 0 : "Unsafe usage of segment ring byte buffer currTail=" + currTail;
+
+ if (currTail < 0) {
+ if (safe)
+ currTail &= SegmentedRingByteBuffer.OPEN_MASK;
+ else
+ return new WriteSegment(null, -1);
+ }
+
+ long head0 = head;
+
+ long currTailIdx = toIndex(currTail);
+
+ boolean fitsSeg = currTail + size <= maxSegmentSize;
+
+ long newTail = fitsSeg ? currTail + size : currTail;
+
+ if (head0 < newTail - cap) { // Not enough space.
+ PRODUCERS_CNT_UPD.decrementAndGet(this);
+
+ return null;
+ }
+ else {
+ // If safe we should keep buffer closed.
+ long tail0 = fitsSeg ? (safe ? newTail | CLOSE_MASK : newTail) : newTail | CLOSE_MASK;
+
+ boolean upd = TAIL_UPD.compareAndSet(this, safe ? currTail | CLOSE_MASK : currTail, tail0);
+
+ assert !safe || upd : "Unsafe usage of segment ring byte buffer";
+
+ if (upd) {
+ if (!fitsSeg)
+ return new WriteSegment(null, -1);
+
+ boolean wrap = cap - currTailIdx < size;
+
+ if (wrap) {
+ long newTailIdx = toIndex(newTail);
+
+ return new WriteSegment(currTail, newTail, newTailIdx == 0 ? newTail : currTail);
+ }
+ else {
+ ByteBuffer slice = slice((int)toIndex(newTail - size), size, false);
+
+ return new WriteSegment(slice, newTail);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Closes the buffer.
+ */
+ public void close() {
+ for (;;) {
+ long currTail = tail;
+
+ if (currTail < 0)
+ return;
+
+ if(TAIL_UPD.compareAndSet(this, currTail, currTail | CLOSE_MASK))
+ return;
+ }
+ }
+
+ /**
+ * Retrieves list of {@link ReadSegment} instances that point to {@link ByteBuffer} that contains all data available
+ * for reading from {@link SegmentedRingByteBuffer} or {@code null} if there are no available data for reading.
+ * <p>
+ * This method can be invoked only by one consumer thread.
+ * <p>
+ * Once the data has been read from the returned {@link ReadSegment} client code must notify
+ * {@link SegmentedRingByteBuffer} instance using {@link ReadSegment#release()} method in order to release occupied
+ * space in the {@link SegmentedRingByteBuffer} and make it available for writing.
+ *
+ * @return List of {@code ReadSegment} instances with all available data for reading or {@code null} if
+ * there are no available data.
+ */
+ public List<ReadSegment> poll() {
+ return poll(-1);
+ }
+
+ /**
+ * Retrieves list of {@link ReadSegment} instances that point to {@link ByteBuffer} that contains data
+ * available for reading from {@link SegmentedRingByteBuffer} limited by {@code pos} parameter or {@code null}
+ * if there are no available data for reading.
+ * <p>
+ * This method can be invoked only by one consumer thread.
+ * <p>
+ * Once the data has been read from the returned {@link ReadSegment} client code must notify
+ * {@link SegmentedRingByteBuffer} instance using {@link ReadSegment#release()} method in order to release occupied
+ * space in the {@link SegmentedRingByteBuffer} and make it available for writing.
+ *
+ * @param pos End position in buffer.
+ * @return List of {@code ReadSegment} instances with all available data for reading or {@code null} if
+ * there are no available data.
+ */
+ public List<ReadSegment> poll(long pos) {
+ waitForConsumer = true;
+
+ int spins = 0;
+
+ for (;;) {
+ if (PRODUCERS_CNT_UPD.compareAndSet(this, 0, -1))
+ break;
+
+ spins++;
+ }
+
+ if (metrics != null && metrics.metricsEnabled())
+ metrics.onBuffPollSpin(spins);
+
+ long head = this.head;
+
+ long tail = this.tail & OPEN_MASK;
+
+ producersCnt = 0;
+
+ waitForConsumer = false;
+
+ // There are no data for reading or all data up to given position were read.
+ if (tail <= head || (pos >=0 && head > pos))
+ return null;
+
+ int headIdx = (int)toIndex(head);
+
+ int tailIdx = (int)toIndex(tail);
+
+ boolean wrapped = tailIdx <= headIdx;
+
+ if (wrapped && tailIdx != 0) {
+ List<ReadSegment> lst = new ArrayList<>(2);
+
+ int lim = cap - headIdx;
+
+ lst.add(new ReadSegment(slice(headIdx, lim, true), head, head + lim));
+
+ lst.add(new ReadSegment(slice(0, tailIdx, true), head + lim, tail));
+
+ return lst;
+ }
+ else
+ return Collections.singletonList(new ReadSegment(slice(headIdx, (int)(tail - head), true), head, tail));
+ }
+
+ /**
+ * Frees allocated memory in case of direct byte buffer.
+ */
+ public void free() {
+ if (mode == DIRECT || mode == MAPPED)
+ ((DirectBuffer)buf).cleaner().clean();
+ }
+
+ /**
+ * Resets the state of the buffer and returns new instance but with the same underlying buffer.
+ */
+ public SegmentedRingByteBuffer reset() {
+ return new SegmentedRingByteBuffer(buf.capacity(), maxSegmentSize, buf, mode, metrics);
+ }
+
+ /**
+ * @param off Offset.
+ * @param len Length.
+ * @param readOnly Read only.
+ */
+ private ByteBuffer slice(int off, int len, boolean readOnly) {
+ ByteBuffer bb = readOnly ? buf.asReadOnlyBuffer() : buf.duplicate();
+
+ bb.order(ByteOrder.nativeOrder());
+ bb.limit(off + len);
+ bb.position(off);
+
+ return bb;
+ }
+
+ /**
+ * @param globalIdx Global index of ring buffer.
+ * @return Index of byte array.
+ */
+ private long toIndex(long globalIdx) {
+ return globalIdx % cap;
+ }
+
+ /**
+ * @param src Source.
+ * @param srcPos Source pos.
+ * @param dest Destination.
+ * @param destPos Destination pos.
+ * @param len Length.
+ */
+ private void copy(ByteBuffer src, int srcPos, ByteBuffer dest, int destPos, int len) {
+ assert mode != MAPPED;
+
+ if (buf.isDirect()) {
+ ByteBuffer src0 = src.duplicate();
+ src0.limit(srcPos + len);
+ src0.position(srcPos);
+
+ ByteBuffer dest0 = dest.duplicate();
+ dest0.limit(destPos + len);
+ dest0.position(destPos);
+
+ dest0.put(src0);
+ }
+ else
+ System.arraycopy(src.array(), srcPos, buf.array(), destPos, len);
+ }
+
+ /**
+ *
+ */
+ private abstract class Segment {
+ /** Buffer. */
+ protected final ByteBuffer seg;
+
+ /** Pos. */
+ protected final long pos;
+
+ /**
+ * @param seg Seg.
+ * @param pos Pos.
+ */
+ protected Segment(ByteBuffer seg, long pos) {
+ this.seg = seg;
+ this.pos = pos;
+ }
+
+ /**
+ * Releases segment.
+ */
+ abstract public void release();
+
+ /**
+ * Returns byte buffer.
+ *
+ * @return Byte buffer.
+ */
+ abstract public ByteBuffer buffer();
+
+ /**
+ * Returns position.
+ *
+ * @return Position.
+ */
+ public long position() {
+ return pos;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(Segment.class, this);
+ }
+ }
+
+ /**
+ * Segment available for data writing.
+ */
+ public class WriteSegment extends Segment {
+ /** Current tail. */
+ private final long currTail;
+
+ /** Wrap point. */
+ private final long wrapPnt;
+
+ /**
+ * @param currTail Current tail.
+ * @param newTail New tail.
+ * @param wrapPnt Wrap point.
+ */
+ private WriteSegment(long currTail, long newTail, long wrapPnt) {
+ super(allocate((int)(newTail - currTail)), newTail);
+
+ this.seg.order(ByteOrder.nativeOrder());
+ this.currTail = currTail;
+ this.wrapPnt = wrapPnt;
+ }
+
+ /**
+ * @param seg Seg.
+ * @param pos Pos.
+ */
+ private WriteSegment(ByteBuffer seg, long pos) {
+ super(seg, pos);
+
+ this.currTail = -1;
+ this.wrapPnt = -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ByteBuffer buffer() {
+ return seg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void release() {
+ if (wrapPnt > -1) {
+ int pos = (int)toIndex(currTail);
+
+ int len = cap - pos;
+
+ copy(seg, 0, buf, pos, len);
+
+ copy(seg, len, buf, 0, seg.array().length - len);
+ }
+
+ assert producersCnt >= 0;
+
+ PRODUCERS_CNT_UPD.decrementAndGet(SegmentedRingByteBuffer.this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(WriteSegment.class, this, "super", super.toString());
+ }
+ }
+
+ /**
+ * Segment available for data reading.
+ */
+ public class ReadSegment extends Segment {
+ /** New head. */
+ private final long newHead;
+
+ /**
+ * @param seg Seg.
+ * @param pos Pos.
+ * @param newHead New head.
+ */
+ private ReadSegment(ByteBuffer seg, long pos, long newHead) {
+ super(seg, pos);
+
+ this.newHead = newHead;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void release() {
+ if (newHead >= 0)
+ head = newHead;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ByteBuffer buffer() {
+ return seg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ReadSegment.class, this, "super", super.toString());
+ }
+ }
+
+ /**
+ * Buffer mode.
+ */
+ public enum BufferMode {
+ /** Byte buffer on-heap. */
+ ONHEAP,
+
+ /** Direct byte buffer off-heap */
+ DIRECT,
+
+ /** Byte buffer mapped to file. */
+ MAPPED
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java
index 35c94a8..c26a0c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java
@@ -25,10 +25,10 @@ import org.apache.ignite.internal.util.typedef.internal.S;
*/
public class HeaderRecord extends WALRecord {
/** Magic of regular WAL segment. */
- public static final long REGULAR_MAGIC = 0xB0D045A_CE7ED045AL;
+ public static final long REGULAR_MAGIC = 0xB0D045AC_E7ED045AL;
/** Magic of WAL segment with skipped physical records. */
- public static final long COMPACTED_MAGIC = 0x4E07AE0_E573A694EL;
+ public static final long COMPACTED_MAGIC = 0x4E07AE0E_573A694EL;
/** Serializer version */
private final int ver;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index d478917..12992a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -92,8 +92,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.util.typedef.internal.U;
-import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.CRC_SIZE;
-
/**
* Record data V1 serializer.
*/
@@ -314,14 +312,14 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
long lsb = in.readLong();
boolean hasPtr = in.readByte() != 0;
int idx = hasPtr ? in.readInt() : 0;
- int offset = hasPtr ? in.readInt() : 0;
+ int off = hasPtr ? in.readInt() : 0;
int len = hasPtr ? in.readInt() : 0;
Map<Integer, CacheState> states = readPartitionStates(in);
boolean end = in.readByte() != 0;
- FileWALPointer walPtr = hasPtr ? new FileWALPointer(idx, offset, len) : null;
+ FileWALPointer walPtr = hasPtr ? new FileWALPointer(idx, off, len) : null;
CheckpointRecord cpRec = new CheckpointRecord(new UUID(msb, lsb), walPtr, end);
@@ -789,9 +787,9 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
state = in.readByte();
- long updateCounter = in.readLong();
+ long updateCntr = in.readLong();
- res = new PartitionMetaStateRecord(cacheId, partId, GridDhtPartitionState.fromOrdinal(state), updateCounter);
+ res = new PartitionMetaStateRecord(cacheId, partId, GridDhtPartitionState.fromOrdinal(state), updateCntr);
break;
@@ -818,10 +816,10 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
}
/** {@inheritDoc} */
- @Override public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException {
- switch (record.type()) {
+ @Override public void writeRecord(WALRecord rec, ByteBuffer buf) throws IgniteCheckedException {
+ switch (rec.type()) {
case PAGE_RECORD:
- PageSnapshot snap = (PageSnapshot)record;
+ PageSnapshot snap = (PageSnapshot)rec;
buf.putInt(snap.fullPageId().groupId());
buf.putLong(snap.fullPageId().pageId());
@@ -830,14 +828,14 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case MEMORY_RECOVERY:
- MemoryRecoveryRecord memoryRecoveryRecord = (MemoryRecoveryRecord)record;
+ MemoryRecoveryRecord memoryRecoveryRecord = (MemoryRecoveryRecord)rec;
buf.putLong(memoryRecoveryRecord.time());
break;
case PARTITION_DESTROY:
- PartitionDestroyRecord partDestroy = (PartitionDestroyRecord)record;
+ PartitionDestroyRecord partDestroy = (PartitionDestroyRecord)rec;
buf.putInt(partDestroy.groupId());
buf.putInt(partDestroy.partitionId());
@@ -845,7 +843,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case META_PAGE_INIT:
- MetaPageInitRecord updRootsRec = (MetaPageInitRecord)record;
+ MetaPageInitRecord updRootsRec = (MetaPageInitRecord)rec;
buf.putInt(updRootsRec.groupId());
buf.putLong(updRootsRec.pageId());
@@ -858,7 +856,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case PARTITION_META_PAGE_UPDATE_COUNTERS:
- MetaPageUpdatePartitionDataRecord partDataRec = (MetaPageUpdatePartitionDataRecord)record;
+ MetaPageUpdatePartitionDataRecord partDataRec = (MetaPageUpdatePartitionDataRecord)rec;
buf.putInt(partDataRec.groupId());
buf.putLong(partDataRec.pageId());
@@ -873,7 +871,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case CHECKPOINT_RECORD:
- CheckpointRecord cpRec = (CheckpointRecord)record;
+ CheckpointRecord cpRec = (CheckpointRecord)rec;
assert cpRec.checkpointMark() == null || cpRec.checkpointMark() instanceof FileWALPointer :
"Invalid WAL record: " + cpRec;
@@ -899,7 +897,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case DATA_RECORD:
- DataRecord dataRec = (DataRecord)record;
+ DataRecord dataRec = (DataRecord)rec;
buf.putInt(dataRec.writeEntries().size());
@@ -908,15 +906,8 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
- case HEADER_RECORD:
- buf.putLong(HeaderRecord.REGULAR_MAGIC);
-
- buf.putInt(((HeaderRecord)record).version());
-
- break;
-
case DATA_PAGE_INSERT_RECORD:
- DataPageInsertRecord diRec = (DataPageInsertRecord)record;
+ DataPageInsertRecord diRec = (DataPageInsertRecord)rec;
buf.putInt(diRec.groupId());
buf.putLong(diRec.pageId());
@@ -928,7 +919,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case DATA_PAGE_UPDATE_RECORD:
- DataPageUpdateRecord uRec = (DataPageUpdateRecord)record;
+ DataPageUpdateRecord uRec = (DataPageUpdateRecord)rec;
buf.putInt(uRec.groupId());
buf.putLong(uRec.pageId());
@@ -941,7 +932,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case DATA_PAGE_INSERT_FRAGMENT_RECORD:
- final DataPageInsertFragmentRecord difRec = (DataPageInsertFragmentRecord)record;
+ final DataPageInsertFragmentRecord difRec = (DataPageInsertFragmentRecord)rec;
buf.putInt(difRec.groupId());
buf.putLong(difRec.pageId());
@@ -953,7 +944,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case DATA_PAGE_REMOVE_RECORD:
- DataPageRemoveRecord drRec = (DataPageRemoveRecord)record;
+ DataPageRemoveRecord drRec = (DataPageRemoveRecord)rec;
buf.putInt(drRec.groupId());
buf.putLong(drRec.pageId());
@@ -963,7 +954,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case DATA_PAGE_SET_FREE_LIST_PAGE:
- DataPageSetFreeListPageRecord freeListRec = (DataPageSetFreeListPageRecord)record;
+ DataPageSetFreeListPageRecord freeListRec = (DataPageSetFreeListPageRecord)rec;
buf.putInt(freeListRec.groupId());
buf.putLong(freeListRec.pageId());
@@ -973,7 +964,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case INIT_NEW_PAGE_RECORD:
- InitNewPageRecord inpRec = (InitNewPageRecord)record;
+ InitNewPageRecord inpRec = (InitNewPageRecord)rec;
buf.putInt(inpRec.groupId());
buf.putLong(inpRec.pageId());
@@ -985,7 +976,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case BTREE_META_PAGE_INIT_ROOT:
- MetaPageInitRootRecord imRec = (MetaPageInitRootRecord)record;
+ MetaPageInitRootRecord imRec = (MetaPageInitRootRecord)rec;
buf.putInt(imRec.groupId());
buf.putLong(imRec.pageId());
@@ -995,7 +986,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case BTREE_META_PAGE_INIT_ROOT2:
- MetaPageInitRootInlineRecord imRec2 = (MetaPageInitRootInlineRecord)record;
+ MetaPageInitRootInlineRecord imRec2 = (MetaPageInitRootInlineRecord)rec;
buf.putInt(imRec2.groupId());
buf.putLong(imRec2.pageId());
@@ -1006,7 +997,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case BTREE_META_PAGE_ADD_ROOT:
- MetaPageAddRootRecord arRec = (MetaPageAddRootRecord)record;
+ MetaPageAddRootRecord arRec = (MetaPageAddRootRecord)rec;
buf.putInt(arRec.groupId());
buf.putLong(arRec.pageId());
@@ -1016,7 +1007,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case BTREE_META_PAGE_CUT_ROOT:
- MetaPageCutRootRecord crRec = (MetaPageCutRootRecord)record;
+ MetaPageCutRootRecord crRec = (MetaPageCutRootRecord)rec;
buf.putInt(crRec.groupId());
buf.putLong(crRec.pageId());
@@ -1024,7 +1015,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case BTREE_INIT_NEW_ROOT:
- NewRootInitRecord<?> riRec = (NewRootInitRecord<?>)record;
+ NewRootInitRecord<?> riRec = (NewRootInitRecord<?>)rec;
buf.putInt(riRec.groupId());
buf.putLong(riRec.pageId());
@@ -1040,7 +1031,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case BTREE_PAGE_RECYCLE:
- RecycleRecord recRec = (RecycleRecord)record;
+ RecycleRecord recRec = (RecycleRecord)rec;
buf.putInt(recRec.groupId());
buf.putLong(recRec.pageId());
@@ -1050,7 +1041,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case BTREE_PAGE_INSERT:
- InsertRecord<?> inRec = (InsertRecord<?>)record;
+ InsertRecord<?> inRec = (InsertRecord<?>)rec;
buf.putInt(inRec.groupId());
buf.putLong(inRec.pageId());
@@ -1065,7 +1056,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case BTREE_FIX_LEFTMOST_CHILD:
- FixLeftmostChildRecord flRec = (FixLeftmostChildRecord)record;
+ FixLeftmostChildRecord flRec = (FixLeftmostChildRecord)rec;
buf.putInt(flRec.groupId());
buf.putLong(flRec.pageId());
@@ -1075,7 +1066,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case BTREE_FIX_COUNT:
- FixCountRecord fcRec = (FixCountRecord)record;
+ FixCountRecord fcRec = (FixCountRecord)rec;
buf.putInt(fcRec.groupId());
buf.putLong(fcRec.pageId());
@@ -1085,7 +1076,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case BTREE_PAGE_REPLACE:
- ReplaceRecord<?> rRec = (ReplaceRecord<?>)record;
+ ReplaceRecord<?> rRec = (ReplaceRecord<?>)rec;
buf.putInt(rRec.groupId());
buf.putLong(rRec.pageId());
@@ -1099,7 +1090,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case BTREE_PAGE_REMOVE:
- RemoveRecord rmRec = (RemoveRecord)record;
+ RemoveRecord rmRec = (RemoveRecord)rec;
buf.putInt(rmRec.groupId());
buf.putLong(rmRec.pageId());
@@ -1110,7 +1101,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case BTREE_PAGE_INNER_REPLACE:
- InnerReplaceRecord<?> irRec = (InnerReplaceRecord<?>)record;
+ InnerReplaceRecord<?> irRec = (InnerReplaceRecord<?>)rec;
buf.putInt(irRec.groupId());
buf.putLong(irRec.pageId());
@@ -1123,7 +1114,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case BTREE_FORWARD_PAGE_SPLIT:
- SplitForwardPageRecord sfRec = (SplitForwardPageRecord)record;
+ SplitForwardPageRecord sfRec = (SplitForwardPageRecord)rec;
buf.putInt(sfRec.groupId());
buf.putLong(sfRec.pageId());
@@ -1138,7 +1129,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case BTREE_EXISTING_PAGE_SPLIT:
- SplitExistingPageRecord seRec = (SplitExistingPageRecord)record;
+ SplitExistingPageRecord seRec = (SplitExistingPageRecord)rec;
buf.putInt(seRec.groupId());
buf.putLong(seRec.pageId());
@@ -1149,7 +1140,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case BTREE_PAGE_MERGE:
- MergeRecord<?> mRec = (MergeRecord<?>)record;
+ MergeRecord<?> mRec = (MergeRecord<?>)rec;
buf.putInt(mRec.groupId());
buf.putLong(mRec.pageId());
@@ -1162,7 +1153,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case PAGES_LIST_SET_NEXT:
- PagesListSetNextRecord plNextRec = (PagesListSetNextRecord)record;
+ PagesListSetNextRecord plNextRec = (PagesListSetNextRecord)rec;
buf.putInt(plNextRec.groupId());
buf.putLong(plNextRec.pageId());
@@ -1172,7 +1163,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case PAGES_LIST_SET_PREVIOUS:
- PagesListSetPreviousRecord plPrevRec = (PagesListSetPreviousRecord)record;
+ PagesListSetPreviousRecord plPrevRec = (PagesListSetPreviousRecord)rec;
buf.putInt(plPrevRec.groupId());
buf.putLong(plPrevRec.pageId());
@@ -1182,7 +1173,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case PAGES_LIST_INIT_NEW_PAGE:
- PagesListInitNewPageRecord plNewRec = (PagesListInitNewPageRecord)record;
+ PagesListInitNewPageRecord plNewRec = (PagesListInitNewPageRecord)rec;
buf.putInt(plNewRec.groupId());
buf.putLong(plNewRec.pageId());
@@ -1196,7 +1187,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case PAGES_LIST_ADD_PAGE:
- PagesListAddPageRecord plAddRec = (PagesListAddPageRecord)record;
+ PagesListAddPageRecord plAddRec = (PagesListAddPageRecord)rec;
buf.putInt(plAddRec.groupId());
buf.putLong(plAddRec.pageId());
@@ -1206,7 +1197,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case PAGES_LIST_REMOVE_PAGE:
- PagesListRemovePageRecord plRmvRec = (PagesListRemovePageRecord)record;
+ PagesListRemovePageRecord plRmvRec = (PagesListRemovePageRecord)rec;
buf.putInt(plRmvRec.groupId());
buf.putLong(plRmvRec.pageId());
@@ -1216,7 +1207,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case BTREE_FIX_REMOVE_ID:
- FixRemoveId frRec = (FixRemoveId)record;
+ FixRemoveId frRec = (FixRemoveId)rec;
buf.putInt(frRec.groupId());
buf.putLong(frRec.pageId());
@@ -1226,7 +1217,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case TRACKING_PAGE_DELTA:
- TrackingPageDeltaRecord tpDelta = (TrackingPageDeltaRecord)record;
+ TrackingPageDeltaRecord tpDelta = (TrackingPageDeltaRecord)rec;
buf.putInt(tpDelta.groupId());
buf.putLong(tpDelta.pageId());
@@ -1238,7 +1229,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID:
- MetaPageUpdateNextSnapshotId mpUpdateNextSnapshotId = (MetaPageUpdateNextSnapshotId)record;
+ MetaPageUpdateNextSnapshotId mpUpdateNextSnapshotId = (MetaPageUpdateNextSnapshotId)rec;
buf.putInt(mpUpdateNextSnapshotId.groupId());
buf.putLong(mpUpdateNextSnapshotId.pageId());
@@ -1249,7 +1240,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID:
MetaPageUpdateLastSuccessfulFullSnapshotId mpUpdateLastSuccFullSnapshotId =
- (MetaPageUpdateLastSuccessfulFullSnapshotId)record;
+ (MetaPageUpdateLastSuccessfulFullSnapshotId)rec;
buf.putInt(mpUpdateLastSuccFullSnapshotId.groupId());
buf.putLong(mpUpdateLastSuccFullSnapshotId.pageId());
@@ -1260,7 +1251,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID:
MetaPageUpdateLastSuccessfulSnapshotId mpUpdateLastSuccSnapshotId =
- (MetaPageUpdateLastSuccessfulSnapshotId)record;
+ (MetaPageUpdateLastSuccessfulSnapshotId)rec;
buf.putInt(mpUpdateLastSuccSnapshotId.groupId());
buf.putLong(mpUpdateLastSuccSnapshotId.pageId());
@@ -1272,7 +1263,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX:
MetaPageUpdateLastAllocatedIndex mpUpdateLastAllocatedIdx =
- (MetaPageUpdateLastAllocatedIndex) record;
+ (MetaPageUpdateLastAllocatedIndex) rec;
buf.putInt(mpUpdateLastAllocatedIdx.groupId());
buf.putLong(mpUpdateLastAllocatedIdx.pageId());
@@ -1282,7 +1273,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case PART_META_UPDATE_STATE:
- PartitionMetaStateRecord partMetaStateRecord = (PartitionMetaStateRecord) record;
+ PartitionMetaStateRecord partMetaStateRecord = (PartitionMetaStateRecord) rec;
buf.putInt(partMetaStateRecord.groupId());
@@ -1295,7 +1286,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case PAGE_LIST_META_RESET_COUNT_RECORD:
- PageListMetaResetCountRecord pageListMetaResetCntRecord = (PageListMetaResetCountRecord) record;
+ PageListMetaResetCountRecord pageListMetaResetCntRecord = (PageListMetaResetCountRecord) rec;
buf.putInt(pageListMetaResetCntRecord.groupId());
buf.putLong(pageListMetaResetCntRecord.pageId());
@@ -1303,7 +1294,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case TX_RECORD:
- txRecordSerializer.writeTxRecord((TxRecord)record, buf);
+ txRecordSerializer.writeTxRecord((TxRecord)rec, buf);
break;
@@ -1311,7 +1302,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
default:
- throw new UnsupportedOperationException("Type: " + record.type());
+ throw new UnsupportedOperationException("Type: " + rec.type());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
index 16a81a4..0e574f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -33,6 +33,9 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.record.Header
* Record data V2 serializer.
*/
public class RecordDataV2Serializer implements RecordDataSerializer {
+ /** Length of HEADER record data. */
+ static final int HEADER_RECORD_DATA_SIZE = /*Magic*/8 + /*Version*/4;
+
/** V1 data serializer delegate. */
private final RecordDataV1Serializer delegateSerializer;
@@ -47,10 +50,10 @@ public class RecordDataV2Serializer implements RecordDataSerializer {
/** {@inheritDoc} */
@Override public int size(WALRecord record) throws IgniteCheckedException {
- if (record instanceof HeaderRecord)
- throw new UnsupportedOperationException("Getting size of header records is forbidden since version 2 of serializer");
-
switch (record.type()) {
+ case HEADER_RECORD:
+ return HEADER_RECORD_DATA_SIZE;
+
case DATA_RECORD:
return delegateSerializer.size(record) + 8/*timestamp*/;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
index d460705..11bd16b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
@@ -68,7 +68,7 @@ public class RecordV1Serializer implements RecordSerializer {
public static final int HEADER_RECORD_SIZE = REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + CRC_SIZE + RecordDataV1Serializer.HEADER_RECORD_DATA_SIZE;
/** Skip CRC calculation/check flag */
- public static boolean SKIP_CRC = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false);
+ public static boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false);
/** V1 data serializer. */
private final RecordDataV1Serializer dataSerializer;
@@ -152,15 +152,15 @@ public class RecordV1Serializer implements RecordSerializer {
}
/** {@inheritDoc} */
- @Override public void writeWithHeaders(WALRecord record, ByteBuffer buf) throws IgniteCheckedException {
+ @Override public void writeWithHeaders(WALRecord rec, ByteBuffer buf) throws IgniteCheckedException {
// Write record type.
- putRecordType(buf, record);
+ putRecordType(buf, rec);
// Write record file position.
- putPositionOfRecord(buf, record);
+ putPositionOfRecord(buf, rec);
// Write record data.
- dataSerializer.writeRecord(record, buf);
+ dataSerializer.writeRecord(rec, buf);
}
};
@@ -193,8 +193,8 @@ public class RecordV1Serializer implements RecordSerializer {
/** {@inheritDoc} */
@SuppressWarnings("CastConflictsWithInstanceof")
- @Override public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException {
- writeWithCrc(record, buf, recordIO);
+ @Override public void writeRecord(WALRecord rec, ByteBuffer buf) throws IgniteCheckedException {
+ writeWithCrc(rec, buf, recordIO);
}
/** {@inheritDoc} */
@@ -225,29 +225,29 @@ public class RecordV1Serializer implements RecordSerializer {
*/
public static FileWALPointer readPosition(DataInput in) throws IOException {
long idx = in.readLong();
- int fileOffset = in.readInt();
+ int fileOff = in.readInt();
- return new FileWALPointer(idx, fileOffset, 0);
+ return new FileWALPointer(idx, fileOff, 0);
}
/**
* Writes record file position to given {@code buf}.
*
* @param buf Buffer to write record file position.
- * @param record WAL record.
+ * @param rec WAL record.
*/
- public static void putPositionOfRecord(ByteBuffer buf, WALRecord record) {
- putPosition(buf, (FileWALPointer) record.position());
+ private static void putPositionOfRecord(ByteBuffer buf, WALRecord rec) {
+ putPosition(buf, (FileWALPointer)rec.position());
}
/**
* Writes record type to given {@code buf}.
*
* @param buf Buffer to write record type.
- * @param record WAL record.
+ * @param rec WAL record.
*/
- public static void putRecordType(ByteBuffer buf, WALRecord record) {
- buf.put((byte)(record.type().ordinal() + 1));
+ static void putRecordType(ByteBuffer buf, WALRecord rec) {
+ buf.put((byte)(rec.type().ordinal() + 1));
}
/**
@@ -258,7 +258,7 @@ public class RecordV1Serializer implements RecordSerializer {
* @throws IgniteCheckedException If logical end of segment is reached.
* @throws IOException In case of I/O problems.
*/
- public static RecordType readRecordType(DataInput in) throws IgniteCheckedException, IOException {
+ static RecordType readRecordType(DataInput in) throws IgniteCheckedException, IOException {
int type = in.readUnsignedByte();
if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE)
@@ -282,10 +282,10 @@ public class RecordV1Serializer implements RecordSerializer {
* @throws EOFException In case of end of file.
* @throws IgniteCheckedException If it's unable to read record.
*/
- public static WALRecord readWithCrc(FileInput in0, WALPointer expPtr, RecordIO reader) throws EOFException, IgniteCheckedException {
+ static WALRecord readWithCrc(FileInput in0, WALPointer expPtr, RecordIO reader) throws EOFException, IgniteCheckedException {
long startPos = -1;
- try (FileInput.Crc32CheckingFileInput in = in0.startRead(SKIP_CRC)) {
+ try (FileInput.Crc32CheckingFileInput in = in0.startRead(skipCrc)) {
startPos = in0.position();
WALRecord res = reader.readWithHeaders(in, expPtr);
@@ -307,19 +307,19 @@ public class RecordV1Serializer implements RecordSerializer {
/**
* Writes record with calculated CRC to buffer {@code buf}.
*
- * @param record WAL record.
+ * @param rec WAL record.
* @param buf Buffer to write.
* @param writer Record write I/O interface.
* @throws IgniteCheckedException If it's unable to write record.
*/
- public static void writeWithCrc(WALRecord record, ByteBuffer buf, RecordIO writer) throws IgniteCheckedException {
- assert record.size() >= 0 && buf.remaining() >= record.size() : record.size();
+ static void writeWithCrc(WALRecord rec, ByteBuffer buf, RecordIO writer) throws IgniteCheckedException {
+ assert rec.size() >= 0 && buf.remaining() >= rec.size() : rec.size();
int startPos = buf.position();
- writer.writeWithHeaders(record, buf);
+ writer.writeWithHeaders(rec, buf);
- if (!SKIP_CRC) {
+ if (!skipCrc) {
int curPos = buf.position();
buf.position(startPos);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
index 05f2a24..adfd701 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
@@ -54,7 +54,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.wal.serial
*/
public class RecordV2Serializer implements RecordSerializer {
/** Length of WAL Pointer: Index (8) + File offset (4) + Record length (4) */
- public static final int FILE_WAL_POINTER_SIZE = 8 + 4 + 4;
+ private static final int FILE_WAL_POINTER_SIZE = 8 + 4 + 4;
/** V2 data serializer. */
private final RecordDataV2Serializer dataSerializer;
@@ -212,37 +212,38 @@ public class RecordV2Serializer implements RecordSerializer {
* @return Read file WAL pointer.
* @throws IOException If failed to write.
*/
- public static FileWALPointer readPositionAndCheckPoint(
+ @SuppressWarnings("UnusedReturnValue")
+ private static FileWALPointer readPositionAndCheckPoint(
DataInput in,
WALPointer expPtr,
boolean skipPositionCheck
) throws IgniteCheckedException, IOException {
long idx = in.readLong();
- int fileOffset = in.readInt();
- int length = in.readInt();
+ int fileOff = in.readInt();
+ int len = in.readInt();
FileWALPointer p = (FileWALPointer)expPtr;
- if (!F.eq(idx, p.index()) || (!skipPositionCheck && !F.eq(fileOffset, p.fileOffset())))
+ if (!F.eq(idx, p.index()) || (!skipPositionCheck && !F.eq(fileOff, p.fileOffset())))
throw new WalSegmentTailReachedException(
"WAL segment tail is reached. [ " +
"Expected next state: {Index=" + p.index() + ",Offset=" + p.fileOffset() + "}, " +
- "Actual state : {Index=" + idx + ",Offset=" + fileOffset + "} ]", null);
+ "Actual state : {Index=" + idx + ",Offset=" + fileOff + "} ]", null);
- return new FileWALPointer(idx, fileOffset, length);
+ return new FileWALPointer(idx, fileOff, len);
}
/**
- * Writes record file position to given {@code buf}.
+ * Writes rec file position to given {@code buf}.
*
- * @param buf Buffer to write record file position.
- * @param record WAL record.
+ * @param buf Buffer to write rec file position.
+ * @param rec WAL rec.
*/
- public static void putPositionOfRecord(ByteBuffer buf, WALRecord record) {
- FileWALPointer p = (FileWALPointer)record.position();
+ private static void putPositionOfRecord(ByteBuffer buf, WALRecord rec) {
+ FileWALPointer p = (FileWALPointer)rec.position();
buf.putLong(p.index());
buf.putInt(p.fileOffset());
- buf.putInt(record.size());
+ buf.putInt(rec.size());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index 49d8c18..6a265eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -259,9 +259,8 @@ public class PlatformConfigurationUtils {
}
}
- if (ccfg.getPluginConfigurations() != null) {
+ if (ccfg.getPluginConfigurations() != null)
Collections.addAll(plugins, ccfg.getPluginConfigurations());
- }
ccfg.setPluginConfigurations(plugins.toArray(new CachePluginConfiguration[plugins.size()]));
}
@@ -420,7 +419,8 @@ public class PlatformConfigurationUtils {
out.writeBoolean(f0.isExcludeNeighbors());
out.writeByte((byte) 0); // override flags
out.writeObject(null); // user func
- } else if (f instanceof PlatformAffinityFunction) {
+ }
+ else if (f instanceof PlatformAffinityFunction) {
PlatformAffinityFunction f0 = (PlatformAffinityFunction) f;
AffinityFunction baseFunc = f0.getBaseFunc();
@@ -430,16 +430,17 @@ public class PlatformConfigurationUtils {
out.writeBoolean(((RendezvousAffinityFunction) baseFunc).isExcludeNeighbors());
out.writeByte(f0.getOverrideFlags());
out.writeObject(f0.getUserFunc());
- } else {
+ }
+ else {
out.writeByte((byte) 3);
out.writeInt(f0.partitions());
out.writeBoolean(false); // exclude neighbors
out.writeByte(f0.getOverrideFlags());
out.writeObject(f0.getUserFunc());
}
- } else {
- out.writeByte((byte) 0);
}
+ else
+ out.writeByte((byte)0);
}
/**
@@ -465,9 +466,8 @@ public class PlatformConfigurationUtils {
out.writeInt(p0.getMaxSize());
out.writeLong(p0.getMaxMemorySize());
}
- else {
+ else
out.writeByte((byte)0);
- }
}
/**
@@ -578,9 +578,9 @@ public class PlatformConfigurationUtils {
public static void readIgniteConfiguration(BinaryRawReaderEx in, IgniteConfiguration cfg) {
if (in.readBoolean())
cfg.setClientMode(in.readBoolean());
- int[] eventTypes = in.readIntArray();
- if (eventTypes != null)
- cfg.setIncludeEventTypes(eventTypes);
+ int[] evtTypes = in.readIntArray();
+ if (evtTypes != null)
+ cfg.setIncludeEventTypes(evtTypes);
if (in.readBoolean())
cfg.setMetricsExpireTime(in.readLong());
if (in.readBoolean())
@@ -598,9 +598,9 @@ public class PlatformConfigurationUtils {
String workDir = in.readString();
if (workDir != null)
cfg.setWorkDirectory(workDir);
- String localHost = in.readString();
- if (localHost != null)
- cfg.setLocalHost(localHost);
+ String locHost = in.readString();
+ if (locHost != null)
+ cfg.setLocalHost(locHost);
if (in.readBoolean())
cfg.setDaemon(in.readBoolean());
if (in.readBoolean())
@@ -787,9 +787,9 @@ public class PlatformConfigurationUtils {
* @param in Reader.
*/
private static void readDiscoveryConfiguration(BinaryRawReader in, IgniteConfiguration cfg) {
- boolean hasConfig = in.readBoolean();
+ boolean hasCfg = in.readBoolean();
- if (!hasConfig)
+ if (!hasCfg)
return;
TcpDiscoverySpi disco = new TcpDiscoverySpi();
@@ -799,21 +799,20 @@ public class PlatformConfigurationUtils {
if (hasIpFinder) {
byte ipFinderType = in.readByte();
- int addrCount = in.readInt();
+ int addrCnt = in.readInt();
ArrayList<String> addrs = null;
- if (addrCount > 0) {
- addrs = new ArrayList<>(addrCount);
+ if (addrCnt > 0) {
+ addrs = new ArrayList<>(addrCnt);
- for (int i = 0; i < addrCount; i++)
+ for (int i = 0; i < addrCnt; i++)
addrs.add(in.readString());
}
TcpDiscoveryVmIpFinder finder = null;
- if (ipFinderType == 1) {
+ if (ipFinderType == 1)
finder = new TcpDiscoveryVmIpFinder();
- }
else if (ipFinderType == 2) {
TcpDiscoveryMulticastIpFinder finder0 = new TcpDiscoveryMulticastIpFinder();
@@ -830,9 +829,8 @@ public class PlatformConfigurationUtils {
finder = finder0;
}
- else {
+ else
assert false;
- }
finder.setAddresses(addrs);
@@ -981,23 +979,23 @@ public class PlatformConfigurationUtils {
* Write query entity.
*
* @param writer Writer.
- * @param queryEntity Query entity.
+ * @param qryEntity Query entity.
*/
- public static void writeQueryEntity(BinaryRawWriter writer, QueryEntity queryEntity) {
- assert queryEntity != null;
+ public static void writeQueryEntity(BinaryRawWriter writer, QueryEntity qryEntity) {
+ assert qryEntity != null;
- writer.writeString(queryEntity.getKeyType());
- writer.writeString(queryEntity.getValueType());
- writer.writeString(queryEntity.getTableName());
- writer.writeString(queryEntity.getKeyFieldName());
- writer.writeString(queryEntity.getValueFieldName());
+ writer.writeString(qryEntity.getKeyType());
+ writer.writeString(qryEntity.getValueType());
+ writer.writeString(qryEntity.getTableName());
+ writer.writeString(qryEntity.getKeyFieldName());
+ writer.writeString(qryEntity.getValueFieldName());
// Fields
- LinkedHashMap<String, String> fields = queryEntity.getFields();
+ LinkedHashMap<String, String> fields = qryEntity.getFields();
if (fields != null) {
- Set<String> keyFields = queryEntity.getKeyFields();
- Set<String> notNullFields = queryEntity.getNotNullFields();
+ Set<String> keyFields = qryEntity.getKeyFields();
+ Set<String> notNullFields = qryEntity.getNotNullFields();
writer.writeInt(fields.size());
@@ -1012,7 +1010,7 @@ public class PlatformConfigurationUtils {
writer.writeInt(0);
// Aliases
- Map<String, String> aliases = queryEntity.getAliases();
+ Map<String, String> aliases = qryEntity.getAliases();
if (aliases != null) {
writer.writeInt(aliases.size());
@@ -1026,7 +1024,7 @@ public class PlatformConfigurationUtils {
writer.writeInt(0);
// Indexes
- Collection<QueryIndex> indexes = queryEntity.getIndexes();
+ Collection<QueryIndex> indexes = qryEntity.getIndexes();
if (indexes != null) {
writer.writeInt(indexes.size());
@@ -1042,16 +1040,16 @@ public class PlatformConfigurationUtils {
* Writer query index.
*
* @param writer Writer.
- * @param index Index.
+ * @param idx Index.
*/
- private static void writeQueryIndex(BinaryRawWriter writer, QueryIndex index) {
- assert index != null;
+ private static void writeQueryIndex(BinaryRawWriter writer, QueryIndex idx) {
+ assert idx != null;
- writer.writeString(index.getName());
- writeEnumByte(writer, index.getIndexType());
- writer.writeInt(index.getInlineSize());
+ writer.writeString(idx.getName());
+ writeEnumByte(writer, idx.getIndexType());
+ writer.writeInt(idx.getInlineSize());
- LinkedHashMap<String, Boolean> fields = index.getFields();
+ LinkedHashMap<String, Boolean> fields = idx.getFields();
if (fields != null) {
writer.writeInt(fields.size());
@@ -1273,9 +1271,9 @@ public class PlatformConfigurationUtils {
if (finder instanceof TcpDiscoveryVmIpFinder) {
w.writeBoolean(true);
- boolean isMulticast = finder instanceof TcpDiscoveryMulticastIpFinder;
+ boolean isMcast = finder instanceof TcpDiscoveryMulticastIpFinder;
- w.writeByte((byte)(isMulticast ? 2 : 1));
+ w.writeByte((byte)(isMcast ? 2 : 1));
Collection<InetSocketAddress> addrs = finder.getRegisteredAddresses();
@@ -1284,7 +1282,7 @@ public class PlatformConfigurationUtils {
for (InetSocketAddress a : addrs)
w.writeString(a.toString());
- if (isMulticast) {
+ if (isMcast) {
TcpDiscoveryMulticastIpFinder multiFinder = (TcpDiscoveryMulticastIpFinder) finder;
w.writeString(multiFinder.getLocalAddress());
@@ -1529,9 +1527,8 @@ public class PlatformConfigurationUtils {
w.writeLong(plc.getRateTimeInterval());
}
}
- else {
+ else
w.writeInt(0);
- }
}
/**
@@ -1573,9 +1570,8 @@ public class PlatformConfigurationUtils {
w.writeBoolean(cfg.isTcpNoDelay());
w.writeInt(cfg.getMaxOpenCursorsPerConnection());
w.writeInt(cfg.getThreadPoolSize());
- } else {
+ } else
w.writeBoolean(false);
- }
}
/**
@@ -1640,7 +1636,7 @@ public class PlatformConfigurationUtils {
.setWalStorePath(in.readString())
.setWalArchivePath(in.readString())
.setWalMode(WALMode.fromOrdinal(in.readInt()))
- .setTlbSize(in.readInt())
+ .setWalBufferSize(in.readInt())
.setWalFlushFrequency((int) in.readLong())
.setWalFsyncDelayNanos(in.readLong())
.setWalRecordIteratorBufferSize(in.readInt())
@@ -1729,7 +1725,7 @@ public class PlatformConfigurationUtils {
w.writeString(cfg.getWalStorePath());
w.writeString(cfg.getWalArchivePath());
w.writeInt(cfg.getWalMode().ordinal());
- w.writeInt(cfg.getTlbSize());
+ w.writeInt(cfg.getWalBufferSize());
w.writeLong(cfg.getWalFlushFrequency());
w.writeLong(cfg.getWalFsyncDelayNanos());
w.writeInt(cfg.getWalRecordIteratorBufferSize());
@@ -1740,9 +1736,8 @@ public class PlatformConfigurationUtils {
w.writeInt(cfg.getCheckpointWriteOrder().ordinal());
w.writeBoolean(cfg.isWriteThrottlingEnabled());
- } else {
+ } else
w.writeBoolean(false);
- }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
index 15e6f2c..98b5c08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
@@ -1320,6 +1320,15 @@ public abstract class GridUnsafe {
}
/**
+ * Returns page size.
+ *
+ * @return Page size.
+ */
+ public static int pageSize() {
+ return UNSAFE.pageSize();
+ }
+
+ /**
* Returns unaligned flag.
*/
private static boolean unaligned() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 9804608..b3ca6ff 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -9449,11 +9449,9 @@ public abstract class IgniteUtils {
try {
Method mtd = cls.getDeclaredMethod(name, paramTypes);
- if (mtd.getReturnType() != void.class) {
- mtd.setAccessible(true);
+ mtd.setAccessible(true);
- return mtd;
- }
+ return mtd;
}
catch (NoSuchMethodException ignored) {
// No-op.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorPersistentStoreConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorPersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorPersistentStoreConfiguration.java
index d26ab35..986f0ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorPersistentStoreConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorPersistentStoreConfiguration.java
@@ -111,7 +111,7 @@ public class VisorPersistentStoreConfiguration extends VisorDataTransferObject {
walArchivePath = cfg.getWalArchivePath();
metricsEnabled = cfg.isMetricsEnabled();
walMode = cfg.getWalMode();
- tlbSize = cfg.getWalThreadLocalBufferSize();
+ tlbSize = cfg.getWalBufferSize();
walFlushFreq = cfg.getWalFlushFrequency();
walFsyncDelay = cfg.getWalFsyncDelayNanos();
walRecordIterBuffSize = cfg.getWalRecordIteratorBufferSize();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java
index 40410cb..2051da3 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java
@@ -41,6 +41,10 @@ public interface DataStorageMetricsMXBean extends DataStorageMetrics {
@Override float getWalFsyncTimeAverage();
/** {@inheritDoc} */
+ @MXBeanDescription("WAL buffer poll spins number over the last time interval.")
+ @Override long getWalBuffPollSpinsRate();
+
+ /** {@inheritDoc} */
@MXBeanDescription("Duration of the last checkpoint in milliseconds.")
@Override long getLastCheckpointDuration();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java
index 6245952..15e67b2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java
@@ -214,4 +214,8 @@ public class IgniteClusterActivateDeactivateTestWithPersistence extends IgniteCl
checkNoCaches(SRVS);
}
+
+ @Override public void testActivateFailover3() throws Exception {
+ super.testActivateFailover3();
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsTransactionsHangTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsTransactionsHangTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsTransactionsHangTest.java
index f3aee08..22d1665 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsTransactionsHangTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsTransactionsHangTest.java
@@ -204,7 +204,7 @@ public class IgnitePdsTransactionsHangTest extends GridCommonAbstractTest {
}
}
catch (Throwable e) {
- System.out.println(e.toString());
+ log.error("Unexpected exception:", e);
throw new RuntimeException(e);
}
@@ -229,14 +229,14 @@ public class IgnitePdsTransactionsHangTest extends GridCommonAbstractTest {
max = Math.max(max, sum);
min = Math.min(min, sum);
- System.out.println("Operation count: " + sum + " min=" + min + " max=" + max + " avg=" + totalOperations / (periods - WARM_UP_PERIOD));
+ log.info("Operation count: " + sum + " min=" + min + " max=" + max + " avg=" + totalOperations / (periods - WARM_UP_PERIOD));
}
}
interrupt.set(true);
threadPool.shutdown();
- System.out.println("Test complete");
+ log.info("Test complete");
threadPool.awaitTermination(getTestTimeout(), TimeUnit.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsWalTlbTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsWalTlbTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsWalTlbTest.java
deleted file mode 100644
index 3b76b63..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsWalTlbTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.db.wal;
-
-import javax.cache.CacheException;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DFLT_MIN_CHECKPOINTING_PAGE_BUFFER_SIZE;
-import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
-
-/**
- *
- */
-public class IgnitePdsWalTlbTest extends GridCommonAbstractTest {
- /** Ip finder. */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** Cache name. */
- private static final String CACHE_NAME = "cache";
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(CACHE_NAME);
-
- cfg.setCacheConfiguration(ccfg);
-
- DataStorageConfiguration memCfg = new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(
- new DataRegionConfiguration().setMaxSize(100 * 1024 * 1024)
- .setPersistenceEnabled(true)
- .setCheckpointPageBufferSize(DFLT_MIN_CHECKPOINTING_PAGE_BUFFER_SIZE + 1))
- .setWalMode(WALMode.LOG_ONLY)
- .setWalThreadLocalBufferSize(640000000);
-
- cfg.setDataStorageConfiguration(memCfg);
-
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
- discoSpi.setIpFinder(IP_FINDER);
-
- if (gridName.endsWith("1"))
- cfg.setClientMode(true);
-
- cfg.setDiscoverySpi(discoSpi);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected long getTestTimeout() {
- return 30_000;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false));
-
- stopAllGrids();
-
- startGrids(2);
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids();
-
- deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false));
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testWalDirectOutOfMemory() throws Exception {
- IgniteEx ig = grid(1);
-
- ig.active(true);
-
- boolean locked = true;
-
- try (IgniteDataStreamer<Integer, Integer> streamer = ig.dataStreamer(CACHE_NAME)) {
- for (int i = 0; i < 100_000; i++) {
- streamer.addData(i, 1);
-
- if (i > 0 && i % 10_000 == 0)
- info("Done put: " + i);
- }
- }
- catch (CacheException ignore) {
- // expected
- locked = false;
- }
- finally {
- assertFalse(locked);
-
- stopAllGrids();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
index 107b467..2bcc22f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
import java.nio.file.OpenOption;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -31,6 +32,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.GridKernalState;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
@@ -65,6 +67,8 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
deleteWorkFiles();
}
@@ -83,12 +87,12 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
cfg.setCacheConfiguration(cacheCfg);
DataStorageConfiguration memCfg = new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(
- new DataRegionConfiguration().setMaxSize(2048L * 1024 * 1024).setPersistenceEnabled(true))
- .setFileIOFactory(new FailingFileIOFactory())
- .setWalMode(WALMode.BACKGROUND)
- // Setting WAL Segment size to high values forces flushing by timeout.
- .setWalSegmentSize(flushByTimeout ? 500_000 : 50_000);
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setMaxSize(2048L * 1024 * 1024).setPersistenceEnabled(true))
+ .setFileIOFactory(new FailingFileIOFactory())
+ .setWalMode(WALMode.BACKGROUND)
+ .setWalBufferSize(128 * 1024)// Setting WAL Segment size to high values forces flushing by timeout.
+ .setWalSegmentSize(flushByTimeout ? 500_000 : 50_000);
cfg.setDataStorageConfiguration(memCfg);
@@ -121,6 +125,13 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
private void flushingErrorTest() throws Exception {
final IgniteEx grid = startGrid(0);
+ IgniteWriteAheadLogManager wal = grid.context().cache().context().wal();
+
+ boolean mmap = GridTestUtils.getFieldValue(wal, "mmap");
+
+ if (mmap)
+ return;
+
try {
grid.active(true);
@@ -152,7 +163,7 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
}
/**
- * @throws IgniteCheckedException
+ * @throws IgniteCheckedException If failed.
*/
private void deleteWorkFiles() throws IgniteCheckedException {
deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false));
@@ -162,9 +173,10 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
* Create File I/O which fails after second attempt to write to File
*/
private static class FailingFileIOFactory implements FileIOFactory {
+ /** Serial version uid. */
private static final long serialVersionUID = 0L;
- /** */
+ /** Delegate factory. */
private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory();
/** {@inheritDoc} */
@@ -174,17 +186,23 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public FileIO create(File file, OpenOption... modes) throws IOException {
- FileIO delegate = delegateFactory.create(file, modes);
+ final FileIO delegate = delegateFactory.create(file, modes);
return new FileIODecorator(delegate) {
int writeAttempts = 2;
- @Override public int write(ByteBuffer sourceBuffer) throws IOException {
+ @Override public int write(ByteBuffer srcBuf) throws IOException {
if (--writeAttempts == 0)
throw new RuntimeException("Test exception. Unable to write to file.");
- return super.write(sourceBuffer);
+ return super.write(srcBuf);
}
+
+ /** {@inheritDoc} */
+ @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
+ return delegate.map(maxWalSegmentSize);
+ }
+
};
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
index 057e082..d6c0a9d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -30,6 +31,8 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
@@ -139,6 +142,13 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
final Ignite grid = startGridsMultiThreaded(gridCount());
+ IgniteWriteAheadLogManager wal = ((IgniteKernal)grid).context().cache().context().wal();
+
+ boolean mmap = GridTestUtils.getFieldValue(wal, "mmap");
+
+ if (mmap)
+ return;
+
grid.active(true);
IgniteCache<Object, Object> cache = grid.cache(TEST_CACHE);
@@ -177,8 +187,7 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
// We should await successful stop of node.
GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override
- public boolean apply() {
+ @Override public boolean apply() {
return grid.cluster().nodes().size() == gridCount();
}
}, getTestTimeout());
@@ -228,17 +237,21 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
/** {@inheritDoc} */
@Override public FileIO create(File file, OpenOption... modes) throws IOException {
- FileIO delegate = delegateFactory.create(file, modes);
+ final FileIO delegate = delegateFactory.create(file, modes);
return new FileIODecorator(delegate) {
int writeAttempts = 2;
- @Override public int write(ByteBuffer sourceBuffer) throws IOException {
-
+ @Override public int write(ByteBuffer srcBuf) throws IOException {
if (--writeAttempts == 0 && fail!= null && fail.get())
throw new IOException("No space left on device");
- return super.write(sourceBuffer);
+ return super.write(srcBuf);
+ }
+
+ /** {@inheritDoc} */
+ @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
+ return delegate.map(maxWalSegmentSize);
}
};
}