You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/19 15:12:05 UTC
[flink] 05/10: [FLINK-17547][task][hotfix] Extract methods from RecordsDeserializer
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9ebdaf40973e1b7ff6aaadd19d1d6cda1d3e69d8
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue May 12 11:24:01 2020 +0200
[FLINK-17547][task][hotfix] Extract methods from RecordsDeserializer
---
.../main/java/org/apache/flink/util/IOUtils.java | 9 +
.../api/serialization/NonSpanningWrapper.java | 81 +++++-
.../network/api/serialization/SpanningWrapper.java | 278 ++++++++++-----------
...SpillingAdaptiveSpanningRecordDeserializer.java | 121 ++++-----
4 files changed, 271 insertions(+), 218 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
index 02b11e6..1f9af18 100644
--- a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
@@ -26,6 +26,8 @@ import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
+import static java.util.Arrays.asList;
+
/**
* An utility class for I/O related functionality.
*/
@@ -244,6 +246,13 @@ public final class IOUtils {
/**
* Closes all elements in the iterable with closeQuietly().
*/
+ public static void closeAllQuietly(AutoCloseable... closeables) {
+ closeAllQuietly(asList(closeables));
+ }
+
+ /**
+ * Closes all elements in the iterable with closeQuietly().
+ */
public static void closeAllQuietly(Iterable<? extends AutoCloseable> closeables) {
if (null != closeables) {
for (AutoCloseable closeable : closeables) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
index 6d9602f..5de5467 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
@@ -17,27 +17,43 @@
package org.apache.flink.runtime.io.network.api.serialization;
+import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.NextRecordResponse;
import java.io.EOFException;
import java.io.IOException;
import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
import java.util.Optional;
+import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
+import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER;
+import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
+import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
+
final class NonSpanningWrapper implements DataInputView {
- MemorySegment segment;
+ private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE =
+ "Serializer consumed more bytes than the record had. " +
+ "This indicates broken serialization. If you are using custom serialization types " +
+ "(Value or Writable), check their serialization methods. If you are using a " +
+ "Kryo-serialized type, check the corresponding Kryo serializer.";
+
+ private MemorySegment segment;
private int limit;
- int position;
+ private int position;
private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
- int remaining() {
+ private final NextRecordResponse reusedNextRecordResponse = new NextRecordResponse(null, 0); // performance impact of immutable objects not benchmarked
+
+ private int remaining() {
return this.limit - this.position;
}
@@ -47,14 +63,14 @@ final class NonSpanningWrapper implements DataInputView {
this.position = 0;
}
- void initializeFromMemorySegment(MemorySegment seg, int position, int leftOverLimit) {
+ void initializeFromMemorySegment(MemorySegment seg, int position, int limit) {
this.segment = seg;
this.position = position;
- this.limit = leftOverLimit;
+ this.limit = limit;
}
Optional<MemorySegment> getUnconsumedSegment() {
- if (remaining() == 0) {
+ if (!hasRemaining()) {
return Optional.empty();
}
MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(remaining());
@@ -62,6 +78,10 @@ final class NonSpanningWrapper implements DataInputView {
return Optional.of(target);
}
+ boolean hasRemaining() {
+ return remaining() > 0;
+ }
+
// -------------------------------------------------------------------------------------------------------------
// DataInput specific methods
// -------------------------------------------------------------------------------------------------------------
@@ -290,4 +310,53 @@ final class NonSpanningWrapper implements DataInputView {
public int read(byte[] b) {
return read(b, 0, b.length);
}
+
+ ByteBuffer wrapIntoByteBuffer() {
+ return segment.wrap(position, remaining());
+ }
+
+ int copyContentTo(byte[] dst) {
+ final int numBytesChunk = remaining();
+ segment.get(position, dst, 0, numBytesChunk);
+ return numBytesChunk;
+ }
+
+ /**
+ * Copies the data and transfers the "ownership" (i.e. clears current wrapper).
+ */
+ void transferTo(ByteBuffer dst) {
+ segment.get(position, dst, remaining());
+ clear();
+ }
+
+ NextRecordResponse getNextRecord(IOReadableWritable target) throws IOException {
+ int recordLen = readInt();
+ if (canReadRecord(recordLen)) {
+ return readInto(target);
+ } else {
+ return reusedNextRecordResponse.updated(PARTIAL_RECORD, recordLen);
+ }
+ }
+
+ private NextRecordResponse readInto(IOReadableWritable target) throws IOException {
+ try {
+ target.read(this);
+ } catch (IndexOutOfBoundsException e) {
+ throw new IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, e);
+ }
+ int remaining = remaining();
+ if (remaining < 0) {
+ throw new IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, new IndexOutOfBoundsException("Remaining = " + remaining));
+ }
+ return reusedNextRecordResponse.updated(remaining == 0 ? LAST_RECORD_FROM_BUFFER : INTERMEDIATE_RECORD_FROM_BUFFER, remaining);
+ }
+
+ boolean hasCompleteLength() {
+ return remaining() >= LENGTH_BYTES;
+ }
+
+ private boolean canReadRecord(int recordLength) {
+ return recordLength <= remaining();
+ }
+
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
index e59363f..430f0db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -23,7 +23,6 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.util.FileUtils;
import org.apache.flink.util.StringUtils;
import java.io.BufferedInputStream;
@@ -38,9 +37,16 @@ import java.util.Arrays;
import java.util.Optional;
import java.util.Random;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
+import static org.apache.flink.util.FileUtils.writeCompletely;
+import static org.apache.flink.util.IOUtils.closeAllQuietly;
+
final class SpanningWrapper {
private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiBytes
+ private static final int FILE_BUFFER_SIZE = 2 * 1024 * 1024;
private final byte[] initialBuffer = new byte[1024];
@@ -50,7 +56,7 @@ final class SpanningWrapper {
private final DataInputDeserializer serializationReadBuffer;
- private final ByteBuffer lengthBuffer;
+ final ByteBuffer lengthBuffer;
private FileChannel spillingChannel;
@@ -70,10 +76,10 @@ final class SpanningWrapper {
private DataInputViewStreamWrapper spillFileReader;
- public SpanningWrapper(String[] tempDirs) {
+ SpanningWrapper(String[] tempDirs) {
this.tempDirs = tempDirs;
- this.lengthBuffer = ByteBuffer.allocate(4);
+ this.lengthBuffer = ByteBuffer.allocate(LENGTH_BYTES);
this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
this.recordLength = -1;
@@ -82,187 +88,161 @@ final class SpanningWrapper {
this.buffer = initialBuffer;
}
- void initializeWithPartialRecord(NonSpanningWrapper partial, int nextRecordLength) throws IOException {
- // set the length and copy what is available to the buffer
- this.recordLength = nextRecordLength;
-
- final int numBytesChunk = partial.remaining();
-
- if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
- // create a spilling channel and put the data there
- this.spillingChannel = createSpillingChannel();
-
- ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
- FileUtils.writeCompletely(this.spillingChannel, toWrite);
- }
- else {
- // collect in memory
- ensureBufferCapacity(nextRecordLength);
- partial.segment.get(partial.position, buffer, 0, numBytesChunk);
- }
-
- this.accumulatedRecordBytes = numBytesChunk;
+ /**
+ * Copies the data and transfers the "ownership" (i.e. clears the passed wrapper).
+ */
+ void transferFrom(NonSpanningWrapper partial, int nextRecordLength) throws IOException {
+ updateLength(nextRecordLength);
+ accumulatedRecordBytes = isAboveSpillingThreshold() ? spill(partial) : partial.copyContentTo(buffer);
+ partial.clear();
}
- void initializeWithPartialLength(NonSpanningWrapper partial) throws IOException {
- // copy what we have to the length buffer
- partial.segment.get(partial.position, this.lengthBuffer, partial.remaining());
+ private boolean isAboveSpillingThreshold() {
+ return recordLength > THRESHOLD_FOR_SPILLING;
}
void addNextChunkFromMemorySegment(MemorySegment segment, int offset, int numBytes) throws IOException {
- int segmentPosition = offset;
- int segmentRemaining = numBytes;
- // check where to go. if we have a partial length, we need to complete it first
- if (this.lengthBuffer.position() > 0) {
- int toPut = Math.min(this.lengthBuffer.remaining(), segmentRemaining);
- segment.get(segmentPosition, this.lengthBuffer, toPut);
- // did we complete the length?
- if (this.lengthBuffer.hasRemaining()) {
- return;
- } else {
- this.recordLength = this.lengthBuffer.getInt(0);
-
- this.lengthBuffer.clear();
- segmentPosition += toPut;
- segmentRemaining -= toPut;
- if (this.recordLength > THRESHOLD_FOR_SPILLING) {
- this.spillingChannel = createSpillingChannel();
- } else {
- ensureBufferCapacity(this.recordLength);
- }
- }
+ int limit = offset + numBytes;
+ int numBytesRead = isReadingLength() ? readLength(segment, offset, numBytes) : 0;
+ offset += numBytesRead;
+ numBytes -= numBytesRead;
+ if (numBytes == 0) {
+ return;
}
- // copy as much as we need or can for this next spanning record
- int needed = this.recordLength - this.accumulatedRecordBytes;
- int toCopy = Math.min(needed, segmentRemaining);
+ int toCopy = min(recordLength - accumulatedRecordBytes, numBytes);
+ if (toCopy > 0) {
+ copyFromSegment(segment, offset, toCopy);
+ }
+ if (numBytes > toCopy) {
+ leftOverData = segment;
+ leftOverStart = offset + toCopy;
+ leftOverLimit = limit;
+ }
+ }
- if (spillingChannel != null) {
- // spill to file
- ByteBuffer toWrite = segment.wrap(segmentPosition, toCopy);
- FileUtils.writeCompletely(this.spillingChannel, toWrite);
+ private void copyFromSegment(MemorySegment segment, int offset, int length) throws IOException {
+ if (spillingChannel == null) {
+ copyIntoBuffer(segment, offset, length);
} else {
- segment.get(segmentPosition, buffer, this.accumulatedRecordBytes, toCopy);
+ copyIntoFile(segment, offset, length);
}
+ }
- this.accumulatedRecordBytes += toCopy;
-
- if (toCopy < segmentRemaining) {
- // there is more data in the segment
- this.leftOverData = segment;
- this.leftOverStart = segmentPosition + toCopy;
- this.leftOverLimit = numBytes + offset;
+ private void copyIntoFile(MemorySegment segment, int offset, int length) throws IOException {
+ writeCompletely(spillingChannel, segment.wrap(offset, length));
+ accumulatedRecordBytes += length;
+ if (hasFullRecord()) {
+ spillingChannel.close();
+ spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile), FILE_BUFFER_SIZE));
}
+ }
- if (accumulatedRecordBytes == recordLength) {
- // we have the full record
- if (spillingChannel == null) {
- this.serializationReadBuffer.setBuffer(buffer, 0, recordLength);
- }
- else {
- spillingChannel.close();
+ private void copyIntoBuffer(MemorySegment segment, int offset, int length) {
+ segment.get(offset, buffer, accumulatedRecordBytes, length);
+ accumulatedRecordBytes += length;
+ if (hasFullRecord()) {
+ serializationReadBuffer.setBuffer(buffer, 0, recordLength);
+ }
+ }
- BufferedInputStream inStream = new BufferedInputStream(new FileInputStream(spillFile), 2 * 1024 * 1024);
- this.spillFileReader = new DataInputViewStreamWrapper(inStream);
- }
+ private int readLength(MemorySegment segment, int segmentPosition, int segmentRemaining) throws IOException {
+ int bytesToRead = min(lengthBuffer.remaining(), segmentRemaining);
+ segment.get(segmentPosition, lengthBuffer, bytesToRead);
+ if (!lengthBuffer.hasRemaining()) {
+ updateLength(lengthBuffer.getInt(0));
}
+ return bytesToRead;
}
- Optional<MemorySegment> getUnconsumedSegment() throws IOException {
- // for the case of only partial length, no data
- final int position = lengthBuffer.position();
- if (position > 0) {
- MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(position);
- lengthBuffer.position(0);
- segment.put(0, lengthBuffer, position);
- return Optional.of(segment);
+ private void updateLength(int length) throws IOException {
+ lengthBuffer.clear();
+ recordLength = length;
+ if (isAboveSpillingThreshold()) {
+ spillingChannel = createSpillingChannel();
+ } else {
+ ensureBufferCapacity(length);
}
+ }
- // for the case of full length, partial data in buffer
- if (recordLength > THRESHOLD_FOR_SPILLING) {
- throw new UnsupportedOperationException("Unaligned checkpoint currently do not support spilled " +
- "records.");
- } else if (recordLength != -1) {
- int leftOverSize = leftOverLimit - leftOverStart;
- int unconsumedSize = Integer.BYTES + accumulatedRecordBytes + leftOverSize;
- DataOutputSerializer serializer = new DataOutputSerializer(unconsumedSize);
- serializer.writeInt(recordLength);
- serializer.write(buffer, 0, accumulatedRecordBytes);
- if (leftOverData != null) {
- serializer.write(leftOverData, leftOverStart, leftOverSize);
- }
- MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(unconsumedSize);
- segment.put(0, serializer.getSharedBuffer(), 0, segment.size());
- return Optional.of(segment);
+ Optional<MemorySegment> getUnconsumedSegment() throws IOException {
+ if (isReadingLength()) {
+ return Optional.of(copyLengthBuffer());
+ } else if (isAboveSpillingThreshold()) {
+ throw new UnsupportedOperationException("Unaligned checkpoint currently do not support spilled records.");
+ } else if (recordLength == -1) {
+ return Optional.empty(); // no remaining partial length or data
+ } else {
+ return Optional.of(copyDataBuffer());
}
+ }
- // for the case of no remaining partial length or data
- return Optional.empty();
+ private MemorySegment copyLengthBuffer() {
+ int position = lengthBuffer.position();
+ MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(position);
+ lengthBuffer.position(0);
+ segment.put(0, lengthBuffer, position);
+ return segment;
}
- void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) {
- deserializer.clear();
+ private MemorySegment copyDataBuffer() throws IOException {
+ int leftOverSize = leftOverLimit - leftOverStart;
+ int unconsumedSize = LENGTH_BYTES + accumulatedRecordBytes + leftOverSize;
+ DataOutputSerializer serializer = new DataOutputSerializer(unconsumedSize);
+ serializer.writeInt(recordLength);
+ serializer.write(buffer, 0, accumulatedRecordBytes);
+ if (leftOverData != null) {
+ serializer.write(leftOverData, leftOverStart, leftOverSize);
+ }
+ MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(unconsumedSize);
+ segment.put(0, serializer.getSharedBuffer(), 0, segment.size());
+ return segment;
+ }
+ /**
+ * Copies the leftover data and transfers the "ownership" (i.e. clears this wrapper).
+ */
+ void transferLeftOverTo(NonSpanningWrapper nonSpanningWrapper) {
+ nonSpanningWrapper.clear();
if (leftOverData != null) {
- deserializer.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit);
+ nonSpanningWrapper.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit);
}
+ clear();
}
boolean hasFullRecord() {
- return this.recordLength >= 0 && this.accumulatedRecordBytes >= this.recordLength;
+ return recordLength >= 0 && accumulatedRecordBytes >= recordLength;
}
int getNumGatheredBytes() {
- return this.accumulatedRecordBytes + (this.recordLength >= 0 ? 4 : lengthBuffer.position());
+ return accumulatedRecordBytes + (recordLength >= 0 ? LENGTH_BYTES : lengthBuffer.position());
}
+ @SuppressWarnings("ResultOfMethodCallIgnored")
public void clear() {
- this.buffer = initialBuffer;
- this.serializationReadBuffer.releaseArrays();
-
- this.recordLength = -1;
- this.lengthBuffer.clear();
- this.leftOverData = null;
- this.leftOverStart = 0;
- this.leftOverLimit = 0;
- this.accumulatedRecordBytes = 0;
-
- if (spillingChannel != null) {
- try {
- spillingChannel.close();
- }
- catch (Throwable t) {
- // ignore
- }
- spillingChannel = null;
- }
- if (spillFileReader != null) {
- try {
- spillFileReader.close();
- }
- catch (Throwable t) {
- // ignore
- }
- spillFileReader = null;
- }
- if (spillFile != null) {
- spillFile.delete();
- spillFile = null;
- }
+ buffer = initialBuffer;
+ serializationReadBuffer.releaseArrays();
+
+ recordLength = -1;
+ lengthBuffer.clear();
+ leftOverData = null;
+ leftOverStart = 0;
+ leftOverLimit = 0;
+ accumulatedRecordBytes = 0;
+
+ closeAllQuietly(spillingChannel, spillFileReader, () -> spillFile.delete());
+ spillingChannel = null;
+ spillFileReader = null;
+ spillFile = null;
}
public DataInputView getInputView() {
- if (spillFileReader == null) {
- return serializationReadBuffer;
- }
- else {
- return spillFileReader;
- }
+ return spillFileReader == null ? serializationReadBuffer : spillFileReader;
}
private void ensureBufferCapacity(int minLength) {
if (buffer.length < minLength) {
- byte[] newBuffer = new byte[Math.max(minLength, buffer.length * 2)];
+ byte[] newBuffer = new byte[max(minLength, buffer.length * 2)];
System.arraycopy(buffer, 0, newBuffer, 0, accumulatedRecordBytes);
buffer = newBuffer;
}
@@ -294,4 +274,16 @@ final class SpanningWrapper {
random.nextBytes(bytes);
return StringUtils.byteToHexString(bytes);
}
+
+ private int spill(NonSpanningWrapper partial) throws IOException {
+ ByteBuffer buffer = partial.wrapIntoByteBuffer();
+ int length = buffer.remaining();
+ writeCompletely(spillingChannel, buffer);
+ return length;
+ }
+
+ private boolean isReadingLength() {
+ return lengthBuffer.position() > 0;
+ }
+
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index f20fbc9..75e6b0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -24,19 +24,22 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import javax.annotation.concurrent.NotThreadSafe;
+
import java.io.IOException;
import java.util.Optional;
+import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
+import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER;
+import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+
/**
* @param <T> The type of the record to be deserialized.
*/
public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> implements RecordDeserializer<T> {
- private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE =
- "Serializer consumed more bytes than the record had. " +
- "This indicates broken serialization. If you are using custom serialization types " +
- "(Value or Writable), check their serialization methods. If you are using a " +
- "Kryo-serialized type, check the corresponding Kryo serializer.";
+ static final int LENGTH_BYTES = Integer.BYTES;
private final NonSpanningWrapper nonSpanningWrapper;
@@ -58,11 +61,10 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
int numBytes = buffer.getSize();
// check if some spanning record deserialization is pending
- if (this.spanningWrapper.getNumGatheredBytes() > 0) {
- this.spanningWrapper.addNextChunkFromMemorySegment(segment, offset, numBytes);
- }
- else {
- this.nonSpanningWrapper.initializeFromMemorySegment(segment, offset, numBytes + offset);
+ if (spanningWrapper.getNumGatheredBytes() > 0) {
+ spanningWrapper.addNextChunkFromMemorySegment(segment, offset, numBytes);
+ } else {
+ nonSpanningWrapper.initializeFromMemorySegment(segment, offset, numBytes + offset);
}
}
@@ -75,14 +77,13 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
@Override
public Optional<Buffer> getUnconsumedBuffer() throws IOException {
- Optional<MemorySegment> target;
- if (nonSpanningWrapper.remaining() > 0) {
- target = nonSpanningWrapper.getUnconsumedSegment();
+ final Optional<MemorySegment> unconsumedSegment;
+ if (nonSpanningWrapper.hasRemaining()) {
+ unconsumedSegment = nonSpanningWrapper.getUnconsumedSegment();
} else {
- target = spanningWrapper.getUnconsumedSegment();
+ unconsumedSegment = spanningWrapper.getUnconsumedSegment();
}
- return target.map(memorySegment -> new NetworkBuffer(
- memorySegment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, memorySegment.size()));
+ return unconsumedSegment.map(segment -> new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, DATA_BUFFER, segment.size()));
}
@Override
@@ -91,65 +92,31 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
// this should be the majority of the cases for small records
// for large records, this portion of the work is very small in comparison anyways
- int nonSpanningRemaining = this.nonSpanningWrapper.remaining();
-
- // check if we can get a full length;
- if (nonSpanningRemaining >= 4) {
- int len = this.nonSpanningWrapper.readInt();
-
- if (len <= nonSpanningRemaining - 4) {
- // we can get a full record from here
- try {
- target.read(this.nonSpanningWrapper);
-
- int remaining = this.nonSpanningWrapper.remaining();
- if (remaining > 0) {
- return DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
- }
- else if (remaining == 0) {
- return DeserializationResult.LAST_RECORD_FROM_BUFFER;
- }
- else {
- throw new IndexOutOfBoundsException("Remaining = " + remaining);
- }
- }
- catch (IndexOutOfBoundsException e) {
- throw new IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, e);
- }
- }
- else {
- // we got the length, but we need the rest from the spanning deserializer
- // and need to wait for more buffers
- this.spanningWrapper.initializeWithPartialRecord(this.nonSpanningWrapper, len);
- this.nonSpanningWrapper.clear();
- return DeserializationResult.PARTIAL_RECORD;
- }
- } else if (nonSpanningRemaining > 0) {
- // we have an incomplete length
- // add our part of the length to the length buffer
- this.spanningWrapper.initializeWithPartialLength(this.nonSpanningWrapper);
- this.nonSpanningWrapper.clear();
- return DeserializationResult.PARTIAL_RECORD;
- }
+ if (nonSpanningWrapper.hasCompleteLength()) {
+ return readNonSpanningRecord(target);
- // spanning record case
- if (this.spanningWrapper.hasFullRecord()) {
- // get the full record
- target.read(this.spanningWrapper.getInputView());
+ } else if (nonSpanningWrapper.hasRemaining()) {
+ nonSpanningWrapper.transferTo(spanningWrapper.lengthBuffer);
+ return PARTIAL_RECORD;
- // move the remainder to the non-spanning wrapper
- // this does not copy it, only sets the memory segment
- this.spanningWrapper.moveRemainderToNonSpanningDeserializer(this.nonSpanningWrapper);
- this.spanningWrapper.clear();
+ } else if (spanningWrapper.hasFullRecord()) {
+ target.read(spanningWrapper.getInputView());
+ spanningWrapper.transferLeftOverTo(nonSpanningWrapper);
+ return nonSpanningWrapper.hasRemaining() ? INTERMEDIATE_RECORD_FROM_BUFFER : LAST_RECORD_FROM_BUFFER;
- return (this.nonSpanningWrapper.remaining() == 0) ?
- DeserializationResult.LAST_RECORD_FROM_BUFFER :
- DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
} else {
- return DeserializationResult.PARTIAL_RECORD;
+ return PARTIAL_RECORD;
}
}
+ private DeserializationResult readNonSpanningRecord(T target) throws IOException {
+ NextRecordResponse response = nonSpanningWrapper.getNextRecord(target);
+ if (response.result == PARTIAL_RECORD) {
+ spanningWrapper.transferFrom(nonSpanningWrapper, response.bytesLeft);
+ }
+ return response.result;
+ }
+
@Override
public void clear() {
this.nonSpanningWrapper.clear();
@@ -158,7 +125,23 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
@Override
public boolean hasUnfinishedData() {
- return this.nonSpanningWrapper.remaining() > 0 || this.spanningWrapper.getNumGatheredBytes() > 0;
+ return this.nonSpanningWrapper.hasRemaining() || this.spanningWrapper.getNumGatheredBytes() > 0;
}
+ @NotThreadSafe
+ static class NextRecordResponse {
+ DeserializationResult result;
+ int bytesLeft;
+
+ NextRecordResponse(DeserializationResult result, int bytesLeft) {
+ this.result = result;
+ this.bytesLeft = bytesLeft;
+ }
+
+ public NextRecordResponse updated(DeserializationResult result, int bytesLeft) {
+ this.result = result;
+ this.bytesLeft = bytesLeft;
+ return this;
+ }
+ }
}