Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/19 15:12:10 UTC

[flink] 10/10: [FLINK-17547][task] Implement getUnconsumedSegment for spilled buffers

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2ed38c12be7151ab49e9cf2b4e2d8138f1ae4c62
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu May 7 16:48:47 2020 +0200

    [FLINK-17547][task] Implement getUnconsumedSegment for spilled buffers
---
 .../flink/core/memory/MemorySegmentFactory.java    |  28 ++++-
 .../org/apache/flink/util/CloseableIterator.java   |  32 ++++++
 .../core/memory/MemorySegmentFactoryTest.java      |  64 ++++++++++++
 .../apache/flink/util/CloseableIteratorTest.java   |  82 +++++++++++++++
 .../runtime/io/disk/FileBasedBufferIterator.java   |  90 ++++++++++++++++
 .../network/api/serialization/SpanningWrapper.java |  54 +++++++---
 .../api/serialization/SpanningWrapperTest.java     | 115 +++++++++++++++++++++
 7 files changed, 448 insertions(+), 17 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index ee301a1..f643bc4 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * A factory for (hybrid) memory segments ({@link HybridMemorySegment}).
  *
@@ -53,6 +55,31 @@ public final class MemorySegmentFactory {
 	}
 
 	/**
+	 * Copies the given heap memory region and creates a new memory segment wrapping it.
+	 *
+	 * @param bytes The heap memory region.
+	 * @param start starting position, inclusive
+	 * @param end end position, exclusive
+	 * @return A new memory segment that targets a copy of the given heap memory region.
+	 * @throws IllegalArgumentException if start > end or end > bytes.length
+	 */
+	public static MemorySegment wrapCopy(byte[] bytes, int start, int end) throws IllegalArgumentException {
+		checkArgument(end >= start);
+		checkArgument(end <= bytes.length);
+		MemorySegment copy = allocateUnpooledSegment(end - start);
+		copy.put(0, bytes, start, copy.size());
+		return copy;
+	}
+
+	/**
+	 * Wraps the four big-endian bytes of the given int value in a new {@link MemorySegment}.
+	 * @see ByteBuffer#putInt(int)
+	 */
+	public static MemorySegment wrapInt(int value) {
+		return wrap(ByteBuffer.allocate(Integer.BYTES).putInt(value).array());
+	}
+
+	/**
 	 * Allocates some unpooled memory and creates a new memory segment that represents
 	 * that memory.
 	 *
@@ -161,5 +188,4 @@ public final class MemorySegmentFactory {
 	public static MemorySegment wrapOffHeapMemory(ByteBuffer memory) {
 		return new HybridMemorySegment(memory, null);
 	}
-
 }
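
For reference, a minimal sketch of how the two new factory methods behave
(the demo class is illustrative; wrapCopy and wrapInt are the API added above):

    import org.apache.flink.core.memory.MemorySegment;
    import org.apache.flink.core.memory.MemorySegmentFactory;

    public class WrapDemo {
        public static void main(String[] args) {
            byte[] data = {10, 20, 30, 40};
            // wrapCopy copies [start, end) into a fresh unpooled segment,
            // so later mutation of the source array does not leak into it
            MemorySegment copy = MemorySegmentFactory.wrapCopy(data, 1, 3);
            data[1] = 99;
            System.out.println(copy.size());   // 2
            System.out.println(copy.get(0));   // 20 (copied before the mutation)

            // wrapInt stores the value big-endian, like ByteBuffer.putInt
            MemorySegment length = MemorySegmentFactory.wrapInt(0x01020304);
            System.out.println(length.get(0)); // 1 (most significant byte first)
        }
    }

The copy semantics matter below: getUnconsumedSegment must snapshot buffers that
the deserializer keeps mutating, and wrapInt re-creates the length prefix of a
spilled record without touching the spill file.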
diff --git a/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java b/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
index cc51324..e0c5ec0 100644
--- a/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
+++ b/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
@@ -24,7 +24,9 @@ import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.function.Consumer;
 
 import static java.util.Arrays.asList;
@@ -80,6 +82,36 @@ public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
 		};
 	}
 
+	static <T> CloseableIterator<T> flatten(CloseableIterator<T>... iterators) {
+		return new CloseableIterator<T>() {
+			private final Queue<CloseableIterator<T>> queue = removeEmptyHead(new LinkedList<>(asList(iterators)));
+
+			private Queue<CloseableIterator<T>> removeEmptyHead(Queue<CloseableIterator<T>> queue) {
+				while (!queue.isEmpty() && !queue.peek().hasNext()) {
+					queue.poll();
+				}
+				return queue;
+			}
+
+			@Override
+			public boolean hasNext() {
+				removeEmptyHead(queue);
+				return !queue.isEmpty();
+			}
+
+			@Override
+			public T next() {
+				removeEmptyHead(queue);
+				return queue.peek().next();
+			}
+
+			@Override
+			public void close() throws Exception {
+				IOUtils.closeAll(iterators);
+			}
+		};
+	}
+
 	@SuppressWarnings("unchecked")
 	static <T> CloseableIterator<T> empty() {
 		return (CloseableIterator<T>) EMPTY_INSTANCE;
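
The flatten combinator drops exhausted children eagerly (removeEmptyHead runs in
both hasNext and next, so a child that becomes empty mid-iteration is skipped),
and close() delegates to IOUtils.closeAll, which closes every child even when one
of them throws. A small usage sketch, using only the API in this class:

    import org.apache.flink.util.CloseableIterator;

    public class FlattenDemo {
        public static void main(String[] args) throws Exception {
            CloseableIterator<String> flat = CloseableIterator.flatten(
                CloseableIterator.ofElement("length", s -> System.out.println("closed: " + s)),
                CloseableIterator.empty(), // skipped transparently
                CloseableIterator.ofElement("data", s -> System.out.println("closed: " + s)));

            flat.forEachRemaining(System.out::println); // prints "length", then "data"
            flat.close();                               // closes both children
        }
    }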
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
new file mode 100644
index 0000000..59c1d7e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.junit.Test;
+
+import static java.lang.System.arraycopy;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * {@link MemorySegmentFactory} test.
+ */
+public class MemorySegmentFactoryTest {
+
+	@Test
+	public void testWrapCopyChangingData() {
+		byte[] data = {1, 2, 3, 4, 5};
+		byte[] changingData = new byte[data.length];
+		arraycopy(data, 0, changingData, 0, data.length);
+		MemorySegment segment = MemorySegmentFactory.wrapCopy(changingData, 0, changingData.length);
+		changingData[0]++;
+		assertArrayEquals(data, segment.heapMemory);
+	}
+
+	@Test
+	public void testWrapPartialCopy() {
+		byte[] data = {1, 2, 3, 5, 6};
+		MemorySegment segment = MemorySegmentFactory.wrapCopy(data, 0, data.length / 2);
+		byte[] exp = new byte[segment.size()];
+		arraycopy(data, 0, exp, 0, exp.length);
+		assertArrayEquals(exp, segment.heapMemory);
+	}
+
+	@Test
+	public void testWrapCopyEmpty() {
+		MemorySegmentFactory.wrapCopy(new byte[0], 0, 0);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testWrapCopyWrongStart() {
+		MemorySegmentFactory.wrapCopy(new byte[]{1, 2, 3}, 10, 3);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testWrapCopyWrongEnd() {
+		MemorySegmentFactory.wrapCopy(new byte[]{1, 2, 3}, 0, 10);
+	}
+
+}
diff --git a/flink-core/src/test/java/org/apache/flink/util/CloseableIteratorTest.java b/flink-core/src/test/java/org/apache/flink/util/CloseableIteratorTest.java
new file mode 100644
index 0000000..e2d4d3f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/CloseableIteratorTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * {@link CloseableIterator} test.
+ */
+@SuppressWarnings("unchecked")
+public class CloseableIteratorTest {
+
+	private static final String[] ELEMENTS = new String[]{"flink", "blink"};
+
+	@Test
+	public void testFlattenEmpty() throws Exception {
+		List<CloseableIterator<?>> iterators = asList(
+				CloseableIterator.flatten(),
+				CloseableIterator.flatten(CloseableIterator.empty()),
+				CloseableIterator.flatten(CloseableIterator.flatten()));
+		for (CloseableIterator<?> i : iterators) {
+			assertFalse(i.hasNext());
+			i.close();
+		}
+	}
+
+	@Test
+	public void testFlattenIteration() {
+		CloseableIterator<String> iterator = CloseableIterator.flatten(
+				CloseableIterator.ofElement(ELEMENTS[0], unused -> {
+				}),
+				CloseableIterator.ofElement(ELEMENTS[1], unused -> {
+				})
+		);
+
+		List<String> iterated = new ArrayList<>();
+		iterator.forEachRemaining(iterated::add);
+		assertArrayEquals(ELEMENTS, iterated.toArray());
+	}
+
+	@Test(expected = TestException.class)
+	public void testFlattenErrorHandling() throws Exception {
+		List<String> closed = new ArrayList<>();
+		CloseableIterator<String> iterator = CloseableIterator.flatten(
+				CloseableIterator.ofElement(ELEMENTS[0], e -> {
+					closed.add(e);
+					throw new TestException();
+				}),
+				CloseableIterator.ofElement(ELEMENTS[1], closed::add)
+		);
+		try {
+			iterator.close();
+		} finally {
+			assertArrayEquals(ELEMENTS, closed.toArray());
+		}
+	}
+
+	private static class TestException extends RuntimeException {
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java
new file mode 100644
index 0000000..c7e1cd8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RefCountedFile;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.util.CloseableIterator;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrap;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.util.IOUtils.closeAll;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link CloseableIterator} of {@link Buffer buffers} over file content.
+ */
+@Internal
+public class FileBasedBufferIterator implements CloseableIterator<Buffer> {
+
+	private final RefCountedFile file;
+	private final FileInputStream stream;
+	private final int bufferSize;
+
+	private int offset;
+	private int bytesToRead;
+
+	public FileBasedBufferIterator(RefCountedFile file, int bytesToRead, int bufferSize) throws FileNotFoundException {
+		checkNotNull(file);
+		checkArgument(bytesToRead >= 0);
+		checkArgument(bufferSize > 0);
+		this.stream = new FileInputStream(file.getFile());
+		this.file = file;
+		this.bufferSize = bufferSize;
+		this.bytesToRead = bytesToRead;
+		file.retain();
+	}
+
+	@Override
+	public boolean hasNext() {
+		return bytesToRead > 0;
+	}
+
+	@Override
+	public Buffer next() {
+		byte[] buffer = new byte[bufferSize];
+		int bytesRead = read(buffer);
+		checkState(bytesRead >= 0, "unexpected end of file, file = " + file.getFile() + ", offset=" + offset);
+		offset += bytesRead;
+		bytesToRead -= bytesRead;
+		return new NetworkBuffer(wrap(buffer), FreeingBufferRecycler.INSTANCE, DATA_BUFFER, bytesRead);
+	}
+
+	private int read(byte[] buffer) {
+		int limit = Math.min(buffer.length, bytesToRead);
+		try {
+			return stream.read(buffer, 0, limit); // destination index is 0: 'offset' is a file position, not an array index
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		closeAll(stream, file::release);
+	}
+}
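
A usage sketch for the new iterator; it assumes the public RefCountedFile(File)
constructor used elsewhere in this series, and the file name and sizes are
arbitrary. The iterator retains the file on construction and releases it on
close, so the spill file stays alive for as long as unconsumed buffers may
reference it:

    import org.apache.flink.core.fs.RefCountedFile;
    import org.apache.flink.runtime.io.disk.FileBasedBufferIterator;
    import org.apache.flink.runtime.io.network.buffer.Buffer;

    import java.io.File;
    import java.io.FileOutputStream;
    import java.nio.file.Files;

    public class SpillReadDemo {
        public static void main(String[] args) throws Exception {
            File spill = Files.createTempFile("spill", ".bin").toFile();
            try (FileOutputStream out = new FileOutputStream(spill)) {
                out.write(new byte[]{1, 2, 3, 4, 5});
            }
            // read 5 bytes back as buffers of at most 2 bytes each
            try (FileBasedBufferIterator buffers =
                     new FileBasedBufferIterator(new RefCountedFile(spill), 5, 2)) {
                while (buffers.hasNext()) {
                    Buffer buffer = buffers.next();
                    try {
                        System.out.println(buffer.readableBytes()); // 2, 2, 1
                    } finally {
                        buffer.recycleBuffer(); // frees the wrapped segment
                    }
                }
            }
        }
    }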
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
index 9cffde3..45d6ad7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -24,7 +24,10 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.disk.FileBasedBufferIterator;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.StringUtils;
 
@@ -41,15 +44,18 @@ import java.util.Random;
 
 import static java.lang.Math.max;
 import static java.lang.Math.min;
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrapCopy;
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrapInt;
 import static org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.singleBufferIterator;
 import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
+import static org.apache.flink.util.CloseableIterator.empty;
 import static org.apache.flink.util.FileUtils.writeCompletely;
 import static org.apache.flink.util.IOUtils.closeAllQuietly;
 
 final class SpanningWrapper {
 
-	private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiBytes
-	private static final int FILE_BUFFER_SIZE = 2 * 1024 * 1024;
+	private static final int DEFAULT_THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiBytes
+	private static final int DEFAULT_FILE_BUFFER_SIZE = 2 * 1024 * 1024;
 
 	private final byte[] initialBuffer = new byte[1024];
 
@@ -61,6 +67,8 @@ final class SpanningWrapper {
 
 	final ByteBuffer lengthBuffer;
 
+	private final int fileBufferSize;
+
 	private FileChannel spillingChannel;
 
 	private byte[] buffer;
@@ -79,16 +87,21 @@ final class SpanningWrapper {
 
 	private DataInputViewStreamWrapper spillFileReader;
 
+	private int thresholdForSpilling;
+
 	SpanningWrapper(String[] tempDirs) {
-		this.tempDirs = tempDirs;
+		this(tempDirs, DEFAULT_THRESHOLD_FOR_SPILLING, DEFAULT_FILE_BUFFER_SIZE);
+	}
 
+	SpanningWrapper(String[] tempDirectories, int threshold, int fileBufferSize) {
+		this.tempDirs = tempDirectories;
 		this.lengthBuffer = ByteBuffer.allocate(LENGTH_BYTES);
 		this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
-
 		this.recordLength = -1;
-
 		this.serializationReadBuffer = new DataInputDeserializer();
 		this.buffer = initialBuffer;
+		this.thresholdForSpilling = threshold;
+		this.fileBufferSize = fileBufferSize;
 	}
 
 	/**
@@ -101,7 +114,7 @@ final class SpanningWrapper {
 	}
 
 	private boolean isAboveSpillingThreshold() {
-		return recordLength > THRESHOLD_FOR_SPILLING;
+		return recordLength > thresholdForSpilling;
 	}
 
 	void addNextChunkFromMemorySegment(MemorySegment segment, int offset, int numBytes) throws IOException {
@@ -137,7 +150,7 @@ final class SpanningWrapper {
 		accumulatedRecordBytes += length;
 		if (hasFullRecord()) {
 			spillingChannel.close();
-			spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile.getFile()), FILE_BUFFER_SIZE));
+			spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile.getFile()), fileBufferSize));
 		}
 	}
 
@@ -170,22 +183,26 @@ final class SpanningWrapper {
 
 	CloseableIterator<Buffer> getUnconsumedSegment() throws IOException {
 		if (isReadingLength()) {
-			return singleBufferIterator(copyLengthBuffer());
+			return singleBufferIterator(wrapCopy(lengthBuffer.array(), 0, lengthBuffer.position()));
 		} else if (isAboveSpillingThreshold()) {
-			throw new UnsupportedOperationException("Unaligned checkpoint currently do not support spilled records.");
+			return createSpilledDataIterator();
 		} else if (recordLength == -1) {
-			return CloseableIterator.empty(); // no remaining partial length or data
+			return empty(); // no remaining partial length or data
 		} else {
 			return singleBufferIterator(copyDataBuffer());
 		}
 	}
 
-	private MemorySegment copyLengthBuffer() {
-		int position = lengthBuffer.position();
-		MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(position);
-		lengthBuffer.position(0);
-		segment.put(0, lengthBuffer, position);
-		return segment;
+	@SuppressWarnings("unchecked")
+	private CloseableIterator<Buffer> createSpilledDataIterator() throws IOException {
+		if (spillingChannel != null && spillingChannel.isOpen()) {
+			spillingChannel.force(false);
+		}
+		return CloseableIterator.flatten(
+			toSingleBufferIterator(wrapInt(recordLength)),
+			new FileBasedBufferIterator(spillFile, min(accumulatedRecordBytes, recordLength), fileBufferSize),
+			leftOverData == null ? empty() : toSingleBufferIterator(wrapCopy(leftOverData.getArray(), leftOverStart, leftOverLimit))
+		);
 	}
 
 	private MemorySegment copyDataBuffer() throws IOException {
@@ -289,4 +306,9 @@ final class SpanningWrapper {
 		return lengthBuffer.position() > 0;
 	}
 
+	private static CloseableIterator<Buffer> toSingleBufferIterator(MemorySegment segment) {
+		NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, segment.size());
+		return CloseableIterator.ofElement(buffer, Buffer::recycleBuffer);
+	}
+
 }
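
For a spilled record, getUnconsumedSegment() now yields up to three pieces in
order: the four-byte big-endian length prefix (wrapInt), the bytes already
spilled to the file, and any leftover bytes still held in memory. A sketch of
draining that iterator back into a single array (the drain helper is
hypothetical; the test below does the same thing in toByteArray):

    import org.apache.flink.runtime.io.network.buffer.Buffer;
    import org.apache.flink.util.CloseableIterator;

    import java.io.ByteArrayOutputStream;

    public final class UnconsumedDrain {
        static byte[] drain(CloseableIterator<Buffer> unconsumed) throws Exception {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (CloseableIterator<Buffer> buffers = unconsumed) {
                while (buffers.hasNext()) {
                    Buffer buffer = buffers.next();
                    try {
                        byte[] chunk = new byte[buffer.readableBytes()];
                        buffer.getNioBuffer(0, chunk.length).get(chunk);
                        out.write(chunk);
                    } finally {
                        buffer.recycleBuffer(); // buffers are reference counted
                    }
                }
            }
            return out.toByteArray();
        }
    }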
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java
new file mode 100644
index 0000000..be57e21
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.serialization;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrap;
+import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * {@link SpanningWrapper} test.
+ */
+public class SpanningWrapperTest {
+
+	private static final Random random = new Random();
+
+	@Rule
+	public TemporaryFolder folder = new TemporaryFolder();
+
+	@Test
+	public void testLargeUnconsumedSegment() throws Exception {
+		int recordLen = 100;
+		int firstChunk = (int) (recordLen * .9);
+		int spillingThreshold = (int) (firstChunk * .9);
+
+		byte[] record1 = recordBytes(recordLen);
+		byte[] record2 = recordBytes(recordLen * 2);
+
+		SpanningWrapper spanningWrapper = new SpanningWrapper(new String[]{folder.newFolder().getAbsolutePath()}, spillingThreshold, recordLen);
+		spanningWrapper.transferFrom(wrapNonSpanning(record1, firstChunk), recordLen);
+		spanningWrapper.addNextChunkFromMemorySegment(wrap(record1), firstChunk, recordLen - firstChunk + LENGTH_BYTES);
+		spanningWrapper.addNextChunkFromMemorySegment(wrap(record2), 0, record2.length);
+
+		CloseableIterator<Buffer> unconsumedSegment = spanningWrapper.getUnconsumedSegment();
+
+		spanningWrapper.getInputView().readFully(new byte[recordLen], 0, recordLen); // read out from file
+		spanningWrapper.transferLeftOverTo(new NonSpanningWrapper()); // clear any leftover
+		spanningWrapper.transferFrom(wrapNonSpanning(recordBytes(recordLen), recordLen), recordLen); // overwrite with new data
+
+		assertArrayEquals(concat(record1, record2), toByteArray(unconsumedSegment));
+	}
+
+	private byte[] recordBytes(int recordLen) {
+		byte[] inputData = randomBytes(recordLen + LENGTH_BYTES);
+		for (int i = 0; i < Integer.BYTES; i++) {
+			inputData[Integer.BYTES - i - 1] = (byte) (recordLen >>> i * 8);
+		}
+		return inputData;
+	}
+
+	private NonSpanningWrapper wrapNonSpanning(byte[] bytes, int len) {
+		NonSpanningWrapper nonSpanningWrapper = new NonSpanningWrapper();
+		MemorySegment segment = wrap(bytes);
+		nonSpanningWrapper.initializeFromMemorySegment(segment, 0, len);
+		nonSpanningWrapper.readInt(); // emulate read length performed in getNextRecord to move position
+		return nonSpanningWrapper;
+	}
+
+	private byte[] toByteArray(CloseableIterator<Buffer> unconsumed) {
+		final List<Buffer> buffers = new ArrayList<>();
+		try {
+			unconsumed.forEachRemaining(buffers::add);
+			byte[] result = new byte[buffers.stream().mapToInt(Buffer::readableBytes).sum()];
+			int offset = 0;
+			for (Buffer buffer : buffers) {
+				int len = buffer.readableBytes();
+				buffer.getNioBuffer(0, len).get(result, offset, len);
+				offset += len;
+			}
+			return result;
+		} finally {
+			buffers.forEach(Buffer::recycleBuffer);
+		}
+	}
+
+	private byte[] randomBytes(int length) {
+		byte[] inputData = new byte[length];
+		random.nextBytes(inputData);
+		return inputData;
+	}
+
+	private byte[] concat(byte[] input1, byte[] input2) {
+		byte[] expected = new byte[input1.length + input2.length];
+		System.arraycopy(input1, 0, expected, 0, input1.length);
+		System.arraycopy(input2, 0, expected, input1.length, input2.length);
+		return expected;
+	}
+
+}
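
One detail worth noting in recordBytes above: the hand-rolled loop writes the
length prefix big-endian, which is exactly what ByteBuffer.putInt (and therefore
wrapInt) produces. A quick JDK-only check of that equivalence:

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    public class LengthPrefixCheck {
        public static void main(String[] args) {
            int recordLen = 100;
            byte[] manual = new byte[Integer.BYTES];
            for (int i = 0; i < Integer.BYTES; i++) {
                manual[Integer.BYTES - i - 1] = (byte) (recordLen >>> (i * 8));
            }
            byte[] viaByteBuffer = ByteBuffer.allocate(Integer.BYTES).putInt(recordLen).array();
            System.out.println(Arrays.equals(manual, viaByteBuffer)); // prints true
        }
    }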