You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/19 15:12:10 UTC
[flink] 10/10: [FLINK-17547][task] Implement getUnconsumedSegment
for spilled buffers
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2ed38c12be7151ab49e9cf2b4e2d8138f1ae4c62
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu May 7 16:48:47 2020 +0200
[FLINK-17547][task] Implement getUnconsumedSegment for spilled buffers
---
.../flink/core/memory/MemorySegmentFactory.java | 28 ++++-
.../org/apache/flink/util/CloseableIterator.java | 32 ++++++
.../core/memory/MemorySegmentFactoryTest.java | 64 ++++++++++++
.../apache/flink/util/CloseableIteratorTest.java | 82 +++++++++++++++
.../runtime/io/disk/FileBasedBufferIterator.java | 90 ++++++++++++++++
.../network/api/serialization/SpanningWrapper.java | 54 +++++++---
.../api/serialization/SpanningWrapperTest.java | 115 +++++++++++++++++++++
7 files changed, 448 insertions(+), 17 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index ee301a1..f643bc4 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* A factory for (hybrid) memory segments ({@link HybridMemorySegment}).
*
@@ -53,6 +55,31 @@ public final class MemorySegmentFactory {
}
/**
+ * Copies the given heap memory region and creates a new memory segment wrapping it.
+ *
+ * @param bytes The heap memory region.
+ * @param start starting position, inclusive
+ * @param end end position, exclusive
+ * @return A new memory segment that targets a copy of the given heap memory region.
+ * @throws IllegalArgumentException if start > end or end > bytes.length
+ */
+ public static MemorySegment wrapCopy(byte[] bytes, int start, int end) throws IllegalArgumentException {
+ checkArgument(end >= start);
+ checkArgument(end <= bytes.length);
+ MemorySegment copy = allocateUnpooledSegment(end - start);
+ copy.put(0, bytes, start, copy.size());
+ return copy;
+ }
+
+ /**
+ * Wraps the four bytes representing the given number with a {@link MemorySegment}.
+ * @see ByteBuffer#putInt(int)
+ */
+ public static MemorySegment wrapInt(int value) {
+ return wrap(ByteBuffer.allocate(Integer.BYTES).putInt(value).array());
+ }
+
+ /**
* Allocates some unpooled memory and creates a new memory segment that represents
* that memory.
*
@@ -161,5 +188,4 @@ public final class MemorySegmentFactory {
public static MemorySegment wrapOffHeapMemory(ByteBuffer memory) {
return new HybridMemorySegment(memory, null);
}
-
}
diff --git a/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java b/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
index cc51324..e0c5ec0 100644
--- a/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
+++ b/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
@@ -24,7 +24,9 @@ import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Queue;
import java.util.function.Consumer;
import static java.util.Arrays.asList;
@@ -80,6 +82,36 @@ public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
};
}
+ static <T> CloseableIterator<T> flatten(CloseableIterator<T>... iterators) {
+ return new CloseableIterator<T>() {
+ private final Queue<CloseableIterator<T>> queue = removeEmptyHead(new LinkedList<>(asList(iterators)));
+
+ private Queue<CloseableIterator<T>> removeEmptyHead(Queue<CloseableIterator<T>> queue) {
+ while (!queue.isEmpty() && !queue.peek().hasNext()) {
+ queue.poll();
+ }
+ return queue;
+ }
+
+ @Override
+ public boolean hasNext() {
+ removeEmptyHead(queue);
+ return !queue.isEmpty();
+ }
+
+ @Override
+ public T next() {
+ removeEmptyHead(queue);
+ return queue.peek().next();
+ }
+
+ @Override
+ public void close() throws Exception {
+ IOUtils.closeAll(iterators);
+ }
+ };
+ }
+
@SuppressWarnings("unchecked")
static <T> CloseableIterator<T> empty() {
return (CloseableIterator<T>) EMPTY_INSTANCE;
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
new file mode 100644
index 0000000..59c1d7e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.junit.Test;
+
+import static java.lang.System.arraycopy;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * {@link MemorySegmentFactory} test.
+ */
+public class MemorySegmentFactoryTest {
+
+ @Test
+ public void testWrapCopyChangingData() {
+ byte[] data = {1, 2, 3, 4, 5};
+ byte[] changingData = new byte[data.length];
+ arraycopy(data, 0, changingData, 0, data.length);
+ MemorySegment segment = MemorySegmentFactory.wrapCopy(changingData, 0, changingData.length);
+ changingData[0]++;
+ assertArrayEquals(data, segment.heapMemory);
+ }
+
+ @Test
+ public void testWrapPartialCopy() {
+ byte[] data = {1, 2, 3, 5, 6};
+ MemorySegment segment = MemorySegmentFactory.wrapCopy(data, 0, data.length / 2);
+ byte[] exp = new byte[segment.size()];
+ arraycopy(data, 0, exp, 0, exp.length);
+ assertArrayEquals(exp, segment.heapMemory);
+ }
+
+ @Test
+ public void testWrapCopyEmpty() {
+ MemorySegmentFactory.wrapCopy(new byte[0], 0, 0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrapCopyWrongStart() {
+ MemorySegmentFactory.wrapCopy(new byte[]{1, 2, 3}, 10, 3);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrapCopyWrongEnd() {
+ MemorySegmentFactory.wrapCopy(new byte[]{1, 2, 3}, 0, 10);
+ }
+
+}
diff --git a/flink-core/src/test/java/org/apache/flink/util/CloseableIteratorTest.java b/flink-core/src/test/java/org/apache/flink/util/CloseableIteratorTest.java
new file mode 100644
index 0000000..e2d4d3f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/CloseableIteratorTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * {@link CloseableIterator} test.
+ */
+@SuppressWarnings("unchecked")
+public class CloseableIteratorTest {
+
+ private static final String[] ELEMENTS = new String[]{"flink", "blink"};
+
+ @Test
+ public void testFlattenEmpty() throws Exception {
+ List<CloseableIterator<?>> iterators = asList(
+ CloseableIterator.flatten(),
+ CloseableIterator.flatten(CloseableIterator.empty()),
+ CloseableIterator.flatten(CloseableIterator.flatten()));
+ for (CloseableIterator<?> i : iterators) {
+ assertFalse(i.hasNext());
+ i.close();
+ }
+ }
+
+ @Test
+ public void testFlattenIteration() {
+ CloseableIterator<String> iterator = CloseableIterator.flatten(
+ CloseableIterator.ofElement(ELEMENTS[0], unused -> {
+ }),
+ CloseableIterator.ofElement(ELEMENTS[1], unused -> {
+ })
+ );
+
+ List<String> iterated = new ArrayList<>();
+ iterator.forEachRemaining(iterated::add);
+ assertArrayEquals(ELEMENTS, iterated.toArray());
+ }
+
+ @Test(expected = TestException.class)
+ public void testFlattenErrorHandling() throws Exception {
+ List<String> closed = new ArrayList<>();
+ CloseableIterator<String> iterator = CloseableIterator.flatten(
+ CloseableIterator.ofElement(ELEMENTS[0], e -> {
+ closed.add(e);
+ throw new TestException();
+ }),
+ CloseableIterator.ofElement(ELEMENTS[1], closed::add)
+ );
+ try {
+ iterator.close();
+ } finally {
+ assertArrayEquals(ELEMENTS, closed.toArray());
+ }
+ }
+
+ private static class TestException extends RuntimeException {
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java
new file mode 100644
index 0000000..c7e1cd8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RefCountedFile;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.util.CloseableIterator;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrap;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.util.IOUtils.closeAll;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link CloseableIterator} of {@link Buffer buffers} over file content.
+ */
+@Internal
+public class FileBasedBufferIterator implements CloseableIterator<Buffer> {
+
+ private final RefCountedFile file;
+ private final FileInputStream stream;
+ private final int bufferSize;
+
+ private int offset;
+ private int bytesToRead;
+
+ public FileBasedBufferIterator(RefCountedFile file, int bytesToRead, int bufferSize) throws FileNotFoundException {
+ checkNotNull(file);
+ checkArgument(bytesToRead >= 0);
+ checkArgument(bufferSize > 0);
+ this.stream = new FileInputStream(file.getFile());
+ this.file = file;
+ this.bufferSize = bufferSize;
+ this.bytesToRead = bytesToRead;
+ file.retain();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return bytesToRead > 0;
+ }
+
+ @Override
+ public Buffer next() {
+ byte[] buffer = new byte[bufferSize];
+ int bytesRead = read(buffer);
+ checkState(bytesRead >= 0, "unexpected end of file, file = " + file.getFile() + ", offset=" + offset);
+ offset += bytesRead;
+ bytesToRead -= bytesRead;
+ return new NetworkBuffer(wrap(buffer), FreeingBufferRecycler.INSTANCE, DATA_BUFFER, bytesRead);
+ }
+
+ private int read(byte[] buffer) {
+ int limit = Math.min(buffer.length, bytesToRead);
+ try {
+ return stream.read(buffer, offset, limit);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ closeAll(stream, file::release);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
index 9cffde3..45d6ad7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -24,7 +24,10 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.disk.FileBasedBufferIterator;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.StringUtils;
@@ -41,15 +44,18 @@ import java.util.Random;
import static java.lang.Math.max;
import static java.lang.Math.min;
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrapCopy;
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrapInt;
import static org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.singleBufferIterator;
import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
+import static org.apache.flink.util.CloseableIterator.empty;
import static org.apache.flink.util.FileUtils.writeCompletely;
import static org.apache.flink.util.IOUtils.closeAllQuietly;
final class SpanningWrapper {
- private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiBytes
- private static final int FILE_BUFFER_SIZE = 2 * 1024 * 1024;
+ private static final int DEFAULT_THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiBytes
+ private static final int DEFAULT_FILE_BUFFER_SIZE = 2 * 1024 * 1024;
private final byte[] initialBuffer = new byte[1024];
@@ -61,6 +67,8 @@ final class SpanningWrapper {
final ByteBuffer lengthBuffer;
+ private final int fileBufferSize;
+
private FileChannel spillingChannel;
private byte[] buffer;
@@ -79,16 +87,21 @@ final class SpanningWrapper {
private DataInputViewStreamWrapper spillFileReader;
+ private int thresholdForSpilling;
+
SpanningWrapper(String[] tempDirs) {
- this.tempDirs = tempDirs;
+ this(tempDirs, DEFAULT_THRESHOLD_FOR_SPILLING, DEFAULT_FILE_BUFFER_SIZE);
+ }
+ SpanningWrapper(String[] tempDirectories, int threshold, int fileBufferSize) {
+ this.tempDirs = tempDirectories;
this.lengthBuffer = ByteBuffer.allocate(LENGTH_BYTES);
this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
-
this.recordLength = -1;
-
this.serializationReadBuffer = new DataInputDeserializer();
this.buffer = initialBuffer;
+ this.thresholdForSpilling = threshold;
+ this.fileBufferSize = fileBufferSize;
}
/**
@@ -101,7 +114,7 @@ final class SpanningWrapper {
}
private boolean isAboveSpillingThreshold() {
- return recordLength > THRESHOLD_FOR_SPILLING;
+ return recordLength > thresholdForSpilling;
}
void addNextChunkFromMemorySegment(MemorySegment segment, int offset, int numBytes) throws IOException {
@@ -137,7 +150,7 @@ final class SpanningWrapper {
accumulatedRecordBytes += length;
if (hasFullRecord()) {
spillingChannel.close();
- spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile.getFile()), FILE_BUFFER_SIZE));
+ spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile.getFile()), fileBufferSize));
}
}
@@ -170,22 +183,26 @@ final class SpanningWrapper {
CloseableIterator<Buffer> getUnconsumedSegment() throws IOException {
if (isReadingLength()) {
- return singleBufferIterator(copyLengthBuffer());
+ return singleBufferIterator(wrapCopy(lengthBuffer.array(), 0, lengthBuffer.position()));
} else if (isAboveSpillingThreshold()) {
- throw new UnsupportedOperationException("Unaligned checkpoint currently do not support spilled records.");
+ return createSpilledDataIterator();
} else if (recordLength == -1) {
- return CloseableIterator.empty(); // no remaining partial length or data
+ return empty(); // no remaining partial length or data
} else {
return singleBufferIterator(copyDataBuffer());
}
}
- private MemorySegment copyLengthBuffer() {
- int position = lengthBuffer.position();
- MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(position);
- lengthBuffer.position(0);
- segment.put(0, lengthBuffer, position);
- return segment;
+ @SuppressWarnings("unchecked")
+ private CloseableIterator<Buffer> createSpilledDataIterator() throws IOException {
+ if (spillingChannel != null && spillingChannel.isOpen()) {
+ spillingChannel.force(false);
+ }
+ return CloseableIterator.flatten(
+ toSingleBufferIterator(wrapInt(recordLength)),
+ new FileBasedBufferIterator(spillFile, min(accumulatedRecordBytes, recordLength), fileBufferSize),
+ leftOverData == null ? empty() : toSingleBufferIterator(wrapCopy(leftOverData.getArray(), leftOverStart, leftOverLimit))
+ );
}
private MemorySegment copyDataBuffer() throws IOException {
@@ -289,4 +306,9 @@ final class SpanningWrapper {
return lengthBuffer.position() > 0;
}
+ private static CloseableIterator<Buffer> toSingleBufferIterator(MemorySegment segment) {
+ NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, segment.size());
+ return CloseableIterator.ofElement(buffer, Buffer::recycleBuffer);
+ }
+
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java
new file mode 100644
index 0000000..be57e21
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.serialization;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrap;
+import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * {@link SpanningWrapper} test.
+ */
+public class SpanningWrapperTest {
+
+ private static final Random random = new Random();
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Test
+ public void testLargeUnconsumedSegment() throws Exception {
+ int recordLen = 100;
+ int firstChunk = (int) (recordLen * .9);
+ int spillingThreshold = (int) (firstChunk * .9);
+
+ byte[] record1 = recordBytes(recordLen);
+ byte[] record2 = recordBytes(recordLen * 2);
+
+ SpanningWrapper spanningWrapper = new SpanningWrapper(new String[]{folder.newFolder().getAbsolutePath()}, spillingThreshold, recordLen);
+ spanningWrapper.transferFrom(wrapNonSpanning(record1, firstChunk), recordLen);
+ spanningWrapper.addNextChunkFromMemorySegment(wrap(record1), firstChunk, recordLen - firstChunk + LENGTH_BYTES);
+ spanningWrapper.addNextChunkFromMemorySegment(wrap(record2), 0, record2.length);
+
+ CloseableIterator<Buffer> unconsumedSegment = spanningWrapper.getUnconsumedSegment();
+
+ spanningWrapper.getInputView().readFully(new byte[recordLen], 0, recordLen); // read out from file
+ spanningWrapper.transferLeftOverTo(new NonSpanningWrapper()); // clear any leftover
+ spanningWrapper.transferFrom(wrapNonSpanning(recordBytes(recordLen), recordLen), recordLen); // overwrite with new data
+
+ assertArrayEquals(concat(record1, record2), toByteArray(unconsumedSegment));
+ }
+
+ private byte[] recordBytes(int recordLen) {
+ byte[] inputData = randomBytes(recordLen + LENGTH_BYTES);
+ for (int i = 0; i < Integer.BYTES; i++) {
+ inputData[Integer.BYTES - i - 1] = (byte) (recordLen >>> i * 8);
+ }
+ return inputData;
+ }
+
+ private NonSpanningWrapper wrapNonSpanning(byte[] bytes, int len) {
+ NonSpanningWrapper nonSpanningWrapper = new NonSpanningWrapper();
+ MemorySegment segment = wrap(bytes);
+ nonSpanningWrapper.initializeFromMemorySegment(segment, 0, len);
+ nonSpanningWrapper.readInt(); // emulate read length performed in getNextRecord to move position
+ return nonSpanningWrapper;
+ }
+
+ private byte[] toByteArray(CloseableIterator<Buffer> unconsumed) {
+ final List<Buffer> buffers = new ArrayList<>();
+ try {
+ unconsumed.forEachRemaining(buffers::add);
+ byte[] result = new byte[buffers.stream().mapToInt(Buffer::readableBytes).sum()];
+ int offset = 0;
+ for (Buffer buffer : buffers) {
+ int len = buffer.readableBytes();
+ buffer.getNioBuffer(0, len).get(result, offset, len);
+ offset += len;
+ }
+ return result;
+ } finally {
+ buffers.forEach(Buffer::recycleBuffer);
+ }
+ }
+
+ private byte[] randomBytes(int length) {
+ byte[] inputData = new byte[length];
+ random.nextBytes(inputData);
+ return inputData;
+ }
+
+ private byte[] concat(byte[] input1, byte[] input2) {
+ byte[] expected = new byte[input1.length + input2.length];
+ System.arraycopy(input1, 0, expected, 0, input1.length);
+ System.arraycopy(input2, 0, expected, input1.length, input2.length);
+ return expected;
+ }
+
+}