Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/19 15:12:05 UTC

[flink] 05/10: [FLINK-17547][task][hotfix] Extract methods from RecordsDeserializer

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9ebdaf40973e1b7ff6aaadd19d1d6cda1d3e69d8
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue May 12 11:24:01 2020 +0200

    [FLINK-17547][task][hotfix] Extract methods from RecordsDeserializer
---
 .../main/java/org/apache/flink/util/IOUtils.java   |   9 +
 .../api/serialization/NonSpanningWrapper.java      |  81 +++++-
 .../network/api/serialization/SpanningWrapper.java | 278 ++++++++++-----------
 ...SpillingAdaptiveSpanningRecordDeserializer.java | 121 ++++-----
 4 files changed, 271 insertions(+), 218 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
index 02b11e6..1f9af18 100644
--- a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
@@ -26,6 +26,8 @@ import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.Socket;
 
+import static java.util.Arrays.asList;
+
 /**
 * A utility class for I/O related functionality.
  */
@@ -244,6 +246,13 @@ public final class IOUtils {
 	/**
 	 * Closes all elements in the iterable with closeQuietly().
 	 */
+	public static void closeAllQuietly(AutoCloseable... closeables) {
+		closeAllQuietly(asList(closeables));
+	}
+
+	/**
+	 * Closes all elements in the iterable with closeQuietly().
+	 */
 	public static void closeAllQuietly(Iterable<? extends AutoCloseable> closeables) {
 		if (null != closeables) {
 			for (AutoCloseable closeable : closeables) {
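
For context, the new varargs overload above simply delegates to the existing
Iterable-based closeAllQuietly, so call sites can list resources directly. A
minimal usage sketch, not part of the commit (the file names are placeholders):

    import org.apache.flink.util.IOUtils;

    import java.io.FileInputStream;
    import java.io.FileOutputStream;

    class CloseAllQuietlyExample {
        public static void main(String[] args) throws Exception {
            FileInputStream in = new FileInputStream("in.bin");     // placeholder path
            FileOutputStream out = new FileOutputStream("out.bin"); // placeholder path
            try {
                int b = in.read();
                if (b >= 0) {
                    out.write(b);
                }
            } finally {
                // Closes both resources, suppressing any exception thrown while closing.
                IOUtils.closeAllQuietly(in, out);
            }
        }
    }
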
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
index 6d9602f..5de5467 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
@@ -17,27 +17,43 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
+import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.NextRecordResponse;
 
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
 import java.util.Optional;
 
+import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
+import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER;
+import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
+import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
+
 final class NonSpanningWrapper implements DataInputView {
 
-	MemorySegment segment;
+	private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE =
+			"Serializer consumed more bytes than the record had. " +
+					"This indicates broken serialization. If you are using custom serialization types " +
+					"(Value or Writable), check their serialization methods. If you are using a " +
+					"Kryo-serialized type, check the corresponding Kryo serializer.";
+
+	private MemorySegment segment;
 
 	private int limit;
 
-	int position;
+	private int position;
 
 	private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
 	private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
 
-	int remaining() {
+	private final NextRecordResponse reusedNextRecordResponse = new NextRecordResponse(null, 0); // performance impact of immutable objects not benchmarked
+
+	private int remaining() {
 		return this.limit - this.position;
 	}
 
@@ -47,14 +63,14 @@ final class NonSpanningWrapper implements DataInputView {
 		this.position = 0;
 	}
 
-	void initializeFromMemorySegment(MemorySegment seg, int position, int leftOverLimit) {
+	void initializeFromMemorySegment(MemorySegment seg, int position, int limit) {
 		this.segment = seg;
 		this.position = position;
-		this.limit = leftOverLimit;
+		this.limit = limit;
 	}
 
 	Optional<MemorySegment> getUnconsumedSegment() {
-		if (remaining() == 0) {
+		if (!hasRemaining()) {
 			return Optional.empty();
 		}
 		MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(remaining());
@@ -62,6 +78,10 @@ final class NonSpanningWrapper implements DataInputView {
 		return Optional.of(target);
 	}
 
+	boolean hasRemaining() {
+		return remaining() > 0;
+	}
+
 	// -------------------------------------------------------------------------------------------------------------
 	//                                       DataInput specific methods
 	// -------------------------------------------------------------------------------------------------------------
@@ -290,4 +310,53 @@ final class NonSpanningWrapper implements DataInputView {
 	public int read(byte[] b) {
 		return read(b, 0, b.length);
 	}
+
+	ByteBuffer wrapIntoByteBuffer() {
+		return segment.wrap(position, remaining());
+	}
+
+	int copyContentTo(byte[] dst) {
+		final int numBytesChunk = remaining();
+		segment.get(position, dst, 0, numBytesChunk);
+		return numBytesChunk;
+	}
+
+	/**
+	 * Copies the data and transfers the "ownership" (i.e. clears current wrapper).
+	 */
+	void transferTo(ByteBuffer dst) {
+		segment.get(position, dst, remaining());
+		clear();
+	}
+
+	NextRecordResponse getNextRecord(IOReadableWritable target) throws IOException {
+		int recordLen = readInt();
+		if (canReadRecord(recordLen)) {
+			return readInto(target);
+		} else {
+			return reusedNextRecordResponse.updated(PARTIAL_RECORD, recordLen);
+		}
+	}
+
+	private NextRecordResponse readInto(IOReadableWritable target) throws IOException {
+		try {
+			target.read(this);
+		} catch (IndexOutOfBoundsException e) {
+			throw new IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, e);
+		}
+		int remaining = remaining();
+		if (remaining < 0) {
+			throw new IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, new IndexOutOfBoundsException("Remaining = " + remaining));
+		}
+		return reusedNextRecordResponse.updated(remaining == 0 ? LAST_RECORD_FROM_BUFFER : INTERMEDIATE_RECORD_FROM_BUFFER, remaining);
+	}
+
+	boolean hasCompleteLength() {
+		return remaining() >= LENGTH_BYTES;
+	}
+
+	private boolean canReadRecord(int recordLength) {
+		return recordLength <= remaining();
+	}
+
 }
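
The helpers extracted above encode the deserializer's framing: each record is
preceded by a 4-byte big-endian length (LENGTH_BYTES), and a record can be read
in place only if both the length and the full payload fit into the remaining
bytes; otherwise the caller falls back to the spanning path. A self-contained
sketch of that framing rule using plain java.nio instead of Flink's
MemorySegment (class and method names here are illustrative, not from the patch):

    import java.nio.ByteBuffer;

    class LengthPrefixedFraming {
        static final int LENGTH_BYTES = Integer.BYTES;

        /** Returns the payload if the buffer holds a complete length-prefixed record, else null. */
        static byte[] tryReadRecord(ByteBuffer buf) {
            if (buf.remaining() < LENGTH_BYTES) {
                return null;              // incomplete length prefix: spanning case
            }
            buf.mark();
            int recordLen = buf.getInt(); // big-endian length, like the lengthBuffer in the patch
            if (buf.remaining() < recordLen) {
                buf.reset();              // partial record: would be handed to SpanningWrapper
                return null;
            }
            byte[] payload = new byte[recordLen];
            buf.get(payload);
            return payload;               // complete record read in place
        }
    }
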
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
index e59363f..430f0db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -23,7 +23,6 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.StringUtils;
 
 import java.io.BufferedInputStream;
@@ -38,9 +37,16 @@ import java.util.Arrays;
 import java.util.Optional;
 import java.util.Random;
 
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
+import static org.apache.flink.util.FileUtils.writeCompletely;
+import static org.apache.flink.util.IOUtils.closeAllQuietly;
+
 final class SpanningWrapper {
 
 	private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiBytes
+	private static final int FILE_BUFFER_SIZE = 2 * 1024 * 1024;
 
 	private final byte[] initialBuffer = new byte[1024];
 
@@ -50,7 +56,7 @@ final class SpanningWrapper {
 
 	private final DataInputDeserializer serializationReadBuffer;
 
-	private final ByteBuffer lengthBuffer;
+	final ByteBuffer lengthBuffer;
 
 	private FileChannel spillingChannel;
 
@@ -70,10 +76,10 @@ final class SpanningWrapper {
 
 	private DataInputViewStreamWrapper spillFileReader;
 
-	public SpanningWrapper(String[] tempDirs) {
+	SpanningWrapper(String[] tempDirs) {
 		this.tempDirs = tempDirs;
 
-		this.lengthBuffer = ByteBuffer.allocate(4);
+		this.lengthBuffer = ByteBuffer.allocate(LENGTH_BYTES);
 		this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
 
 		this.recordLength = -1;
@@ -82,187 +88,161 @@ final class SpanningWrapper {
 		this.buffer = initialBuffer;
 	}
 
-	void initializeWithPartialRecord(NonSpanningWrapper partial, int nextRecordLength) throws IOException {
-		// set the length and copy what is available to the buffer
-		this.recordLength = nextRecordLength;
-
-		final int numBytesChunk = partial.remaining();
-
-		if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
-			// create a spilling channel and put the data there
-			this.spillingChannel = createSpillingChannel();
-
-			ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
-			FileUtils.writeCompletely(this.spillingChannel, toWrite);
-		}
-		else {
-			// collect in memory
-			ensureBufferCapacity(nextRecordLength);
-			partial.segment.get(partial.position, buffer, 0, numBytesChunk);
-		}
-
-		this.accumulatedRecordBytes = numBytesChunk;
+	/**
+	 * Copies the data and transfers the "ownership" (i.e. clears the passed wrapper).
+	 */
+	void transferFrom(NonSpanningWrapper partial, int nextRecordLength) throws IOException {
+		updateLength(nextRecordLength);
+		accumulatedRecordBytes = isAboveSpillingThreshold() ? spill(partial) : partial.copyContentTo(buffer);
+		partial.clear();
 	}
 
-	void initializeWithPartialLength(NonSpanningWrapper partial) throws IOException {
-		// copy what we have to the length buffer
-		partial.segment.get(partial.position, this.lengthBuffer, partial.remaining());
+	private boolean isAboveSpillingThreshold() {
+		return recordLength > THRESHOLD_FOR_SPILLING;
 	}
 
 	void addNextChunkFromMemorySegment(MemorySegment segment, int offset, int numBytes) throws IOException {
-		int segmentPosition = offset;
-		int segmentRemaining = numBytes;
-		// check where to go. if we have a partial length, we need to complete it first
-		if (this.lengthBuffer.position() > 0) {
-			int toPut = Math.min(this.lengthBuffer.remaining(), segmentRemaining);
-			segment.get(segmentPosition, this.lengthBuffer, toPut);
-			// did we complete the length?
-			if (this.lengthBuffer.hasRemaining()) {
-				return;
-			} else {
-				this.recordLength = this.lengthBuffer.getInt(0);
-
-				this.lengthBuffer.clear();
-				segmentPosition += toPut;
-				segmentRemaining -= toPut;
-				if (this.recordLength > THRESHOLD_FOR_SPILLING) {
-					this.spillingChannel = createSpillingChannel();
-				} else {
-					ensureBufferCapacity(this.recordLength);
-				}
-			}
+		int limit = offset + numBytes;
+		int numBytesRead = isReadingLength() ? readLength(segment, offset, numBytes) : 0;
+		offset += numBytesRead;
+		numBytes -= numBytesRead;
+		if (numBytes == 0) {
+			return;
 		}
 
-		// copy as much as we need or can for this next spanning record
-		int needed = this.recordLength - this.accumulatedRecordBytes;
-		int toCopy = Math.min(needed, segmentRemaining);
+		int toCopy = min(recordLength - accumulatedRecordBytes, numBytes);
+		if (toCopy > 0) {
+			copyFromSegment(segment, offset, toCopy);
+		}
+		if (numBytes > toCopy) {
+			leftOverData = segment;
+			leftOverStart = offset + toCopy;
+			leftOverLimit = limit;
+		}
+	}
 
-		if (spillingChannel != null) {
-			// spill to file
-			ByteBuffer toWrite = segment.wrap(segmentPosition, toCopy);
-			FileUtils.writeCompletely(this.spillingChannel, toWrite);
+	private void copyFromSegment(MemorySegment segment, int offset, int length) throws IOException {
+		if (spillingChannel == null) {
+			copyIntoBuffer(segment, offset, length);
 		} else {
-			segment.get(segmentPosition, buffer, this.accumulatedRecordBytes, toCopy);
+			copyIntoFile(segment, offset, length);
 		}
+	}
 
-		this.accumulatedRecordBytes += toCopy;
-
-		if (toCopy < segmentRemaining) {
-			// there is more data in the segment
-			this.leftOverData = segment;
-			this.leftOverStart = segmentPosition + toCopy;
-			this.leftOverLimit = numBytes + offset;
+	private void copyIntoFile(MemorySegment segment, int offset, int length) throws IOException {
+		writeCompletely(spillingChannel, segment.wrap(offset, length));
+		accumulatedRecordBytes += length;
+		if (hasFullRecord()) {
+			spillingChannel.close();
+			spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile), FILE_BUFFER_SIZE));
 		}
+	}
 
-		if (accumulatedRecordBytes == recordLength) {
-			// we have the full record
-			if (spillingChannel == null) {
-				this.serializationReadBuffer.setBuffer(buffer, 0, recordLength);
-			}
-			else {
-				spillingChannel.close();
+	private void copyIntoBuffer(MemorySegment segment, int offset, int length) {
+		segment.get(offset, buffer, accumulatedRecordBytes, length);
+		accumulatedRecordBytes += length;
+		if (hasFullRecord()) {
+			serializationReadBuffer.setBuffer(buffer, 0, recordLength);
+		}
+	}
 
-				BufferedInputStream inStream = new BufferedInputStream(new FileInputStream(spillFile), 2 * 1024 * 1024);
-				this.spillFileReader = new DataInputViewStreamWrapper(inStream);
-			}
+	private int readLength(MemorySegment segment, int segmentPosition, int segmentRemaining) throws IOException {
+		int bytesToRead = min(lengthBuffer.remaining(), segmentRemaining);
+		segment.get(segmentPosition, lengthBuffer, bytesToRead);
+		if (!lengthBuffer.hasRemaining()) {
+			updateLength(lengthBuffer.getInt(0));
 		}
+		return bytesToRead;
 	}
 
-	Optional<MemorySegment> getUnconsumedSegment() throws IOException {
-		// for the case of only partial length, no data
-		final int position = lengthBuffer.position();
-		if (position > 0) {
-			MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(position);
-			lengthBuffer.position(0);
-			segment.put(0, lengthBuffer, position);
-			return Optional.of(segment);
+	private void updateLength(int length) throws IOException {
+		lengthBuffer.clear();
+		recordLength = length;
+		if (isAboveSpillingThreshold()) {
+			spillingChannel = createSpillingChannel();
+		} else {
+			ensureBufferCapacity(length);
 		}
+	}
 
-		// for the case of full length, partial data in buffer
-		if (recordLength > THRESHOLD_FOR_SPILLING) {
-			throw new UnsupportedOperationException("Unaligned checkpoint currently do not support spilled " +
-				"records.");
-		} else if (recordLength != -1) {
-			int leftOverSize = leftOverLimit - leftOverStart;
-			int unconsumedSize = Integer.BYTES + accumulatedRecordBytes + leftOverSize;
-			DataOutputSerializer serializer = new DataOutputSerializer(unconsumedSize);
-			serializer.writeInt(recordLength);
-			serializer.write(buffer, 0, accumulatedRecordBytes);
-			if (leftOverData != null) {
-				serializer.write(leftOverData, leftOverStart, leftOverSize);
-			}
-			MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(unconsumedSize);
-			segment.put(0, serializer.getSharedBuffer(), 0, segment.size());
-			return Optional.of(segment);
+	Optional<MemorySegment> getUnconsumedSegment() throws IOException {
+		if (isReadingLength()) {
+			return Optional.of(copyLengthBuffer());
+		} else if (isAboveSpillingThreshold()) {
+			throw new UnsupportedOperationException("Unaligned checkpoint currently do not support spilled records.");
+		} else if (recordLength == -1) {
+			return Optional.empty(); // no remaining partial length or data
+		} else {
+			return Optional.of(copyDataBuffer());
 		}
+	}
 
-		// for the case of no remaining partial length or data
-		return Optional.empty();
+	private MemorySegment copyLengthBuffer() {
+		int position = lengthBuffer.position();
+		MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(position);
+		lengthBuffer.position(0);
+		segment.put(0, lengthBuffer, position);
+		return segment;
 	}
 
-	void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) {
-		deserializer.clear();
+	private MemorySegment copyDataBuffer() throws IOException {
+		int leftOverSize = leftOverLimit - leftOverStart;
+		int unconsumedSize = LENGTH_BYTES + accumulatedRecordBytes + leftOverSize;
+		DataOutputSerializer serializer = new DataOutputSerializer(unconsumedSize);
+		serializer.writeInt(recordLength);
+		serializer.write(buffer, 0, accumulatedRecordBytes);
+		if (leftOverData != null) {
+			serializer.write(leftOverData, leftOverStart, leftOverSize);
+		}
+		MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(unconsumedSize);
+		segment.put(0, serializer.getSharedBuffer(), 0, segment.size());
+		return segment;
+	}
 
+	/**
+	 * Copies the leftover data and transfers the "ownership" (i.e. clears this wrapper).
+	 */
+	void transferLeftOverTo(NonSpanningWrapper nonSpanningWrapper) {
+		nonSpanningWrapper.clear();
 		if (leftOverData != null) {
-			deserializer.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit);
+			nonSpanningWrapper.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit);
 		}
+		clear();
 	}
 
 	boolean hasFullRecord() {
-		return this.recordLength >= 0 && this.accumulatedRecordBytes >= this.recordLength;
+		return recordLength >= 0 && accumulatedRecordBytes >= recordLength;
 	}
 
 	int getNumGatheredBytes() {
-		return this.accumulatedRecordBytes + (this.recordLength >= 0 ? 4 : lengthBuffer.position());
+		return accumulatedRecordBytes + (recordLength >= 0 ? LENGTH_BYTES : lengthBuffer.position());
 	}
 
+	@SuppressWarnings("ResultOfMethodCallIgnored")
 	public void clear() {
-		this.buffer = initialBuffer;
-		this.serializationReadBuffer.releaseArrays();
-
-		this.recordLength = -1;
-		this.lengthBuffer.clear();
-		this.leftOverData = null;
-		this.leftOverStart = 0;
-		this.leftOverLimit = 0;
-		this.accumulatedRecordBytes = 0;
-
-		if (spillingChannel != null) {
-			try {
-				spillingChannel.close();
-			}
-			catch (Throwable t) {
-				// ignore
-			}
-			spillingChannel = null;
-		}
-		if (spillFileReader != null) {
-			try {
-				spillFileReader.close();
-			}
-			catch (Throwable t) {
-				// ignore
-			}
-			spillFileReader = null;
-		}
-		if (spillFile != null) {
-			spillFile.delete();
-			spillFile = null;
-		}
+		buffer = initialBuffer;
+		serializationReadBuffer.releaseArrays();
+
+		recordLength = -1;
+		lengthBuffer.clear();
+		leftOverData = null;
+		leftOverStart = 0;
+		leftOverLimit = 0;
+		accumulatedRecordBytes = 0;
+
+		closeAllQuietly(spillingChannel, spillFileReader, () -> spillFile.delete());
+		spillingChannel = null;
+		spillFileReader = null;
+		spillFile = null;
 	}
 
 	public DataInputView getInputView() {
-		if (spillFileReader == null) {
-			return serializationReadBuffer;
-		}
-		else {
-			return spillFileReader;
-		}
+		return spillFileReader == null ? serializationReadBuffer : spillFileReader;
 	}
 
 	private void ensureBufferCapacity(int minLength) {
 		if (buffer.length < minLength) {
-			byte[] newBuffer = new byte[Math.max(minLength, buffer.length * 2)];
+			byte[] newBuffer = new byte[max(minLength, buffer.length * 2)];
 			System.arraycopy(buffer, 0, newBuffer, 0, accumulatedRecordBytes);
 			buffer = newBuffer;
 		}
@@ -294,4 +274,16 @@ final class SpanningWrapper {
 		random.nextBytes(bytes);
 		return StringUtils.byteToHexString(bytes);
 	}
+
+	private int spill(NonSpanningWrapper partial) throws IOException {
+		ByteBuffer buffer = partial.wrapIntoByteBuffer();
+		int length = buffer.remaining();
+		writeCompletely(spillingChannel, buffer);
+		return length;
+	}
+
+	private boolean isReadingLength() {
+		return lengthBuffer.position() > 0;
+	}
+
 }
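
The rewritten updateLength/ensureBufferCapacity pair keeps the original
buffering policy: payloads larger than THRESHOLD_FOR_SPILLING (5 MiB) are
written to a spill file, smaller ones accumulate in a byte[] that grows by
doubling. A rough standalone sketch of just that policy (the constant and the
doubling rule mirror the patch; the class itself is illustrative and skips the
file-channel path):

    import java.util.Arrays;

    class GrowOrSpillPolicy {
        private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiB, as in the patch

        private byte[] buffer = new byte[1024];

        /** Records above the threshold go to a spill file instead of the in-memory buffer. */
        boolean shouldSpill(int recordLength) {
            return recordLength > THRESHOLD_FOR_SPILLING;
        }

        /**
         * Grows the buffer by doubling; the patch copies only the accumulated bytes,
         * this sketch copies the whole array for brevity.
         */
        void ensureCapacity(int minLength) {
            if (buffer.length < minLength) {
                buffer = Arrays.copyOf(buffer, Math.max(minLength, buffer.length * 2));
            }
        }
    }
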
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index f20fbc9..75e6b0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -24,19 +24,22 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import java.io.IOException;
 import java.util.Optional;
 
+import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
+import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER;
+import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+
 /**
  * @param <T> The type of the record to be deserialized.
  */
 public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> implements RecordDeserializer<T> {
 
-	private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE =
-					"Serializer consumed more bytes than the record had. " +
-					"This indicates broken serialization. If you are using custom serialization types " +
-					"(Value or Writable), check their serialization methods. If you are using a " +
-					"Kryo-serialized type, check the corresponding Kryo serializer.";
+	static final int LENGTH_BYTES = Integer.BYTES;
 
 	private final NonSpanningWrapper nonSpanningWrapper;
 
@@ -58,11 +61,10 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		int numBytes = buffer.getSize();
 
 		// check if some spanning record deserialization is pending
-		if (this.spanningWrapper.getNumGatheredBytes() > 0) {
-			this.spanningWrapper.addNextChunkFromMemorySegment(segment, offset, numBytes);
-		}
-		else {
-			this.nonSpanningWrapper.initializeFromMemorySegment(segment, offset, numBytes + offset);
+		if (spanningWrapper.getNumGatheredBytes() > 0) {
+			spanningWrapper.addNextChunkFromMemorySegment(segment, offset, numBytes);
+		} else {
+			nonSpanningWrapper.initializeFromMemorySegment(segment, offset, numBytes + offset);
 		}
 	}
 
@@ -75,14 +77,13 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 
 	@Override
 	public Optional<Buffer> getUnconsumedBuffer() throws IOException {
-		Optional<MemorySegment> target;
-		if (nonSpanningWrapper.remaining() > 0) {
-			target = nonSpanningWrapper.getUnconsumedSegment();
+		final Optional<MemorySegment> unconsumedSegment;
+		if (nonSpanningWrapper.hasRemaining()) {
+			unconsumedSegment = nonSpanningWrapper.getUnconsumedSegment();
 		} else {
-			target = spanningWrapper.getUnconsumedSegment();
+			unconsumedSegment = spanningWrapper.getUnconsumedSegment();
 		}
-		return target.map(memorySegment -> new NetworkBuffer(
-			memorySegment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, memorySegment.size()));
+		return unconsumedSegment.map(segment -> new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, DATA_BUFFER, segment.size()));
 	}
 
 	@Override
@@ -91,65 +92,31 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		// this should be the majority of the cases for small records
 		// for large records, this portion of the work is very small in comparison anyways
 
-		int nonSpanningRemaining = this.nonSpanningWrapper.remaining();
-
-		// check if we can get a full length;
-		if (nonSpanningRemaining >= 4) {
-			int len = this.nonSpanningWrapper.readInt();
-
-			if (len <= nonSpanningRemaining - 4) {
-				// we can get a full record from here
-				try {
-					target.read(this.nonSpanningWrapper);
-
-					int remaining = this.nonSpanningWrapper.remaining();
-					if (remaining > 0) {
-						return DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
-					}
-					else if (remaining == 0) {
-						return DeserializationResult.LAST_RECORD_FROM_BUFFER;
-					}
-					else {
-						throw new IndexOutOfBoundsException("Remaining = " + remaining);
-					}
-				}
-				catch (IndexOutOfBoundsException e) {
-					throw new IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, e);
-				}
-			}
-			else {
-				// we got the length, but we need the rest from the spanning deserializer
-				// and need to wait for more buffers
-				this.spanningWrapper.initializeWithPartialRecord(this.nonSpanningWrapper, len);
-				this.nonSpanningWrapper.clear();
-				return DeserializationResult.PARTIAL_RECORD;
-			}
-		} else if (nonSpanningRemaining > 0) {
-			// we have an incomplete length
-			// add our part of the length to the length buffer
-			this.spanningWrapper.initializeWithPartialLength(this.nonSpanningWrapper);
-			this.nonSpanningWrapper.clear();
-			return DeserializationResult.PARTIAL_RECORD;
-		}
+		if (nonSpanningWrapper.hasCompleteLength()) {
+			return readNonSpanningRecord(target);
 
-		// spanning record case
-		if (this.spanningWrapper.hasFullRecord()) {
-			// get the full record
-			target.read(this.spanningWrapper.getInputView());
+		} else if (nonSpanningWrapper.hasRemaining()) {
+			nonSpanningWrapper.transferTo(spanningWrapper.lengthBuffer);
+			return PARTIAL_RECORD;
 
-			// move the remainder to the non-spanning wrapper
-			// this does not copy it, only sets the memory segment
-			this.spanningWrapper.moveRemainderToNonSpanningDeserializer(this.nonSpanningWrapper);
-			this.spanningWrapper.clear();
+		} else if (spanningWrapper.hasFullRecord()) {
+			target.read(spanningWrapper.getInputView());
+			spanningWrapper.transferLeftOverTo(nonSpanningWrapper);
+			return nonSpanningWrapper.hasRemaining() ? INTERMEDIATE_RECORD_FROM_BUFFER : LAST_RECORD_FROM_BUFFER;
 
-			return (this.nonSpanningWrapper.remaining() == 0) ?
-				DeserializationResult.LAST_RECORD_FROM_BUFFER :
-				DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
 		} else {
-			return DeserializationResult.PARTIAL_RECORD;
+			return PARTIAL_RECORD;
 		}
 	}
 
+	private DeserializationResult readNonSpanningRecord(T target) throws IOException {
+		NextRecordResponse response = nonSpanningWrapper.getNextRecord(target);
+		if (response.result == PARTIAL_RECORD) {
+			spanningWrapper.transferFrom(nonSpanningWrapper, response.bytesLeft);
+		}
+		return response.result;
+	}
+
 	@Override
 	public void clear() {
 		this.nonSpanningWrapper.clear();
@@ -158,7 +125,23 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 
 	@Override
 	public boolean hasUnfinishedData() {
-		return this.nonSpanningWrapper.remaining() > 0 || this.spanningWrapper.getNumGatheredBytes() > 0;
+		return this.nonSpanningWrapper.hasRemaining() || this.spanningWrapper.getNumGatheredBytes() > 0;
 	}
 
+	@NotThreadSafe
+	static class NextRecordResponse {
+		DeserializationResult result;
+		int bytesLeft;
+
+		NextRecordResponse(DeserializationResult result, int bytesLeft) {
+			this.result = result;
+			this.bytesLeft = bytesLeft;
+		}
+
+		public NextRecordResponse updated(DeserializationResult result, int bytesLeft) {
+			this.result = result;
+			this.bytesLeft = bytesLeft;
+			return this;
+		}
+	}
 }
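
Taken together, the refactoring does not change the public calling pattern of
RecordDeserializer: a caller feeds each network buffer via setNextBuffer and
then drains complete records until the result signals that the buffer is
consumed. A hedged sketch of that consumption loop (construction of the
deserializer, the buffer, and the record type are placeholders):

    import org.apache.flink.core.io.IOReadableWritable;
    import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
    import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
    import org.apache.flink.runtime.io.network.buffer.Buffer;

    import java.io.IOException;

    class DrainLoopSketch {
        static <T extends IOReadableWritable> void drain(
                RecordDeserializer<T> deserializer, Buffer buffer, T reusableTarget) throws IOException {
            deserializer.setNextBuffer(buffer);
            while (true) {
                DeserializationResult result = deserializer.getNextRecord(reusableTarget);
                if (result.isFullRecord()) {
                    // reusableTarget now holds a complete record; hand it to the consumer here.
                }
                if (result.isBufferConsumed()) {
                    break; // PARTIAL_RECORD or LAST_RECORD_FROM_BUFFER: wait for the next buffer
                }
            }
        }
    }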