Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/19 15:12:00 UTC

[flink] branch release-1.11 updated (4fba374 -> 2ed38c1)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 4fba374  [FLINK-17763][dist] Properly handle log properties and spaces in scala-shell.sh
     new 9fedade  [FLINK-17547][task][hotfix] Improve error handling: (1) catch one more invalid input in DataOutputSerializer.write; (2) more informative error messages
     new 39f5f1b  [FLINK-17547][task][hotfix] Extract NonSpanningWrapper from SpillingAdaptiveSpanningRecordDeserializer (static inner class). Extracted as is; no logical changes.
     new 5fd01ea  [FLINK-17547][task][hotfix] Extract SpanningWrapper from SpillingAdaptiveSpanningRecordDeserializer (static inner class). Extracted as is; no logical changes.
     new c6bdeb4  [FLINK-17547][task][hotfix] Fix compiler warnings in NonSpanningWrapper
     new 9ebdaf4  [FLINK-17547][task][hotfix] Extract methods from RecordsDeserializer
     new 1c9bf03  [FLINK-17547][task] Use iterator for unconsumed buffers. Motivation: support spilled records. Changes: (1) change SpillingAdaptiveSpanningRecordDeserializer.getUnconsumedBuffer signature; (2) adapt channel state persistence to new types
     new 4e323c3  [FLINK-17547][task][hotfix] Extract RefCountedFileWithStream from RefCountedFile. Motivation: use RefCountedFile for reading as well.
     new 1379548  [FLINK-17547][task][hotfix] Move RefCountedFile to flink-core to use it in SpanningWrapper
     new 3dacffe  [FLINK-17547][task] Use RefCountedFile in SpanningWrapper (todo: merge with next?)
     new 2ed38c1  [FLINK-17547][task] Implement getUnconsumedSegment for spilled buffers

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/flink/core/fs}/RefCountedFile.java  |  61 +-
 .../flink/core/memory/DataOutputSerializer.java    |   4 +-
 .../flink/core/memory/HybridMemorySegment.java     |   3 +-
 .../flink/core/memory/MemorySegmentFactory.java    |  28 +-
 .../org/apache/flink/util/CloseableIterator.java   | 109 +++-
 .../main/java/org/apache/flink/util/IOUtils.java   |  16 +
 .../java/org/apache/flink/util}/RefCounted.java    |   2 +-
 .../apache/flink/core/fs}/RefCountedFileTest.java  |  61 +-
 .../core/memory/MemorySegmentFactoryTest.java      |  64 ++
 .../apache/flink/util/CloseableIteratorTest.java   |  82 +++
 .../flink/fs/s3/common/FlinkS3FileSystem.java      |   4 +-
 .../utils/RefCountedBufferingFileStream.java       |  10 +-
 .../s3/common/utils/RefCountedFSOutputStream.java  |   1 +
 ...ntedFile.java => RefCountedFileWithStream.java} |  69 +--
 .../s3/common/utils/RefCountedTmpFileCreator.java  |  10 +-
 .../writer/S3RecoverableFsDataOutputStream.java    |  12 +-
 .../S3RecoverableMultipartUploadFactory.java       |   8 +-
 .../fs/s3/common/writer/S3RecoverableWriter.java   |   8 +-
 .../utils/RefCountedBufferingFileStreamTest.java   |   4 +-
 ...Test.java => RefCountedFileWithStreamTest.java} |  60 +-
 .../writer/RecoverableMultiPartUploadImplTest.java |   4 +-
 .../S3RecoverableFsDataOutputStreamTest.java       |  10 +-
 .../channel/ChannelStateWriteRequest.java          |  33 +-
 .../ChannelStateWriteRequestDispatcherImpl.java    |   6 +-
 .../ChannelStateWriteRequestExecutorImpl.java      |  20 +-
 .../checkpoint/channel/ChannelStateWriter.java     |   6 +-
 .../checkpoint/channel/ChannelStateWriterImpl.java |  17 +-
 .../runtime/io/disk/FileBasedBufferIterator.java   |  90 +++
 .../api/serialization/NonSpanningWrapper.java      | 372 ++++++++++++
 .../api/serialization/RecordDeserializer.java      |   4 +-
 .../network/api/serialization/SpanningWrapper.java | 314 ++++++++++
 ...SpillingAdaptiveSpanningRecordDeserializer.java | 661 ++-------------------
 .../partition/consumer/RemoteInputChannel.java     |   3 +-
 .../ChannelStateWriteRequestDispatcherTest.java    |  10 +-
 .../ChannelStateWriteRequestExecutorImplTest.java  |   1 -
 .../channel/ChannelStateWriterImplTest.java        |  13 +-
 .../channel/CheckpointInProgressRequestTest.java   |   7 +-
 .../checkpoint/channel/MockChannelStateWriter.java |  11 +-
 .../channel/RecordingChannelStateWriter.java       |  12 +-
 .../SpanningRecordSerializationTest.java           |   9 +-
 .../api/serialization/SpanningWrapperTest.java     | 115 ++++
 .../partition/consumer/SingleInputGateTest.java    |  14 +-
 .../runtime/state/ChannelPersistenceITCase.java    |   3 +-
 .../runtime/io/CheckpointBarrierUnaligner.java     |   3 +-
 .../runtime/io/StreamTaskNetworkInput.java         |  11 +-
 45 files changed, 1428 insertions(+), 937 deletions(-)
 copy {flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils => flink-core/src/main/java/org/apache/flink/core/fs}/RefCountedFile.java (60%)
 rename {flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils => flink-core/src/main/java/org/apache/flink/util}/RefCounted.java (96%)
 copy {flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils => flink-core/src/test/java/org/apache/flink/core/fs}/RefCountedFileTest.java (56%)
 create mode 100644 flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
 create mode 100644 flink-core/src/test/java/org/apache/flink/util/CloseableIteratorTest.java
 rename flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/{RefCountedFile.java => RefCountedFileWithStream.java} (60%)
 rename flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/{RefCountedFileTest.java => RefCountedFileWithStreamTest.java} (56%)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java


[flink] 10/10: [FLINK-17547][task] Implement getUnconsumedSegment for spilled buffers

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2ed38c12be7151ab49e9cf2b4e2d8138f1ae4c62
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu May 7 16:48:47 2020 +0200

    [FLINK-17547][task] Implement getUnconsumedSegment for spilled buffers
---
 .../flink/core/memory/MemorySegmentFactory.java    |  28 ++++-
 .../org/apache/flink/util/CloseableIterator.java   |  32 ++++++
 .../core/memory/MemorySegmentFactoryTest.java      |  64 ++++++++++++
 .../apache/flink/util/CloseableIteratorTest.java   |  82 +++++++++++++++
 .../runtime/io/disk/FileBasedBufferIterator.java   |  90 ++++++++++++++++
 .../network/api/serialization/SpanningWrapper.java |  54 +++++++---
 .../api/serialization/SpanningWrapperTest.java     | 115 +++++++++++++++++++++
 7 files changed, 448 insertions(+), 17 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index ee301a1..f643bc4 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * A factory for (hybrid) memory segments ({@link HybridMemorySegment}).
  *
@@ -53,6 +55,31 @@ public final class MemorySegmentFactory {
 	}
 
 	/**
+	 * Copies the given heap memory region and creates a new memory segment wrapping it.
+	 *
+	 * @param bytes The heap memory region.
+	 * @param start starting position, inclusive
+	 * @param end end position, exclusive
+	 * @return A new memory segment that targets a copy of the given heap memory region.
+	 * @throws IllegalArgumentException if start > end or end > bytes.length
+	 */
+	public static MemorySegment wrapCopy(byte[] bytes, int start, int end) throws IllegalArgumentException {
+		checkArgument(end >= start);
+		checkArgument(end <= bytes.length);
+		MemorySegment copy = allocateUnpooledSegment(end - start);
+		copy.put(0, bytes, start, copy.size());
+		return copy;
+	}
+
+	/**
+	 * Wraps the four bytes representing the given number with a {@link MemorySegment}.
+	 * @see ByteBuffer#putInt(int)
+	 */
+	public static MemorySegment wrapInt(int value) {
+		return wrap(ByteBuffer.allocate(Integer.BYTES).putInt(value).array());
+	}
+
+	/**
 	 * Allocates some unpooled memory and creates a new memory segment that represents
 	 * that memory.
 	 *
@@ -161,5 +188,4 @@ public final class MemorySegmentFactory {
 	public static MemorySegment wrapOffHeapMemory(ByteBuffer memory) {
 		return new HybridMemorySegment(memory, null);
 	}
-
 }
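
For illustration, a minimal usage sketch of the two factory methods added above (the caller class is hypothetical, not part of the commit):

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

class WrapCopyExample {
	public static void main(String[] args) {
		byte[] payload = {10, 20, 30, 40};
		// wrapCopy copies the region [start, end), so later mutation of
		// `payload` cannot leak into the returned segment.
		MemorySegment copy = MemorySegmentFactory.wrapCopy(payload, 1, 3);
		payload[1] = 99;
		System.out.println(copy.get(0));      // prints 20; copy.size() == 2
		// wrapInt writes the value big-endian, mirroring ByteBuffer.putInt(int).
		MemorySegment length = MemorySegmentFactory.wrapInt(payload.length);
		System.out.println(length.getInt(0)); // prints 4
	}
}
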
diff --git a/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java b/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
index cc51324..e0c5ec0 100644
--- a/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
+++ b/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
@@ -24,7 +24,9 @@ import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.function.Consumer;
 
 import static java.util.Arrays.asList;
@@ -80,6 +82,36 @@ public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
 		};
 	}
 
+	static <T> CloseableIterator<T> flatten(CloseableIterator<T>... iterators) {
+		return new CloseableIterator<T>() {
+			private final Queue<CloseableIterator<T>> queue = removeEmptyHead(new LinkedList<>(asList(iterators)));
+
+			private Queue<CloseableIterator<T>> removeEmptyHead(Queue<CloseableIterator<T>> queue) {
+				while (!queue.isEmpty() && !queue.peek().hasNext()) {
+					queue.poll();
+				}
+				return queue;
+			}
+
+			@Override
+			public boolean hasNext() {
+				removeEmptyHead(queue);
+				return !queue.isEmpty();
+			}
+
+			@Override
+			public T next() {
+				removeEmptyHead(queue);
+				return queue.peek().next();
+			}
+
+			@Override
+			public void close() throws Exception {
+				IOUtils.closeAll(iterators);
+			}
+		};
+	}
+
 	@SuppressWarnings("unchecked")
 	static <T> CloseableIterator<T> empty() {
 		return (CloseableIterator<T>) EMPTY_INSTANCE;
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
new file mode 100644
index 0000000..59c1d7e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.junit.Test;
+
+import static java.lang.System.arraycopy;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * {@link MemorySegmentFactory} test.
+ */
+public class MemorySegmentFactoryTest {
+
+	@Test
+	public void testWrapCopyChangingData() {
+		byte[] data = {1, 2, 3, 4, 5};
+		byte[] changingData = new byte[data.length];
+		arraycopy(data, 0, changingData, 0, data.length);
+		MemorySegment segment = MemorySegmentFactory.wrapCopy(changingData, 0, changingData.length);
+		changingData[0]++;
+		assertArrayEquals(data, segment.heapMemory);
+	}
+
+	@Test
+	public void testWrapPartialCopy() {
+		byte[] data = {1, 2, 3, 5, 6};
+		MemorySegment segment = MemorySegmentFactory.wrapCopy(data, 0, data.length / 2);
+		byte[] exp = new byte[segment.size()];
+		arraycopy(data, 0, exp, 0, exp.length);
+		assertArrayEquals(exp, segment.heapMemory);
+	}
+
+	@Test
+	public void testWrapCopyEmpty() {
+		MemorySegmentFactory.wrapCopy(new byte[0], 0, 0);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testWrapCopyWrongStart() {
+		MemorySegmentFactory.wrapCopy(new byte[]{1, 2, 3}, 10, 3);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testWrapCopyWrongEnd() {
+		MemorySegmentFactory.wrapCopy(new byte[]{1, 2, 3}, 0, 10);
+	}
+
+}
diff --git a/flink-core/src/test/java/org/apache/flink/util/CloseableIteratorTest.java b/flink-core/src/test/java/org/apache/flink/util/CloseableIteratorTest.java
new file mode 100644
index 0000000..e2d4d3f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/CloseableIteratorTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * {@link CloseableIterator} test.
+ */
+@SuppressWarnings("unchecked")
+public class CloseableIteratorTest {
+
+	private static final String[] ELEMENTS = new String[]{"flink", "blink"};
+
+	@Test
+	public void testFlattenEmpty() throws Exception {
+		List<CloseableIterator<?>> iterators = asList(
+				CloseableIterator.flatten(),
+				CloseableIterator.flatten(CloseableIterator.empty()),
+				CloseableIterator.flatten(CloseableIterator.flatten()));
+		for (CloseableIterator<?> i : iterators) {
+			assertFalse(i.hasNext());
+			i.close();
+		}
+	}
+
+	@Test
+	public void testFlattenIteration() {
+		CloseableIterator<String> iterator = CloseableIterator.flatten(
+				CloseableIterator.ofElement(ELEMENTS[0], unused -> {
+				}),
+				CloseableIterator.ofElement(ELEMENTS[1], unused -> {
+				})
+		);
+
+		List<String> iterated = new ArrayList<>();
+		iterator.forEachRemaining(iterated::add);
+		assertArrayEquals(ELEMENTS, iterated.toArray());
+	}
+
+	@Test(expected = TestException.class)
+	public void testFlattenErrorHandling() throws Exception {
+		List<String> closed = new ArrayList<>();
+		CloseableIterator<String> iterator = CloseableIterator.flatten(
+				CloseableIterator.ofElement(ELEMENTS[0], e -> {
+					closed.add(e);
+					throw new TestException();
+				}),
+				CloseableIterator.ofElement(ELEMENTS[1], closed::add)
+		);
+		try {
+			iterator.close();
+		} finally {
+			assertArrayEquals(ELEMENTS, closed.toArray());
+		}
+	}
+
+	private static class TestException extends RuntimeException {
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java
new file mode 100644
index 0000000..c7e1cd8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RefCountedFile;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.util.CloseableIterator;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrap;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.util.IOUtils.closeAll;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link CloseableIterator} of {@link Buffer buffers} over file content.
+ */
+@Internal
+public class FileBasedBufferIterator implements CloseableIterator<Buffer> {
+
+	private final RefCountedFile file;
+	private final FileInputStream stream;
+	private final int bufferSize;
+
+	private int offset;
+	private int bytesToRead;
+
+	public FileBasedBufferIterator(RefCountedFile file, int bytesToRead, int bufferSize) throws FileNotFoundException {
+		checkNotNull(file);
+		checkArgument(bytesToRead >= 0);
+		checkArgument(bufferSize > 0);
+		this.stream = new FileInputStream(file.getFile());
+		this.file = file;
+		this.bufferSize = bufferSize;
+		this.bytesToRead = bytesToRead;
+		file.retain();
+	}
+
+	@Override
+	public boolean hasNext() {
+		return bytesToRead > 0;
+	}
+
+	@Override
+	public Buffer next() {
+		byte[] buffer = new byte[bufferSize];
+		int bytesRead = read(buffer);
+		checkState(bytesRead >= 0, "unexpected end of file, file = " + file.getFile() + ", offset=" + offset);
+		offset += bytesRead;
+		bytesToRead -= bytesRead;
+		return new NetworkBuffer(wrap(buffer), FreeingBufferRecycler.INSTANCE, DATA_BUFFER, bytesRead);
+	}
+
+	private int read(byte[] buffer) {
+		int limit = Math.min(buffer.length, bytesToRead);
+		try {
+			return stream.read(buffer, offset, limit);
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		closeAll(stream, file::release);
+	}
+}
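
A hedged usage sketch of the iterator (the single-argument RefCountedFile constructor is an assumption about the class moved to flink-core in this series; the buffer size is chosen large enough for a single read):

import org.apache.flink.core.fs.RefCountedFile;
import org.apache.flink.runtime.io.disk.FileBasedBufferIterator;
import org.apache.flink.runtime.io.network.buffer.Buffer;

import java.nio.file.Files;
import java.nio.file.Path;

class SpilledBufferReadExample {
	public static void main(String[] args) throws Exception {
		Path tmp = Files.createTempFile("spill", ".bin");
		Files.write(tmp, new byte[]{1, 2, 3, 4, 5});
		RefCountedFile file = new RefCountedFile(tmp.toFile()); // assumed constructor
		// The iterator retains the file on construction and releases it on close().
		try (FileBasedBufferIterator it = new FileBasedBufferIterator(file, 5, 16)) {
			while (it.hasNext()) {
				Buffer buffer = it.next();
				System.out.println(buffer.readableBytes()); // prints 5
				buffer.recycleBuffer();
			}
		}
		file.release(); // drop the creating reference as well
	}
}
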
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
index 9cffde3..45d6ad7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -24,7 +24,10 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.disk.FileBasedBufferIterator;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.StringUtils;
 
@@ -41,15 +44,18 @@ import java.util.Random;
 
 import static java.lang.Math.max;
 import static java.lang.Math.min;
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrapCopy;
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrapInt;
 import static org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.singleBufferIterator;
 import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
+import static org.apache.flink.util.CloseableIterator.empty;
 import static org.apache.flink.util.FileUtils.writeCompletely;
 import static org.apache.flink.util.IOUtils.closeAllQuietly;
 
 final class SpanningWrapper {
 
-	private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiBytes
-	private static final int FILE_BUFFER_SIZE = 2 * 1024 * 1024;
+	private static final int DEFAULT_THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiBytes
+	private static final int DEFAULT_FILE_BUFFER_SIZE = 2 * 1024 * 1024;
 
 	private final byte[] initialBuffer = new byte[1024];
 
@@ -61,6 +67,8 @@ final class SpanningWrapper {
 
 	final ByteBuffer lengthBuffer;
 
+	private final int fileBufferSize;
+
 	private FileChannel spillingChannel;
 
 	private byte[] buffer;
@@ -79,16 +87,21 @@ final class SpanningWrapper {
 
 	private DataInputViewStreamWrapper spillFileReader;
 
+	private int thresholdForSpilling;
+
 	SpanningWrapper(String[] tempDirs) {
-		this.tempDirs = tempDirs;
+		this(tempDirs, DEFAULT_THRESHOLD_FOR_SPILLING, DEFAULT_FILE_BUFFER_SIZE);
+	}
 
+	SpanningWrapper(String[] tempDirectories, int threshold, int fileBufferSize) {
+		this.tempDirs = tempDirectories;
 		this.lengthBuffer = ByteBuffer.allocate(LENGTH_BYTES);
 		this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
-
 		this.recordLength = -1;
-
 		this.serializationReadBuffer = new DataInputDeserializer();
 		this.buffer = initialBuffer;
+		this.thresholdForSpilling = threshold;
+		this.fileBufferSize = fileBufferSize;
 	}
 
 	/**
@@ -101,7 +114,7 @@ final class SpanningWrapper {
 	}
 
 	private boolean isAboveSpillingThreshold() {
-		return recordLength > THRESHOLD_FOR_SPILLING;
+		return recordLength > thresholdForSpilling;
 	}
 
 	void addNextChunkFromMemorySegment(MemorySegment segment, int offset, int numBytes) throws IOException {
@@ -137,7 +150,7 @@ final class SpanningWrapper {
 		accumulatedRecordBytes += length;
 		if (hasFullRecord()) {
 			spillingChannel.close();
-			spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile.getFile()), FILE_BUFFER_SIZE));
+			spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile.getFile()), fileBufferSize));
 		}
 	}
 
@@ -170,22 +183,26 @@ final class SpanningWrapper {
 
 	CloseableIterator<Buffer> getUnconsumedSegment() throws IOException {
 		if (isReadingLength()) {
-			return singleBufferIterator(copyLengthBuffer());
+			return singleBufferIterator(wrapCopy(lengthBuffer.array(), 0, lengthBuffer.position()));
 		} else if (isAboveSpillingThreshold()) {
-			throw new UnsupportedOperationException("Unaligned checkpoint currently do not support spilled records.");
+			return createSpilledDataIterator();
 		} else if (recordLength == -1) {
-			return CloseableIterator.empty(); // no remaining partial length or data
+			return empty(); // no remaining partial length or data
 		} else {
 			return singleBufferIterator(copyDataBuffer());
 		}
 	}
 
-	private MemorySegment copyLengthBuffer() {
-		int position = lengthBuffer.position();
-		MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(position);
-		lengthBuffer.position(0);
-		segment.put(0, lengthBuffer, position);
-		return segment;
+	@SuppressWarnings("unchecked")
+	private CloseableIterator<Buffer> createSpilledDataIterator() throws IOException {
+		if (spillingChannel != null && spillingChannel.isOpen()) {
+			spillingChannel.force(false);
+		}
+		return CloseableIterator.flatten(
+			toSingleBufferIterator(wrapInt(recordLength)),
+			new FileBasedBufferIterator(spillFile, min(accumulatedRecordBytes, recordLength), fileBufferSize),
+			leftOverData == null ? empty() : toSingleBufferIterator(wrapCopy(leftOverData.getArray(), leftOverStart, leftOverLimit))
+		);
 	}
 
 	private MemorySegment copyDataBuffer() throws IOException {
@@ -289,4 +306,9 @@ final class SpanningWrapper {
 		return lengthBuffer.position() > 0;
 	}
 
+	private static CloseableIterator<Buffer> toSingleBufferIterator(MemorySegment segment) {
+		NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, segment.size());
+		return CloseableIterator.ofElement(buffer, Buffer::recycleBuffer);
+	}
+
 }
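
On the consuming side, the implied contract is that the returned iterator is fully drained with each buffer recycled and then closed, which releases the spill-file reference held by FileBasedBufferIterator. A minimal sketch of such a consumer (hypothetical helper, mirroring toByteArray in the test below):

import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.util.CloseableIterator;

import java.io.ByteArrayOutputStream;

class UnconsumedDrainer {
	// Drains length prefix + spilled bytes + leftover into a single array.
	static byte[] drain(CloseableIterator<Buffer> unconsumed) throws Exception {
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		try {
			while (unconsumed.hasNext()) {
				Buffer buffer = unconsumed.next();
				try {
					int len = buffer.readableBytes();
					byte[] bytes = new byte[len];
					buffer.getNioBuffer(0, len).get(bytes);
					out.write(bytes, 0, len);
				} finally {
					buffer.recycleBuffer(); // FreeingBufferRecycler frees the segment
				}
			}
		} finally {
			unconsumed.close();
		}
		return out.toByteArray();
	}
}
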
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java
new file mode 100644
index 0000000..be57e21
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.serialization;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrap;
+import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * {@link SpanningWrapper} test.
+ */
+public class SpanningWrapperTest {
+
+	private static final Random random = new Random();
+
+	@Rule
+	public TemporaryFolder folder = new TemporaryFolder();
+
+	@Test
+	public void testLargeUnconsumedSegment() throws Exception {
+		int recordLen = 100;
+		int firstChunk = (int) (recordLen * .9);
+		int spillingThreshold = (int) (firstChunk * .9);
+
+		byte[] record1 = recordBytes(recordLen);
+		byte[] record2 = recordBytes(recordLen * 2);
+
+		SpanningWrapper spanningWrapper = new SpanningWrapper(new String[]{folder.newFolder().getAbsolutePath()}, spillingThreshold, recordLen);
+		spanningWrapper.transferFrom(wrapNonSpanning(record1, firstChunk), recordLen);
+		spanningWrapper.addNextChunkFromMemorySegment(wrap(record1), firstChunk, recordLen - firstChunk + LENGTH_BYTES);
+		spanningWrapper.addNextChunkFromMemorySegment(wrap(record2), 0, record2.length);
+
+		CloseableIterator<Buffer> unconsumedSegment = spanningWrapper.getUnconsumedSegment();
+
+		spanningWrapper.getInputView().readFully(new byte[recordLen], 0, recordLen); // read out from file
+		spanningWrapper.transferLeftOverTo(new NonSpanningWrapper()); // clear any leftover
+		spanningWrapper.transferFrom(wrapNonSpanning(recordBytes(recordLen), recordLen), recordLen); // overwrite with new data
+
+		assertArrayEquals(concat(record1, record2), toByteArray(unconsumedSegment));
+	}
+
+	private byte[] recordBytes(int recordLen) {
+		byte[] inputData = randomBytes(recordLen + LENGTH_BYTES);
+		for (int i = 0; i < Integer.BYTES; i++) {
+			inputData[Integer.BYTES - i - 1] = (byte) (recordLen >>> i * 8);
+		}
+		return inputData;
+	}
+
+	private NonSpanningWrapper wrapNonSpanning(byte[] bytes, int len) {
+		NonSpanningWrapper nonSpanningWrapper = new NonSpanningWrapper();
+		MemorySegment segment = wrap(bytes);
+		nonSpanningWrapper.initializeFromMemorySegment(segment, 0, len);
+		nonSpanningWrapper.readInt(); // emulate read length performed in getNextRecord to move position
+		return nonSpanningWrapper;
+	}
+
+	private byte[] toByteArray(CloseableIterator<Buffer> unconsumed) {
+		final List<Buffer> buffers = new ArrayList<>();
+		try {
+			unconsumed.forEachRemaining(buffers::add);
+			byte[] result = new byte[buffers.stream().mapToInt(Buffer::readableBytes).sum()];
+			int offset = 0;
+			for (Buffer buffer : buffers) {
+				int len = buffer.readableBytes();
+				buffer.getNioBuffer(0, len).get(result, offset, len);
+				offset += len;
+			}
+			return result;
+		} finally {
+			buffers.forEach(Buffer::recycleBuffer);
+		}
+	}
+
+	private byte[] randomBytes(int length) {
+		byte[] inputData = new byte[length];
+		random.nextBytes(inputData);
+		return inputData;
+	}
+
+	private byte[] concat(byte[] input1, byte[] input2) {
+		byte[] expected = new byte[input1.length + input2.length];
+		System.arraycopy(input1, 0, expected, 0, input1.length);
+		System.arraycopy(input2, 0, expected, input1.length, input2.length);
+		return expected;
+	}
+
+}


[flink] 05/10: [FLINK-17547][task][hotfix] Extract methods from RecordsDeserializer

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9ebdaf40973e1b7ff6aaadd19d1d6cda1d3e69d8
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue May 12 11:24:01 2020 +0200

    [FLINK-17547][task][hotfix] Extract methods from RecordsDeserializer
---
 .../main/java/org/apache/flink/util/IOUtils.java   |   9 +
 .../api/serialization/NonSpanningWrapper.java      |  81 +++++-
 .../network/api/serialization/SpanningWrapper.java | 278 ++++++++++-----------
 ...SpillingAdaptiveSpanningRecordDeserializer.java | 121 ++++-----
 4 files changed, 271 insertions(+), 218 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
index 02b11e6..1f9af18 100644
--- a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
@@ -26,6 +26,8 @@ import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.Socket;
 
+import static java.util.Arrays.asList;
+
 /**
  * An utility class for I/O related functionality.
  */
@@ -244,6 +246,13 @@ public final class IOUtils {
 	/**
 	 * Closes all elements in the iterable with closeQuietly().
 	 */
+	public static void closeAllQuietly(AutoCloseable... closeables) {
+		closeAllQuietly(asList(closeables));
+	}
+
+	/**
+	 * Closes all elements in the iterable with closeQuietly().
+	 */
 	public static void closeAllQuietly(Iterable<? extends AutoCloseable> closeables) {
 		if (null != closeables) {
 			for (AutoCloseable closeable : closeables) {
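
Since AutoCloseable is a functional interface, the new varargs overload also accepts cleanup lambdas next to real resources, which is how SpanningWrapper.clear() later in this commit passes () -> spillFile.delete(). A small sketch (hypothetical caller):

import org.apache.flink.util.IOUtils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

class CloseAllQuietlyExample {
	public static void main(String[] args) {
		ByteArrayInputStream in = new ByteArrayInputStream(new byte[]{1, 2, 3});
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		// Each argument is closed via closeQuietly(), which swallows exceptions,
		// so one failing close() does not prevent the others from running.
		IOUtils.closeAllQuietly(in, out, () -> System.out.println("extra cleanup"));
	}
}
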
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
index 6d9602f..5de5467 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
@@ -17,27 +17,43 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
+import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.NextRecordResponse;
 
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
 import java.util.Optional;
 
+import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
+import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER;
+import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
+import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
+
 final class NonSpanningWrapper implements DataInputView {
 
-	MemorySegment segment;
+	private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE =
+			"Serializer consumed more bytes than the record had. " +
+					"This indicates broken serialization. If you are using custom serialization types " +
+					"(Value or Writable), check their serialization methods. If you are using a " +
+					"Kryo-serialized type, check the corresponding Kryo serializer.";
+
+	private MemorySegment segment;
 
 	private int limit;
 
-	int position;
+	private int position;
 
 	private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
 	private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
 
-	int remaining() {
+	private final NextRecordResponse reusedNextRecordResponse = new NextRecordResponse(null, 0); // performance impact of immutable objects not benchmarked
+
+	private int remaining() {
 		return this.limit - this.position;
 	}
 
@@ -47,14 +63,14 @@ final class NonSpanningWrapper implements DataInputView {
 		this.position = 0;
 	}
 
-	void initializeFromMemorySegment(MemorySegment seg, int position, int leftOverLimit) {
+	void initializeFromMemorySegment(MemorySegment seg, int position, int limit) {
 		this.segment = seg;
 		this.position = position;
-		this.limit = leftOverLimit;
+		this.limit = limit;
 	}
 
 	Optional<MemorySegment> getUnconsumedSegment() {
-		if (remaining() == 0) {
+		if (!hasRemaining()) {
 			return Optional.empty();
 		}
 		MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(remaining());
@@ -62,6 +78,10 @@ final class NonSpanningWrapper implements DataInputView {
 		return Optional.of(target);
 	}
 
+	boolean hasRemaining() {
+		return remaining() > 0;
+	}
+
 	// -------------------------------------------------------------------------------------------------------------
 	//                                       DataInput specific methods
 	// -------------------------------------------------------------------------------------------------------------
@@ -290,4 +310,53 @@ final class NonSpanningWrapper implements DataInputView {
 	public int read(byte[] b) {
 		return read(b, 0, b.length);
 	}
+
+	ByteBuffer wrapIntoByteBuffer() {
+		return segment.wrap(position, remaining());
+	}
+
+	int copyContentTo(byte[] dst) {
+		final int numBytesChunk = remaining();
+		segment.get(position, dst, 0, numBytesChunk);
+		return numBytesChunk;
+	}
+
+	/**
+	 * Copies the data and transfers the "ownership" (i.e. clears current wrapper).
+	 */
+	void transferTo(ByteBuffer dst) {
+		segment.get(position, dst, remaining());
+		clear();
+	}
+
+	NextRecordResponse getNextRecord(IOReadableWritable target) throws IOException {
+		int recordLen = readInt();
+		if (canReadRecord(recordLen)) {
+			return readInto(target);
+		} else {
+			return reusedNextRecordResponse.updated(PARTIAL_RECORD, recordLen);
+		}
+	}
+
+	private NextRecordResponse readInto(IOReadableWritable target) throws IOException {
+		try {
+			target.read(this);
+		} catch (IndexOutOfBoundsException e) {
+			throw new IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, e);
+		}
+		int remaining = remaining();
+		if (remaining < 0) {
+			throw new IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, new IndexOutOfBoundsException("Remaining = " + remaining));
+		}
+		return reusedNextRecordResponse.updated(remaining == 0 ? LAST_RECORD_FROM_BUFFER : INTERMEDIATE_RECORD_FROM_BUFFER, remaining);
+	}
+
+	boolean hasCompleteLength() {
+		return remaining() >= LENGTH_BYTES;
+	}
+
+	private boolean canReadRecord(int recordLength) {
+		return recordLength <= remaining();
+	}
+
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
index e59363f..430f0db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -23,7 +23,6 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.StringUtils;
 
 import java.io.BufferedInputStream;
@@ -38,9 +37,16 @@ import java.util.Arrays;
 import java.util.Optional;
 import java.util.Random;
 
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
+import static org.apache.flink.util.FileUtils.writeCompletely;
+import static org.apache.flink.util.IOUtils.closeAllQuietly;
+
 final class SpanningWrapper {
 
 	private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiBytes
+	private static final int FILE_BUFFER_SIZE = 2 * 1024 * 1024;
 
 	private final byte[] initialBuffer = new byte[1024];
 
@@ -50,7 +56,7 @@ final class SpanningWrapper {
 
 	private final DataInputDeserializer serializationReadBuffer;
 
-	private final ByteBuffer lengthBuffer;
+	final ByteBuffer lengthBuffer;
 
 	private FileChannel spillingChannel;
 
@@ -70,10 +76,10 @@ final class SpanningWrapper {
 
 	private DataInputViewStreamWrapper spillFileReader;
 
-	public SpanningWrapper(String[] tempDirs) {
+	SpanningWrapper(String[] tempDirs) {
 		this.tempDirs = tempDirs;
 
-		this.lengthBuffer = ByteBuffer.allocate(4);
+		this.lengthBuffer = ByteBuffer.allocate(LENGTH_BYTES);
 		this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
 
 		this.recordLength = -1;
@@ -82,187 +88,161 @@ final class SpanningWrapper {
 		this.buffer = initialBuffer;
 	}
 
-	void initializeWithPartialRecord(NonSpanningWrapper partial, int nextRecordLength) throws IOException {
-		// set the length and copy what is available to the buffer
-		this.recordLength = nextRecordLength;
-
-		final int numBytesChunk = partial.remaining();
-
-		if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
-			// create a spilling channel and put the data there
-			this.spillingChannel = createSpillingChannel();
-
-			ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
-			FileUtils.writeCompletely(this.spillingChannel, toWrite);
-		}
-		else {
-			// collect in memory
-			ensureBufferCapacity(nextRecordLength);
-			partial.segment.get(partial.position, buffer, 0, numBytesChunk);
-		}
-
-		this.accumulatedRecordBytes = numBytesChunk;
+	/**
+	 * Copies the data and transfers the "ownership" (i.e. clears the passed wrapper).
+	 */
+	void transferFrom(NonSpanningWrapper partial, int nextRecordLength) throws IOException {
+		updateLength(nextRecordLength);
+		accumulatedRecordBytes = isAboveSpillingThreshold() ? spill(partial) : partial.copyContentTo(buffer);
+		partial.clear();
 	}
 
-	void initializeWithPartialLength(NonSpanningWrapper partial) throws IOException {
-		// copy what we have to the length buffer
-		partial.segment.get(partial.position, this.lengthBuffer, partial.remaining());
+	private boolean isAboveSpillingThreshold() {
+		return recordLength > THRESHOLD_FOR_SPILLING;
 	}
 
 	void addNextChunkFromMemorySegment(MemorySegment segment, int offset, int numBytes) throws IOException {
-		int segmentPosition = offset;
-		int segmentRemaining = numBytes;
-		// check where to go. if we have a partial length, we need to complete it first
-		if (this.lengthBuffer.position() > 0) {
-			int toPut = Math.min(this.lengthBuffer.remaining(), segmentRemaining);
-			segment.get(segmentPosition, this.lengthBuffer, toPut);
-			// did we complete the length?
-			if (this.lengthBuffer.hasRemaining()) {
-				return;
-			} else {
-				this.recordLength = this.lengthBuffer.getInt(0);
-
-				this.lengthBuffer.clear();
-				segmentPosition += toPut;
-				segmentRemaining -= toPut;
-				if (this.recordLength > THRESHOLD_FOR_SPILLING) {
-					this.spillingChannel = createSpillingChannel();
-				} else {
-					ensureBufferCapacity(this.recordLength);
-				}
-			}
+		int limit = offset + numBytes;
+		int numBytesRead = isReadingLength() ? readLength(segment, offset, numBytes) : 0;
+		offset += numBytesRead;
+		numBytes -= numBytesRead;
+		if (numBytes == 0) {
+			return;
 		}
 
-		// copy as much as we need or can for this next spanning record
-		int needed = this.recordLength - this.accumulatedRecordBytes;
-		int toCopy = Math.min(needed, segmentRemaining);
+		int toCopy = min(recordLength - accumulatedRecordBytes, numBytes);
+		if (toCopy > 0) {
+			copyFromSegment(segment, offset, toCopy);
+		}
+		if (numBytes > toCopy) {
+			leftOverData = segment;
+			leftOverStart = offset + toCopy;
+			leftOverLimit = limit;
+		}
+	}
 
-		if (spillingChannel != null) {
-			// spill to file
-			ByteBuffer toWrite = segment.wrap(segmentPosition, toCopy);
-			FileUtils.writeCompletely(this.spillingChannel, toWrite);
+	private void copyFromSegment(MemorySegment segment, int offset, int length) throws IOException {
+		if (spillingChannel == null) {
+			copyIntoBuffer(segment, offset, length);
 		} else {
-			segment.get(segmentPosition, buffer, this.accumulatedRecordBytes, toCopy);
+			copyIntoFile(segment, offset, length);
 		}
+	}
 
-		this.accumulatedRecordBytes += toCopy;
-
-		if (toCopy < segmentRemaining) {
-			// there is more data in the segment
-			this.leftOverData = segment;
-			this.leftOverStart = segmentPosition + toCopy;
-			this.leftOverLimit = numBytes + offset;
+	private void copyIntoFile(MemorySegment segment, int offset, int length) throws IOException {
+		writeCompletely(spillingChannel, segment.wrap(offset, length));
+		accumulatedRecordBytes += length;
+		if (hasFullRecord()) {
+			spillingChannel.close();
+			spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile), FILE_BUFFER_SIZE));
 		}
+	}
 
-		if (accumulatedRecordBytes == recordLength) {
-			// we have the full record
-			if (spillingChannel == null) {
-				this.serializationReadBuffer.setBuffer(buffer, 0, recordLength);
-			}
-			else {
-				spillingChannel.close();
+	private void copyIntoBuffer(MemorySegment segment, int offset, int length) {
+		segment.get(offset, buffer, accumulatedRecordBytes, length);
+		accumulatedRecordBytes += length;
+		if (hasFullRecord()) {
+			serializationReadBuffer.setBuffer(buffer, 0, recordLength);
+		}
+	}
 
-				BufferedInputStream inStream = new BufferedInputStream(new FileInputStream(spillFile), 2 * 1024 * 1024);
-				this.spillFileReader = new DataInputViewStreamWrapper(inStream);
-			}
+	private int readLength(MemorySegment segment, int segmentPosition, int segmentRemaining) throws IOException {
+		int bytesToRead = min(lengthBuffer.remaining(), segmentRemaining);
+		segment.get(segmentPosition, lengthBuffer, bytesToRead);
+		if (!lengthBuffer.hasRemaining()) {
+			updateLength(lengthBuffer.getInt(0));
 		}
+		return bytesToRead;
 	}
 
-	Optional<MemorySegment> getUnconsumedSegment() throws IOException {
-		// for the case of only partial length, no data
-		final int position = lengthBuffer.position();
-		if (position > 0) {
-			MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(position);
-			lengthBuffer.position(0);
-			segment.put(0, lengthBuffer, position);
-			return Optional.of(segment);
+	private void updateLength(int length) throws IOException {
+		lengthBuffer.clear();
+		recordLength = length;
+		if (isAboveSpillingThreshold()) {
+			spillingChannel = createSpillingChannel();
+		} else {
+			ensureBufferCapacity(length);
 		}
+	}
 
-		// for the case of full length, partial data in buffer
-		if (recordLength > THRESHOLD_FOR_SPILLING) {
-			throw new UnsupportedOperationException("Unaligned checkpoint currently do not support spilled " +
-				"records.");
-		} else if (recordLength != -1) {
-			int leftOverSize = leftOverLimit - leftOverStart;
-			int unconsumedSize = Integer.BYTES + accumulatedRecordBytes + leftOverSize;
-			DataOutputSerializer serializer = new DataOutputSerializer(unconsumedSize);
-			serializer.writeInt(recordLength);
-			serializer.write(buffer, 0, accumulatedRecordBytes);
-			if (leftOverData != null) {
-				serializer.write(leftOverData, leftOverStart, leftOverSize);
-			}
-			MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(unconsumedSize);
-			segment.put(0, serializer.getSharedBuffer(), 0, segment.size());
-			return Optional.of(segment);
+	Optional<MemorySegment> getUnconsumedSegment() throws IOException {
+		if (isReadingLength()) {
+			return Optional.of(copyLengthBuffer());
+		} else if (isAboveSpillingThreshold()) {
+			throw new UnsupportedOperationException("Unaligned checkpoint currently do not support spilled records.");
+		} else if (recordLength == -1) {
+			return Optional.empty(); // no remaining partial length or data
+		} else {
+			return Optional.of(copyDataBuffer());
 		}
+	}
 
-		// for the case of no remaining partial length or data
-		return Optional.empty();
+	private MemorySegment copyLengthBuffer() {
+		int position = lengthBuffer.position();
+		MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(position);
+		lengthBuffer.position(0);
+		segment.put(0, lengthBuffer, position);
+		return segment;
 	}
 
-	void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) {
-		deserializer.clear();
+	private MemorySegment copyDataBuffer() throws IOException {
+		int leftOverSize = leftOverLimit - leftOverStart;
+		int unconsumedSize = LENGTH_BYTES + accumulatedRecordBytes + leftOverSize;
+		DataOutputSerializer serializer = new DataOutputSerializer(unconsumedSize);
+		serializer.writeInt(recordLength);
+		serializer.write(buffer, 0, accumulatedRecordBytes);
+		if (leftOverData != null) {
+			serializer.write(leftOverData, leftOverStart, leftOverSize);
+		}
+		MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(unconsumedSize);
+		segment.put(0, serializer.getSharedBuffer(), 0, segment.size());
+		return segment;
+	}
 
+	/**
+	 * Copies the leftover data and transfers the "ownership" (i.e. clears this wrapper).
+	 */
+	void transferLeftOverTo(NonSpanningWrapper nonSpanningWrapper) {
+		nonSpanningWrapper.clear();
 		if (leftOverData != null) {
-			deserializer.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit);
+			nonSpanningWrapper.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit);
 		}
+		clear();
 	}
 
 	boolean hasFullRecord() {
-		return this.recordLength >= 0 && this.accumulatedRecordBytes >= this.recordLength;
+		return recordLength >= 0 && accumulatedRecordBytes >= recordLength;
 	}
 
 	int getNumGatheredBytes() {
-		return this.accumulatedRecordBytes + (this.recordLength >= 0 ? 4 : lengthBuffer.position());
+		return accumulatedRecordBytes + (recordLength >= 0 ? LENGTH_BYTES : lengthBuffer.position());
 	}
 
+	@SuppressWarnings("ResultOfMethodCallIgnored")
 	public void clear() {
-		this.buffer = initialBuffer;
-		this.serializationReadBuffer.releaseArrays();
-
-		this.recordLength = -1;
-		this.lengthBuffer.clear();
-		this.leftOverData = null;
-		this.leftOverStart = 0;
-		this.leftOverLimit = 0;
-		this.accumulatedRecordBytes = 0;
-
-		if (spillingChannel != null) {
-			try {
-				spillingChannel.close();
-			}
-			catch (Throwable t) {
-				// ignore
-			}
-			spillingChannel = null;
-		}
-		if (spillFileReader != null) {
-			try {
-				spillFileReader.close();
-			}
-			catch (Throwable t) {
-				// ignore
-			}
-			spillFileReader = null;
-		}
-		if (spillFile != null) {
-			spillFile.delete();
-			spillFile = null;
-		}
+		buffer = initialBuffer;
+		serializationReadBuffer.releaseArrays();
+
+		recordLength = -1;
+		lengthBuffer.clear();
+		leftOverData = null;
+		leftOverStart = 0;
+		leftOverLimit = 0;
+		accumulatedRecordBytes = 0;
+
+		closeAllQuietly(spillingChannel, spillFileReader, () -> spillFile.delete());
+		spillingChannel = null;
+		spillFileReader = null;
+		spillFile = null;
 	}
 
 	public DataInputView getInputView() {
-		if (spillFileReader == null) {
-			return serializationReadBuffer;
-		}
-		else {
-			return spillFileReader;
-		}
+		return spillFileReader == null ? serializationReadBuffer : spillFileReader;
 	}
 
 	private void ensureBufferCapacity(int minLength) {
 		if (buffer.length < minLength) {
-			byte[] newBuffer = new byte[Math.max(minLength, buffer.length * 2)];
+			byte[] newBuffer = new byte[max(minLength, buffer.length * 2)];
 			System.arraycopy(buffer, 0, newBuffer, 0, accumulatedRecordBytes);
 			buffer = newBuffer;
 		}
@@ -294,4 +274,16 @@ final class SpanningWrapper {
 		random.nextBytes(bytes);
 		return StringUtils.byteToHexString(bytes);
 	}
+
+	private int spill(NonSpanningWrapper partial) throws IOException {
+		ByteBuffer buffer = partial.wrapIntoByteBuffer();
+		int length = buffer.remaining();
+		writeCompletely(spillingChannel, buffer);
+		return length;
+	}
+
+	private boolean isReadingLength() {
+		return lengthBuffer.position() > 0;
+	}
+
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index f20fbc9..75e6b0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -24,19 +24,22 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import java.io.IOException;
 import java.util.Optional;
 
+import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
+import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER;
+import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+
 /**
  * @param <T> The type of the record to be deserialized.
  */
 public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> implements RecordDeserializer<T> {
 
-	private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE =
-					"Serializer consumed more bytes than the record had. " +
-					"This indicates broken serialization. If you are using custom serialization types " +
-					"(Value or Writable), check their serialization methods. If you are using a " +
-					"Kryo-serialized type, check the corresponding Kryo serializer.";
+	static final int LENGTH_BYTES = Integer.BYTES;
 
 	private final NonSpanningWrapper nonSpanningWrapper;
 
@@ -58,11 +61,10 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		int numBytes = buffer.getSize();
 
 		// check if some spanning record deserialization is pending
-		if (this.spanningWrapper.getNumGatheredBytes() > 0) {
-			this.spanningWrapper.addNextChunkFromMemorySegment(segment, offset, numBytes);
-		}
-		else {
-			this.nonSpanningWrapper.initializeFromMemorySegment(segment, offset, numBytes + offset);
+		if (spanningWrapper.getNumGatheredBytes() > 0) {
+			spanningWrapper.addNextChunkFromMemorySegment(segment, offset, numBytes);
+		} else {
+			nonSpanningWrapper.initializeFromMemorySegment(segment, offset, numBytes + offset);
 		}
 	}
 
@@ -75,14 +77,13 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 
 	@Override
 	public Optional<Buffer> getUnconsumedBuffer() throws IOException {
-		Optional<MemorySegment> target;
-		if (nonSpanningWrapper.remaining() > 0) {
-			target = nonSpanningWrapper.getUnconsumedSegment();
+		final Optional<MemorySegment> unconsumedSegment;
+		if (nonSpanningWrapper.hasRemaining()) {
+			unconsumedSegment = nonSpanningWrapper.getUnconsumedSegment();
 		} else {
-			target = spanningWrapper.getUnconsumedSegment();
+			unconsumedSegment = spanningWrapper.getUnconsumedSegment();
 		}
-		return target.map(memorySegment -> new NetworkBuffer(
-			memorySegment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, memorySegment.size()));
+		return unconsumedSegment.map(segment -> new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, DATA_BUFFER, segment.size()));
 	}
 
 	@Override
@@ -91,65 +92,31 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		// this should be the majority of the cases for small records
 		// for large records, this portion of the work is very small in comparison anyways
 
-		int nonSpanningRemaining = this.nonSpanningWrapper.remaining();
-
-		// check if we can get a full length;
-		if (nonSpanningRemaining >= 4) {
-			int len = this.nonSpanningWrapper.readInt();
-
-			if (len <= nonSpanningRemaining - 4) {
-				// we can get a full record from here
-				try {
-					target.read(this.nonSpanningWrapper);
-
-					int remaining = this.nonSpanningWrapper.remaining();
-					if (remaining > 0) {
-						return DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
-					}
-					else if (remaining == 0) {
-						return DeserializationResult.LAST_RECORD_FROM_BUFFER;
-					}
-					else {
-						throw new IndexOutOfBoundsException("Remaining = " + remaining);
-					}
-				}
-				catch (IndexOutOfBoundsException e) {
-					throw new IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, e);
-				}
-			}
-			else {
-				// we got the length, but we need the rest from the spanning deserializer
-				// and need to wait for more buffers
-				this.spanningWrapper.initializeWithPartialRecord(this.nonSpanningWrapper, len);
-				this.nonSpanningWrapper.clear();
-				return DeserializationResult.PARTIAL_RECORD;
-			}
-		} else if (nonSpanningRemaining > 0) {
-			// we have an incomplete length
-			// add our part of the length to the length buffer
-			this.spanningWrapper.initializeWithPartialLength(this.nonSpanningWrapper);
-			this.nonSpanningWrapper.clear();
-			return DeserializationResult.PARTIAL_RECORD;
-		}
+		if (nonSpanningWrapper.hasCompleteLength()) {
+			return readNonSpanningRecord(target);
 
-		// spanning record case
-		if (this.spanningWrapper.hasFullRecord()) {
-			// get the full record
-			target.read(this.spanningWrapper.getInputView());
+		} else if (nonSpanningWrapper.hasRemaining()) {
+			nonSpanningWrapper.transferTo(spanningWrapper.lengthBuffer);
+			return PARTIAL_RECORD;
 
-			// move the remainder to the non-spanning wrapper
-			// this does not copy it, only sets the memory segment
-			this.spanningWrapper.moveRemainderToNonSpanningDeserializer(this.nonSpanningWrapper);
-			this.spanningWrapper.clear();
+		} else if (spanningWrapper.hasFullRecord()) {
+			target.read(spanningWrapper.getInputView());
+			spanningWrapper.transferLeftOverTo(nonSpanningWrapper);
+			return nonSpanningWrapper.hasRemaining() ? INTERMEDIATE_RECORD_FROM_BUFFER : LAST_RECORD_FROM_BUFFER;
 
-			return (this.nonSpanningWrapper.remaining() == 0) ?
-				DeserializationResult.LAST_RECORD_FROM_BUFFER :
-				DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
 		} else {
-			return DeserializationResult.PARTIAL_RECORD;
+			return PARTIAL_RECORD;
 		}
 	}
 
+	private DeserializationResult readNonSpanningRecord(T target) throws IOException {
+		NextRecordResponse response = nonSpanningWrapper.getNextRecord(target);
+		if (response.result == PARTIAL_RECORD) {
+			spanningWrapper.transferFrom(nonSpanningWrapper, response.bytesLeft);
+		}
+		return response.result;
+	}
+
 	@Override
 	public void clear() {
 		this.nonSpanningWrapper.clear();
@@ -158,7 +125,23 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 
 	@Override
 	public boolean hasUnfinishedData() {
-		return this.nonSpanningWrapper.remaining() > 0 || this.spanningWrapper.getNumGatheredBytes() > 0;
+		return this.nonSpanningWrapper.hasRemaining() || this.spanningWrapper.getNumGatheredBytes() > 0;
 	}
 
+	@NotThreadSafe
+	static class NextRecordResponse {
+		DeserializationResult result;
+		int bytesLeft;
+
+		NextRecordResponse(DeserializationResult result, int bytesLeft) {
+			this.result = result;
+			this.bytesLeft = bytesLeft;
+		}
+
+		public NextRecordResponse updated(DeserializationResult result, int bytesLeft) {
+			this.result = result;
+			this.bytesLeft = bytesLeft;
+			return this;
+		}
+	}
 }
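
The NextRecordResponse above is a mutable result holder: updated(...) overwrites
the fields and returns this, so a single instance can be handed back for every
record instead of allocating a new response per call. A minimal, self-contained
sketch of that pattern (the names Result, Response and Parser are illustrative,
not Flink classes):

    enum Result { COMPLETE, PARTIAL }

    final class Response {
        Result result;
        int bytesLeft;

        // Mutate in place and return 'this' so one instance can be reused
        // across records, avoiding a per-record allocation.
        Response updated(Result result, int bytesLeft) {
            this.result = result;
            this.bytesLeft = bytesLeft;
            return this;
        }
    }

    final class Parser {
        private final Response reusedResponse = new Response();

        Response parse(int availableBytes, int neededBytes) {
            return availableBytes >= neededBytes
                ? reusedResponse.updated(Result.COMPLETE, 0)
                : reusedResponse.updated(Result.PARTIAL, neededBytes - availableBytes);
        }
    }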


[flink] 01/10: [FLINK-17547][task][hotfix] Improve error handling: (1) catch one more invalid input in DataOutputSerializer.write; (2) more informative error messages

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9fedade3f8ec4b3dad0e9d7d0a6751fc2c66a121
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu May 7 09:33:25 2020 +0200

    [FLINK-17547][task][hotfix] Improve error handling
    1. Catch one more invalid input in DataOutputSerializer.write.
    2. Use more informative error messages.
---
 .../main/java/org/apache/flink/core/memory/DataOutputSerializer.java  | 4 ++--
 .../main/java/org/apache/flink/core/memory/HybridMemorySegment.java   | 3 +--
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
index 85fd767..1255fc3 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
@@ -157,8 +157,8 @@ public class DataOutputSerializer implements DataOutputView, MemorySegmentWritab
 
 	@Override
 	public void write(MemorySegment segment, int off, int len) throws IOException {
-		if (len < 0 || off > segment.size() - len) {
-			throw new ArrayIndexOutOfBoundsException();
+		if (len < 0 || off < 0 || off > segment.size() - len) {
+			throw new IndexOutOfBoundsException(String.format("offset: %d, length: %d, size: %d", off, len, segment.size()));
 		}
 		if (this.position > this.buffer.length - len) {
 			resize(len);
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
index fb7a4ba..53e8cfd 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
@@ -195,8 +195,7 @@ public final class HybridMemorySegment extends MemorySegment {
 			throw new IllegalStateException("segment has been freed");
 		}
 		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
+			throw new IndexOutOfBoundsException(String.format("pos: %d, length: %d, index: %d, offset: %d", pos, length, index, offset));
 		}
 	}
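
Both hunks tighten the precondition and carry the offending values in the
exception message. A stand-alone sketch of the strengthened check (a plain
int size in place of a MemorySegment):

    static void checkBounds(int off, int len, int size) {
        // Negative offsets are now rejected too; writing the upper bound as
        // 'off > size - len' instead of 'off + len > size' avoids the int
        // overflow that large off/len values could trigger.
        if (len < 0 || off < 0 || off > size - len) {
            throw new IndexOutOfBoundsException(
                String.format("offset: %d, length: %d, size: %d", off, len, size));
        }
    }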
 


[flink] 09/10: [FLINK-17547][task] Use RefCountedFile in SpanningWrapper (todo: merge with next?)

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3dacffe35709f9747923dad4c7028baec27e2651
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu May 7 15:59:26 2020 +0200

    [FLINK-17547][task] Use RefCountedFile in SpanningWrapper (todo: merge with next?)
---
 .../io/network/api/serialization/SpanningWrapper.java     | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
index 18ea6cc..9cffde3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
+import org.apache.flink.core.fs.RefCountedFile;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -74,7 +75,7 @@ final class SpanningWrapper {
 
 	private int leftOverLimit;
 
-	private File spillFile;
+	private RefCountedFile spillFile;
 
 	private DataInputViewStreamWrapper spillFileReader;
 
@@ -136,7 +137,7 @@ final class SpanningWrapper {
 		accumulatedRecordBytes += length;
 		if (hasFullRecord()) {
 			spillingChannel.close();
-			spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile), FILE_BUFFER_SIZE));
+			spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile.getFile()), FILE_BUFFER_SIZE));
 		}
 	}
 
@@ -220,7 +221,6 @@ final class SpanningWrapper {
 		return accumulatedRecordBytes + (recordLength >= 0 ? LENGTH_BYTES : lengthBuffer.position());
 	}
 
-	@SuppressWarnings("ResultOfMethodCallIgnored")
 	public void clear() {
 		buffer = initialBuffer;
 		serializationReadBuffer.releaseArrays();
@@ -232,7 +232,7 @@ final class SpanningWrapper {
 		leftOverLimit = 0;
 		accumulatedRecordBytes = 0;
 
-		closeAllQuietly(spillingChannel, spillFileReader, () -> spillFile.delete());
+		closeAllQuietly(spillingChannel, spillFileReader, () -> spillFile.release());
 		spillingChannel = null;
 		spillFileReader = null;
 		spillFile = null;
@@ -260,9 +260,10 @@ final class SpanningWrapper {
 		int maxAttempts = 10;
 		for (int attempt = 0; attempt < maxAttempts; attempt++) {
 			String directory = tempDirs[rnd.nextInt(tempDirs.length)];
-			spillFile = new File(directory, randomString(rnd) + ".inputchannel");
-			if (spillFile.createNewFile()) {
-				return new RandomAccessFile(spillFile, "rw").getChannel();
+			File file = new File(directory, randomString(rnd) + ".inputchannel");
+			if (file.createNewFile()) {
+				spillFile = new RefCountedFile(file);
+				return new RandomAccessFile(file, "rw").getChannel();
 			}
 		}
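
Replacing spillFile.delete() with spillFile.release() defers deletion until the
last holder lets go, so a reader of the spilled data can retain() the file and
keep it alive past clear(). A self-contained sketch of that contract (a
simplified stand-in, not the actual RefCountedFile):

    import java.io.File;
    import java.nio.file.Files;
    import java.util.concurrent.atomic.AtomicInteger;

    final class CountedFile {
        private final File file;
        private final AtomicInteger refs = new AtomicInteger(1); // creator holds one reference

        CountedFile(File file) { this.file = file; }

        void retain() { refs.incrementAndGet(); }

        // Deletes the underlying file only when the last reference is released.
        boolean release() throws Exception {
            if (refs.decrementAndGet() == 0) {
                Files.deleteIfExists(file.toPath());
                return true;
            }
            return false;
        }
    }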
 


[flink] 07/10: [FLINK-17547][task][hotfix] Extract RefCountedFileWithStream from RefCountedFile. Motivation: use RefCountedFile for reading as well.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4e323c33a9f25673587ee0e8f4f9786b21db666c
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu May 7 15:30:56 2020 +0200

    [FLINK-17547][task][hotfix] Extract RefCountedFileWithStream from RefCountedFile
    Motivation: use RefCountedFile for reading as well.
---
 .../flink/fs/s3/common/FlinkS3FileSystem.java      |  4 +-
 .../utils/RefCountedBufferingFileStream.java       | 10 ++--
 .../flink/fs/s3/common/utils/RefCountedFile.java   | 59 ++-----------------
 ...ntedFile.java => RefCountedFileWithStream.java} | 68 ++++------------------
 .../s3/common/utils/RefCountedTmpFileCreator.java  | 10 ++--
 .../writer/S3RecoverableFsDataOutputStream.java    | 12 ++--
 .../S3RecoverableMultipartUploadFactory.java       |  8 +--
 .../fs/s3/common/writer/S3RecoverableWriter.java   |  8 +--
 .../utils/RefCountedBufferingFileStreamTest.java   |  4 +-
 .../fs/s3/common/utils/RefCountedFileTest.java     | 59 ++-----------------
 ...Test.java => RefCountedFileWithStreamTest.java} | 60 +++----------------
 .../writer/RecoverableMultiPartUploadImplTest.java |  4 +-
 .../S3RecoverableFsDataOutputStreamTest.java       | 10 ++--
 13 files changed, 63 insertions(+), 253 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
index 5248e06..3514bbcb 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
@@ -21,7 +21,7 @@ package org.apache.flink.fs.s3.common;
 import org.apache.flink.core.fs.EntropyInjectingFileSystem;
 import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.fs.s3.common.utils.RefCountedFile;
+import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
 import org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator;
 import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.fs.s3.common.writer.S3RecoverableWriter;
@@ -57,7 +57,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject
 
 	private final String localTmpDir;
 
-	private final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator;
+	private final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileCreator;
 
 	@Nullable
 	private final S3AccessHelper s3AccessHelper;
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java
index 29f2590..5f149df 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java
@@ -29,7 +29,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A {@link RefCountedFile} that also uses an in-memory buffer for buffering small writes.
+ * A {@link RefCountedFileWithStream} that also uses an in-memory buffer for buffering small writes.
  * This is done to avoid frequent 'flushes' of the file stream to disk.
  */
 @Internal
@@ -37,7 +37,7 @@ public class RefCountedBufferingFileStream extends RefCountedFSOutputStream {
 
 	public static final int BUFFER_SIZE = 4096;
 
-	private final RefCountedFile currentTmpFile;
+	private final RefCountedFileWithStream currentTmpFile;
 
 	/** The write buffer. */
 	private final byte[] buffer;
@@ -49,7 +49,7 @@ public class RefCountedBufferingFileStream extends RefCountedFSOutputStream {
 
 	@VisibleForTesting
 	public RefCountedBufferingFileStream(
-			final RefCountedFile file,
+			final RefCountedFileWithStream file,
 			final int bufferSize) {
 
 		checkArgument(bufferSize > 0L);
@@ -165,7 +165,7 @@ public class RefCountedBufferingFileStream extends RefCountedFSOutputStream {
 	// ------------------------- Factory Methods -------------------------
 
 	public static RefCountedBufferingFileStream openNew(
-			final FunctionWithException<File, RefCountedFile, IOException> tmpFileProvider) throws IOException {
+			final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileProvider) throws IOException {
 
 		return new RefCountedBufferingFileStream(
 				tmpFileProvider.apply(null),
@@ -173,7 +173,7 @@ public class RefCountedBufferingFileStream extends RefCountedFSOutputStream {
 	}
 
 	public static RefCountedBufferingFileStream restore(
-			final FunctionWithException<File, RefCountedFile, IOException> tmpFileProvider,
+			final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileProvider,
 			final File initialTmpFile) throws IOException {
 
 		return new RefCountedBufferingFileStream(
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java
index 1787636..9675f09 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java
@@ -21,11 +21,10 @@ package org.apache.flink.fs.s3.common.utils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.RefCounted;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.file.Files;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -40,21 +39,13 @@ public class RefCountedFile implements RefCounted {
 
 	private final File file;
 
-	private final OffsetAwareOutputStream stream;
-
 	private final AtomicInteger references;
 
-	private boolean closed;
+	protected boolean closed;
 
-	private RefCountedFile(
-			final File file,
-			final OutputStream currentOut,
-			final long bytesInCurrentPart) {
+	protected RefCountedFile(final File file) {
 		this.file = checkNotNull(file);
 		this.references = new AtomicInteger(1);
-		this.stream = new OffsetAwareOutputStream(
-				currentOut,
-				bytesInCurrentPart);
 		this.closed = false;
 	}
 
@@ -62,33 +53,6 @@ public class RefCountedFile implements RefCounted {
 		return file;
 	}
 
-	public OffsetAwareOutputStream getStream() {
-		return stream;
-	}
-
-	public long getLength() {
-		return stream.getLength();
-	}
-
-	public void write(byte[] b, int off, int len) throws IOException {
-		requireOpened();
-		if (len > 0) {
-			stream.write(b, off, len);
-		}
-	}
-
-	public void flush() throws IOException {
-		requireOpened();
-		stream.flush();
-	}
-
-	public void closeStream() {
-		if (!closed) {
-			IOUtils.closeQuietly(stream);
-			closed = true;
-		}
-	}
-
 	@Override
 	public void retain() {
 		references.incrementAndGet();
@@ -119,22 +83,7 @@ public class RefCountedFile implements RefCounted {
 	}
 
 	@VisibleForTesting
-	int getReferenceCounter() {
+	public int getReferenceCounter() {
 		return references.get();
 	}
-
-	// ------------------------------ Factory methods for initializing a temporary file ------------------------------
-
-	public static RefCountedFile newFile(
-			final File file,
-			final OutputStream currentOut) throws IOException {
-		return new RefCountedFile(file, currentOut, 0L);
-	}
-
-	public static RefCountedFile restoredFile(
-			final File file,
-			final OutputStream currentOut,
-			final long bytesInCurrentPart) {
-		return new RefCountedFile(file, currentOut, bytesInCurrentPart);
-	}
 }
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java
similarity index 60%
copy from flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java
copy to flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java
index 1787636..94b8527 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java
@@ -19,47 +19,27 @@
 package org.apache.flink.fs.s3.common.utils;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.IOUtils;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.file.Files;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A reference counted file which is deleted as soon as no caller
  * holds a reference to the wrapped {@link File}.
  */
 @Internal
-public class RefCountedFile implements RefCounted {
-
-	private final File file;
+public class RefCountedFileWithStream extends RefCountedFile {
 
 	private final OffsetAwareOutputStream stream;
 
-	private final AtomicInteger references;
-
-	private boolean closed;
-
-	private RefCountedFile(
+	private RefCountedFileWithStream(
 			final File file,
 			final OutputStream currentOut,
 			final long bytesInCurrentPart) {
-		this.file = checkNotNull(file);
-		this.references = new AtomicInteger(1);
-		this.stream = new OffsetAwareOutputStream(
-				currentOut,
-				bytesInCurrentPart);
-		this.closed = false;
-	}
-
-	public File getFile() {
-		return file;
+		super(file);
+		this.stream = new OffsetAwareOutputStream(currentOut, bytesInCurrentPart);
 	}
 
 	public OffsetAwareOutputStream getStream() {
@@ -77,64 +57,36 @@ public class RefCountedFile implements RefCounted {
 		}
 	}
 
-	public void flush() throws IOException {
+	void flush() throws IOException {
 		requireOpened();
 		stream.flush();
 	}
 
-	public void closeStream() {
+	void closeStream() {
 		if (!closed) {
 			IOUtils.closeQuietly(stream);
 			closed = true;
 		}
 	}
 
-	@Override
-	public void retain() {
-		references.incrementAndGet();
-	}
-
-	@Override
-	public boolean release() {
-		if (references.decrementAndGet() == 0) {
-			return tryClose();
-		}
-		return false;
-	}
-
-	private boolean tryClose() {
-		try {
-			Files.deleteIfExists(file.toPath());
-			return true;
-		} catch (Throwable t) {
-			ExceptionUtils.rethrowIfFatalError(t);
-		}
-		return false;
-	}
-
 	private void requireOpened() throws IOException {
 		if (closed) {
 			throw new IOException("Stream closed.");
 		}
 	}
 
-	@VisibleForTesting
-	int getReferenceCounter() {
-		return references.get();
-	}
-
 	// ------------------------------ Factory methods for initializing a temporary file ------------------------------
 
-	public static RefCountedFile newFile(
+	public static RefCountedFileWithStream newFile(
 			final File file,
 			final OutputStream currentOut) throws IOException {
-		return new RefCountedFile(file, currentOut, 0L);
+		return new RefCountedFileWithStream(file, currentOut, 0L);
 	}
 
-	public static RefCountedFile restoredFile(
+	public static RefCountedFileWithStream restoredFile(
 			final File file,
 			final OutputStream currentOut,
 			final long bytesInCurrentPart) {
-		return new RefCountedFile(file, currentOut, bytesInCurrentPart);
+		return new RefCountedFileWithStream(file, currentOut, bytesInCurrentPart);
 	}
 }
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java
index 7a928d0..51b417c 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java
@@ -34,10 +34,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * A utility class that creates local {@link RefCountedFile reference counted files} that serve as temporary files.
+ * A utility class that creates local {@link RefCountedFileWithStream reference counted files} that serve as temporary files.
  */
 @Internal
-public class RefCountedTmpFileCreator implements FunctionWithException<File, RefCountedFile, IOException> {
+public class RefCountedTmpFileCreator implements FunctionWithException<File, RefCountedFileWithStream, IOException> {
 
 	private final File[] tempDirectories;
 
@@ -70,7 +70,7 @@ public class RefCountedTmpFileCreator implements FunctionWithException<File, Ref
 	 * @throws IOException Thrown, if the stream to the temp file could not be opened.
 	 */
 	@Override
-	public RefCountedFile apply(File file) throws IOException {
+	public RefCountedFileWithStream apply(File file) throws IOException {
 		final File directory = tempDirectories[nextIndex()];
 
 		while (true) {
@@ -78,10 +78,10 @@ public class RefCountedTmpFileCreator implements FunctionWithException<File, Ref
 				if (file == null) {
 					final File newFile = new File(directory, ".tmp_" + UUID.randomUUID());
 					final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
-					return RefCountedFile.newFile(newFile, out);
+					return RefCountedFileWithStream.newFile(newFile, out);
 				} else {
 					final OutputStream out = Files.newOutputStream(file.toPath(), StandardOpenOption.APPEND);
-					return RefCountedFile.restoredFile(file, out, file.length());
+					return RefCountedFileWithStream.restoredFile(file, out, file.length());
 				}
 			} catch (FileAlreadyExistsException ignored) {
 				// fall through the loop and retry
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java
index 220ddd5..5447026 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java
@@ -23,7 +23,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream;
 import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream;
-import org.apache.flink.fs.s3.common.utils.RefCountedFile;
+import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
 import org.apache.flink.util.function.FunctionWithException;
 
 import org.apache.commons.io.IOUtils;
@@ -60,7 +60,7 @@ public final class S3RecoverableFsDataOutputStream extends RecoverableFsDataOutp
 
 	private final RecoverableMultiPartUpload upload;
 
-	private final FunctionWithException<File, RefCountedFile, IOException> tmpFileProvider;
+	private final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileProvider;
 
 	/**
 	 * The number of bytes at which we start a new part of the multipart upload.
@@ -80,7 +80,7 @@ public final class S3RecoverableFsDataOutputStream extends RecoverableFsDataOutp
 	 */
 	S3RecoverableFsDataOutputStream(
 			RecoverableMultiPartUpload upload,
-			FunctionWithException<File, RefCountedFile, IOException> tempFileCreator,
+			FunctionWithException<File, RefCountedFileWithStream, IOException> tempFileCreator,
 			RefCountedFSOutputStream initialTmpFile,
 			long userDefinedMinPartSize,
 			long bytesBeforeCurrentPart) {
@@ -228,7 +228,7 @@ public final class S3RecoverableFsDataOutputStream extends RecoverableFsDataOutp
 
 	public static S3RecoverableFsDataOutputStream newStream(
 			final RecoverableMultiPartUpload upload,
-			final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator,
+			final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileCreator,
 			final long userDefinedMinPartSize) throws IOException {
 
 		checkArgument(userDefinedMinPartSize >= S3_MULTIPART_MIN_PART_SIZE);
@@ -245,7 +245,7 @@ public final class S3RecoverableFsDataOutputStream extends RecoverableFsDataOutp
 
 	public static S3RecoverableFsDataOutputStream recoverStream(
 			final RecoverableMultiPartUpload upload,
-			final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator,
+			final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileCreator,
 			final long userDefinedMinPartSize,
 			final long bytesBeforeCurrentPart) throws IOException {
 
@@ -264,7 +264,7 @@ public final class S3RecoverableFsDataOutputStream extends RecoverableFsDataOutp
 	}
 
 	private static RefCountedBufferingFileStream boundedBufferingFileStream(
-			final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator,
+			final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileCreator,
 			final Optional<File> incompletePart) throws IOException {
 
 		if (!incompletePart.isPresent()) {
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
index 3727e25..b7fb8fb 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
@@ -21,7 +21,7 @@ package org.apache.flink.fs.s3.common.writer;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.fs.s3.common.utils.BackPressuringExecutor;
-import org.apache.flink.fs.s3.common.utils.RefCountedFile;
+import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.FunctionWithException;
@@ -43,7 +43,7 @@ final class S3RecoverableMultipartUploadFactory {
 
 	private final S3AccessHelper s3AccessHelper;
 
-	private final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier;
+	private final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileSupplier;
 
 	private final int maxConcurrentUploadsPerStream;
 
@@ -54,7 +54,7 @@ final class S3RecoverableMultipartUploadFactory {
 			final S3AccessHelper s3AccessHelper,
 			final int maxConcurrentUploadsPerStream,
 			final Executor executor,
-			final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier) {
+			final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileSupplier) {
 
 		this.fs = Preconditions.checkNotNull(fs);
 		this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
@@ -92,7 +92,7 @@ final class S3RecoverableMultipartUploadFactory {
 		}
 
 		// download the file (simple way)
-		final RefCountedFile refCountedFile = tmpFileSupplier.apply(null);
+		final RefCountedFileWithStream refCountedFile = tmpFileSupplier.apply(null);
 		final File file = refCountedFile.getFile();
 		final long numBytes = s3AccessHelper.getObject(objectKey, file);
 
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
index ddb4443..a6b62cc 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
@@ -25,7 +25,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.fs.s3.common.utils.RefCountedFile;
+import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
 import org.apache.flink.util.function.FunctionWithException;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -50,7 +50,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @PublicEvolving
 public class S3RecoverableWriter implements RecoverableWriter {
 
-	private final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator;
+	private final FunctionWithException<File, RefCountedFileWithStream, IOException> tempFileCreator;
 
 	private final long userDefinedMinPartSize;
 
@@ -62,7 +62,7 @@ public class S3RecoverableWriter implements RecoverableWriter {
 	S3RecoverableWriter(
 			final S3AccessHelper s3AccessHelper,
 			final S3RecoverableMultipartUploadFactory uploadFactory,
-			final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator,
+			final FunctionWithException<File, RefCountedFileWithStream, IOException> tempFileCreator,
 			final long userDefinedMinPartSize) {
 
 		this.s3AccessHelper = checkNotNull(s3AccessHelper);
@@ -144,7 +144,7 @@ public class S3RecoverableWriter implements RecoverableWriter {
 
 	public static S3RecoverableWriter writer(
 			final FileSystem fs,
-			final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator,
+			final FunctionWithException<File, RefCountedFileWithStream, IOException> tempFileCreator,
 			final S3AccessHelper s3AccessHelper,
 			final Executor uploadThreadPool,
 			final long userDefinedMinPartSize,
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java
index 50ea9bd..368c9cf 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java
@@ -134,11 +134,11 @@ public class RefCountedBufferingFileStreamTest {
 		return new RefCountedBufferingFileStream(getRefCountedFileWithContent(), BUFFER_SIZE);
 	}
 
-	private RefCountedFile getRefCountedFileWithContent() throws IOException {
+	private RefCountedFileWithStream getRefCountedFileWithContent() throws IOException {
 		final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID());
 		final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
 
-		return RefCountedFile.newFile(newFile, out);
+		return RefCountedFileWithStream.newFile(newFile, out);
 	}
 
 	private static byte[] bytesOf(String str) {
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
index 2e03197..217f4e1 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
@@ -25,14 +25,14 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
 import java.util.UUID;
 import java.util.stream.Stream;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * Tests for the {@link RefCountedFile}.
  */
@@ -44,9 +44,9 @@ public class RefCountedFileTest {
 	@Test
 	public void releaseToZeroRefCounterShouldDeleteTheFile() throws IOException {
 		final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID());
-		final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
+		checkState(newFile.createNewFile());
 
-		RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, out);
+		RefCountedFile fileUnderTest = new RefCountedFile(newFile);
 		verifyTheFileIsStillThere();
 
 		fileUnderTest.release();
@@ -59,10 +59,10 @@ public class RefCountedFileTest {
 	@Test
 	public void retainsShouldRequirePlusOneReleasesToDeleteTheFile() throws IOException {
 		final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID());
-		final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
+		checkState(newFile.createNewFile());
 
 		// the reference counter always starts with 1 (not 0). This is why we need +1 releases
-		RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, out);
+		RefCountedFile fileUnderTest = new RefCountedFile(newFile);
 		verifyTheFileIsStillThere();
 
 		fileUnderTest.retain();
@@ -85,59 +85,12 @@ public class RefCountedFileTest {
 		}
 	}
 
-	@Test
-	public void writeShouldSucceed() throws IOException {
-		byte[] content = bytesOf("hello world");
-
-		final RefCountedFile fileUnderTest = getClosedRefCountedFileWithContent(content);
-		long fileLength = fileUnderTest.getLength();
-
-		Assert.assertEquals(content.length, fileLength);
-	}
-
-	@Test
-	public void closeShouldNotReleaseReference() throws IOException {
-		getClosedRefCountedFileWithContent("hello world");
-		verifyTheFileIsStillThere();
-	}
-
-	@Test(expected = IOException.class)
-	public void writeAfterCloseShouldThrowException() throws IOException {
-		final RefCountedFile fileUnderTest = getClosedRefCountedFileWithContent("hello world");
-		byte[] content = bytesOf("Hello Again");
-		fileUnderTest.write(content, 0, content.length);
-	}
-
-	@Test(expected = IOException.class)
-	public void flushAfterCloseShouldThrowException() throws IOException {
-		final RefCountedFile fileUnderTest = getClosedRefCountedFileWithContent("hello world");
-		fileUnderTest.flush();
-	}
-
-	// ------------------------------------- Utilities -------------------------------------
-
 	private void verifyTheFileIsStillThere() throws IOException {
 		try (Stream<Path> files = Files.list(temporaryFolder.getRoot().toPath())) {
 			Assert.assertEquals(1L, files.count());
 		}
 	}
 
-	private RefCountedFile getClosedRefCountedFileWithContent(String content) throws IOException {
-		return getClosedRefCountedFileWithContent(bytesOf(content));
-	}
-
-	private RefCountedFile getClosedRefCountedFileWithContent(byte[] content) throws IOException {
-		final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID());
-		final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
-
-		final RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, out);
-
-		fileUnderTest.write(content, 0, content.length);
-
-		fileUnderTest.closeStream();
-		return fileUnderTest;
-	}
-
 	private static byte[] bytesOf(String str) {
 		return str.getBytes(StandardCharsets.UTF_8);
 	}
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStreamTest.java
similarity index 56%
copy from flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
copy to flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStreamTest.java
index 2e03197..7aa7240 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStreamTest.java
@@ -34,62 +34,18 @@ import java.util.UUID;
 import java.util.stream.Stream;
 
 /**
- * Tests for the {@link RefCountedFile}.
+ * Tests for the {@link RefCountedFileWithStream}.
  */
-public class RefCountedFileTest {
+public class RefCountedFileWithStreamTest {
 
 	@Rule
 	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	@Test
-	public void releaseToZeroRefCounterShouldDeleteTheFile() throws IOException {
-		final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID());
-		final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
-
-		RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, out);
-		verifyTheFileIsStillThere();
-
-		fileUnderTest.release();
-
-		try (Stream<Path> files = Files.list(temporaryFolder.getRoot().toPath())) {
-			Assert.assertEquals(0L, files.count());
-		}
-	}
-
-	@Test
-	public void retainsShouldRequirePlusOneReleasesToDeleteTheFile() throws IOException {
-		final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID());
-		final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
-
-		// the reference counter always starts with 1 (not 0). This is why we need +1 releases
-		RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, out);
-		verifyTheFileIsStillThere();
-
-		fileUnderTest.retain();
-		fileUnderTest.retain();
-
-		Assert.assertEquals(3, fileUnderTest.getReferenceCounter());
-
-		fileUnderTest.release();
-		Assert.assertEquals(2, fileUnderTest.getReferenceCounter());
-		verifyTheFileIsStillThere();
-
-		fileUnderTest.release();
-		Assert.assertEquals(1, fileUnderTest.getReferenceCounter());
-		verifyTheFileIsStillThere();
-
-		fileUnderTest.release();
-		// the file is deleted now
-		try (Stream<Path> files = Files.list(temporaryFolder.getRoot().toPath())) {
-			Assert.assertEquals(0L, files.count());
-		}
-	}
-
-	@Test
 	public void writeShouldSucceed() throws IOException {
 		byte[] content = bytesOf("hello world");
 
-		final RefCountedFile fileUnderTest = getClosedRefCountedFileWithContent(content);
+		final RefCountedFileWithStream fileUnderTest = getClosedRefCountedFileWithContent(content);
 		long fileLength = fileUnderTest.getLength();
 
 		Assert.assertEquals(content.length, fileLength);
@@ -103,14 +59,14 @@ public class RefCountedFileTest {
 
 	@Test(expected = IOException.class)
 	public void writeAfterCloseShouldThrowException() throws IOException {
-		final RefCountedFile fileUnderTest = getClosedRefCountedFileWithContent("hello world");
+		final RefCountedFileWithStream fileUnderTest = getClosedRefCountedFileWithContent("hello world");
 		byte[] content = bytesOf("Hello Again");
 		fileUnderTest.write(content, 0, content.length);
 	}
 
 	@Test(expected = IOException.class)
 	public void flushAfterCloseShouldThrowException() throws IOException {
-		final RefCountedFile fileUnderTest = getClosedRefCountedFileWithContent("hello world");
+		final RefCountedFileWithStream fileUnderTest = getClosedRefCountedFileWithContent("hello world");
 		fileUnderTest.flush();
 	}
 
@@ -122,15 +78,15 @@ public class RefCountedFileTest {
 		}
 	}
 
-	private RefCountedFile getClosedRefCountedFileWithContent(String content) throws IOException {
+	private RefCountedFileWithStream getClosedRefCountedFileWithContent(String content) throws IOException {
 		return getClosedRefCountedFileWithContent(bytesOf(content));
 	}
 
-	private RefCountedFile getClosedRefCountedFileWithContent(byte[] content) throws IOException {
+	private RefCountedFileWithStream getClosedRefCountedFileWithContent(byte[] content) throws IOException {
 		final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID());
 		final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
 
-		final RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, out);
+		final RefCountedFileWithStream fileUnderTest = RefCountedFileWithStream.newFile(newFile, out);
 
 		fileUnderTest.write(content, 0, content.length);
 
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
index f01da88..e8c6e9e 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.fs.s3.common.writer;
 
 import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream;
-import org.apache.flink.fs.s3.common.utils.RefCountedFile;
+import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.MathUtils;
 
@@ -320,7 +320,7 @@ public class RecoverableMultiPartUploadImplTest {
 		final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
 
 		final RefCountedBufferingFileStream testStream =
-				new RefCountedBufferingFileStream(RefCountedFile.newFile(newFile, out), BUFFER_SIZE);
+				new RefCountedBufferingFileStream(RefCountedFileWithStream.newFile(newFile, out), BUFFER_SIZE);
 
 		testStream.write(content, 0, content.length);
 		return testStream;
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java
index 14ed2e2..b7c94c4 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream;
 import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream;
-import org.apache.flink.fs.s3.common.utils.RefCountedFile;
+import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.FunctionWithException;
@@ -483,7 +483,7 @@ public class S3RecoverableFsDataOutputStreamTest {
 		}
 	}
 
-	private static class TestFileProvider implements FunctionWithException<File, RefCountedFile, IOException> {
+	private static class TestFileProvider implements FunctionWithException<File, RefCountedFileWithStream, IOException> {
 
 		private final TemporaryFolder folder;
 
@@ -492,16 +492,16 @@ public class S3RecoverableFsDataOutputStreamTest {
 		}
 
 		@Override
-		public RefCountedFile apply(@Nullable File file) throws IOException {
+		public RefCountedFileWithStream apply(@Nullable File file) throws IOException {
 			while (true) {
 				try {
 					if (file == null) {
 						final File newFile = new File(folder.getRoot(), ".tmp_" + UUID.randomUUID());
 						final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
-						return RefCountedFile.newFile(newFile, out);
+						return RefCountedFileWithStream.newFile(newFile, out);
 					} else {
 						final OutputStream out = Files.newOutputStream(file.toPath(), StandardOpenOption.APPEND);
-						return RefCountedFile.restoredFile(file, out, file.length());
+						return RefCountedFileWithStream.restoredFile(file, out, file.length());
 					}
 				} catch (FileAlreadyExistsException e) {
 					// fall through the loop and retry
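
After this split, RefCountedFile manages only the file's lifetime, while
RefCountedFileWithStream adds the write-side stream on top. A short usage
sketch of the writer side, assuming flink-s3-fs-base is on the classpath
(the file name is a placeholder):

    import java.io.File;
    import java.io.OutputStream;
    import java.nio.file.Files;
    import java.nio.file.StandardOpenOption;

    import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;

    class TmpFileExample {
        static RefCountedFileWithStream openNew(File dir) throws Exception {
            File f = new File(dir, ".tmp_example");
            OutputStream out = Files.newOutputStream(f.toPath(), StandardOpenOption.CREATE_NEW);
            // Ref-counted lifetime plus an offset-aware output stream for writing.
            return RefCountedFileWithStream.newFile(f, out);
        }
    }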


[flink] 06/10: [FLINK-17547][task] Use iterator for unconsumed buffers. Motivation: support spilled records. Changes: (1) change SpillingAdaptiveSpanningRecordDeserializer.getUnconsumedBuffer signature; (2) adapt channel state persistence to new types

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1c9bf0368e9a233a2a013436628790c2c2b60bcb
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon May 18 20:29:05 2020 +0200

    [FLINK-17547][task] Use iterator for unconsumed buffers.
    Motivation: support spilled records.
    Changes:
    1. Change the SpillingAdaptiveSpanningRecordDeserializer.getUnconsumedBuffer signature.
    2. Adapt channel state persistence to the new types.
    
    No changes in existing logic.
---
 .../org/apache/flink/util/CloseableIterator.java   | 77 +++++++++++++++++++++-
 .../main/java/org/apache/flink/util/IOUtils.java   |  7 ++
 .../channel/ChannelStateWriteRequest.java          | 33 +++++++---
 .../ChannelStateWriteRequestDispatcherImpl.java    |  6 +-
 .../ChannelStateWriteRequestExecutorImpl.java      | 20 ++++--
 .../checkpoint/channel/ChannelStateWriter.java     |  6 +-
 .../checkpoint/channel/ChannelStateWriterImpl.java | 17 +++--
 .../api/serialization/NonSpanningWrapper.java      | 22 +++++--
 .../api/serialization/RecordDeserializer.java      |  4 +-
 .../network/api/serialization/SpanningWrapper.java | 12 ++--
 ...SpillingAdaptiveSpanningRecordDeserializer.java | 15 +----
 .../partition/consumer/RemoteInputChannel.java     |  3 +-
 .../ChannelStateWriteRequestDispatcherTest.java    | 10 ++-
 .../ChannelStateWriteRequestExecutorImplTest.java  |  1 -
 .../channel/ChannelStateWriterImplTest.java        | 13 ++--
 .../channel/CheckpointInProgressRequestTest.java   |  7 +-
 .../checkpoint/channel/MockChannelStateWriter.java | 11 +++-
 .../channel/RecordingChannelStateWriter.java       | 12 +++-
 .../SpanningRecordSerializationTest.java           |  9 +--
 .../partition/consumer/SingleInputGateTest.java    | 14 +++-
 .../runtime/state/ChannelPersistenceITCase.java    |  3 +-
 .../runtime/io/CheckpointBarrierUnaligner.java     |  3 +-
 .../runtime/io/StreamTaskNetworkInput.java         | 11 ++--
 23 files changed, 235 insertions(+), 81 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java b/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
index 09ea046..cc51324 100644
--- a/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
+++ b/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
@@ -20,10 +20,15 @@ package org.apache.flink.util;
 
 import javax.annotation.Nonnull;
 
+import java.util.ArrayDeque;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.Iterator;
+import java.util.List;
 import java.util.function.Consumer;
 
+import static java.util.Arrays.asList;
+
 /**
  * This interface represents an {@link Iterator} that is also {@link AutoCloseable}. A typical use-case for this
  * interface are iterators that are based on native-resources such as files, network, or database connections. Clients
@@ -37,7 +42,42 @@ public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
 
 	@Nonnull
 	static <T> CloseableIterator<T> adapterForIterator(@Nonnull Iterator<T> iterator) {
-		return new IteratorAdapter<>(iterator);
+		return adapterForIterator(iterator, () -> {});
+	}
+
+	static <T> CloseableIterator<T> adapterForIterator(@Nonnull Iterator<T> iterator, AutoCloseable close) {
+		return new IteratorAdapter<>(iterator, close);
+	}
+
+	static <T> CloseableIterator<T> fromList(List<T> list, Consumer<T> closeNotConsumed) {
+		return new CloseableIterator<T>(){
+			private final Deque<T> stack = new ArrayDeque<>(list);
+
+			@Override
+			public boolean hasNext() {
+				return !stack.isEmpty();
+			}
+
+			@Override
+			public T next() {
+				return stack.poll();
+			}
+
+			@Override
+			public void close() throws Exception {
+				Exception exception = null;
+				for (T el : stack) {
+					try {
+						closeNotConsumed.accept(el);
+					} catch (Exception e) {
+						exception = ExceptionUtils.firstOrSuppressed(e, exception);
+					}
+				}
+				if (exception != null) {
+					throw exception;
+				}
+			}
+		};
 	}
 
 	@SuppressWarnings("unchecked")
@@ -45,6 +85,34 @@ public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
 		return (CloseableIterator<T>) EMPTY_INSTANCE;
 	}
 
+	static <T> CloseableIterator<T> ofElements(Consumer<T> closeNotConsumed, T... elements) {
+		return fromList(asList(elements), closeNotConsumed);
+	}
+
+	static <E> CloseableIterator<E> ofElement(E element, Consumer<E> closeIfNotConsumed) {
+		return new CloseableIterator<E>(){
+			private boolean hasNext = true;
+
+			@Override
+			public boolean hasNext() {
+				return hasNext;
+			}
+
+			@Override
+			public E next() {
+				hasNext = false;
+				return element;
+			}
+
+			@Override
+			public void close() {
+				if (hasNext) {
+					closeIfNotConsumed.accept(element);
+				}
+			}
+		};
+	}
+
 	/**
 	 * Adapter from {@link Iterator} to {@link CloseableIterator}. Does nothing on {@link #close()}.
 	 *
@@ -54,9 +122,11 @@ public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
 
 		@Nonnull
 		private final Iterator<E> delegate;
+		private final AutoCloseable close;
 
-		IteratorAdapter(@Nonnull Iterator<E> delegate) {
+		IteratorAdapter(@Nonnull Iterator<E> delegate, AutoCloseable close) {
 			this.delegate = delegate;
+			this.close = close;
 		}
 
 		@Override
@@ -80,7 +150,8 @@ public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
 		}
 
 		@Override
-		public void close() {
+		public void close() throws Exception {
+			close.close();
 		}
 	}
 }
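
The new factories give the iterator ownership of its elements: whatever the
caller does not consume is handed to the close callback. A small usage sketch
of ofElements, with String standing in for Buffer:

    import org.apache.flink.util.CloseableIterator;

    class OwnershipDemo {
        static void demo() throws Exception {
            StringBuilder released = new StringBuilder();
            try (CloseableIterator<String> it =
                     CloseableIterator.ofElements(released::append, "a", "b")) {
                it.next(); // the caller takes "a"...
            }              // ...close() passes the unconsumed "b" to the callback
            assert released.toString().equals("b");
        }
    }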
diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
index 1f9af18..0b8f210 100644
--- a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
@@ -216,6 +216,13 @@ public final class IOUtils {
 	}
 
 	/**
+	 * @see #closeAll(Iterable)
+	 */
+	public static void closeAll(AutoCloseable... closeables) throws Exception {
+		closeAll(asList(closeables));
+	}
+
+	/**
 	 * Closes all {@link AutoCloseable} objects in the parameter, suppressing exceptions. Exception will be emitted
 	 * after calling close() on every object.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
index bd6c7ba..0848698 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
@@ -20,23 +20,24 @@ package org.apache.flink.runtime.checkpoint.channel;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
 
 import static org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.CANCELLED;
 import static org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.COMPLETED;
 import static org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.EXECUTING;
 import static org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.FAILED;
 import static org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.NEW;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 interface ChannelStateWriteRequest {
 	long getCheckpointId();
 
-	void cancel(Throwable cause);
+	void cancel(Throwable cause) throws Exception;
 
 	static CheckpointInProgressRequest completeInput(long checkpointId) {
 		return new CheckpointInProgressRequest("completeInput", checkpointId, ChannelStateCheckpointWriter::completeInput, false);
@@ -46,8 +47,24 @@ interface ChannelStateWriteRequest {
 		return new CheckpointInProgressRequest("completeOutput", checkpointId, ChannelStateCheckpointWriter::completeOutput, false);
 	}
 
-	static ChannelStateWriteRequest write(long checkpointId, InputChannelInfo info, Buffer... flinkBuffers) {
-		return new CheckpointInProgressRequest("writeInput", checkpointId, writer -> writer.writeInput(info, flinkBuffers), recycle(flinkBuffers), false);
+	static ChannelStateWriteRequest write(long checkpointId, InputChannelInfo info, CloseableIterator<Buffer> iterator) {
+		return new CheckpointInProgressRequest(
+			"writeInput",
+			checkpointId,
+			writer -> {
+				while (iterator.hasNext()) {
+					Buffer buffer = iterator.next();
+					try {
+						checkArgument(buffer.isBuffer());
+					} catch (Exception e) {
+						buffer.recycleBuffer();
+						throw e;
+					}
+					writer.writeInput(info, buffer);
+				}
+			},
+			throwable -> iterator.close(),
+			false);
 	}
 
 	static ChannelStateWriteRequest write(long checkpointId, ResultSubpartitionInfo info, Buffer... flinkBuffers) {
@@ -62,7 +79,7 @@ interface ChannelStateWriteRequest {
 		return new CheckpointInProgressRequest("abort", checkpointId, writer -> writer.fail(cause), true);
 	}
 
-	static Consumer<Throwable> recycle(Buffer[] flinkBuffers) {
+	static ThrowingConsumer<Throwable, Exception> recycle(Buffer[] flinkBuffers) {
 		return unused -> {
 			for (Buffer b : flinkBuffers) {
 				b.recycleBuffer();
@@ -112,7 +129,7 @@ enum CheckpointInProgressRequestState {
 
 final class CheckpointInProgressRequest implements ChannelStateWriteRequest {
 	private final ThrowingConsumer<ChannelStateCheckpointWriter, Exception> action;
-	private final Consumer<Throwable> discardAction;
+	private final ThrowingConsumer<Throwable, Exception> discardAction;
 	private final long checkpointId;
 	private final String name;
 	private final boolean ignoreMissingWriter;
@@ -123,7 +140,7 @@ final class CheckpointInProgressRequest implements ChannelStateWriteRequest {
 		}, ignoreMissingWriter);
 	}
 
-	CheckpointInProgressRequest(String name, long checkpointId, ThrowingConsumer<ChannelStateCheckpointWriter, Exception> action, Consumer<Throwable> discardAction, boolean ignoreMissingWriter) {
+	CheckpointInProgressRequest(String name, long checkpointId, ThrowingConsumer<ChannelStateCheckpointWriter, Exception> action, ThrowingConsumer<Throwable, Exception> discardAction, boolean ignoreMissingWriter) {
 		this.checkpointId = checkpointId;
 		this.action = checkNotNull(action);
 		this.discardAction = checkNotNull(discardAction);
@@ -137,7 +154,7 @@ final class CheckpointInProgressRequest implements ChannelStateWriteRequest {
 	}
 
 	@Override
-	public void cancel(Throwable cause) {
+	public void cancel(Throwable cause) throws Exception {
 		if (state.compareAndSet(NEW, CANCELLED) || state.compareAndSet(FAILED, CANCELLED)) {
 			discardAction.accept(cause);
 		}
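
Since cancel(Throwable) can now throw, callers have to aggregate failures
rather than stop at the first one; the executor change below does exactly that
via the new IOUtils.closeAll overload. A minimal sketch of the closeAll
contract:

    import org.apache.flink.util.IOUtils;

    class CloseAllDemo {
        static void demo() throws Exception {
            // All three closeables run; the first failure is rethrown at the
            // end, with any later failures attached as suppressed exceptions.
            IOUtils.closeAll(
                () -> System.out.println("first cleanup runs"),
                () -> { throw new Exception("second fails"); },
                () -> System.out.println("third still runs"));
        }
    }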
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java
index 843663e..0a15b91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java
@@ -51,7 +51,11 @@ final class ChannelStateWriteRequestDispatcherImpl implements ChannelStateWriteR
 		try {
 			dispatchInternal(request);
 		} catch (Exception e) {
-			request.cancel(e);
+			try {
+				request.cancel(e);
+			} catch (Exception ex) {
+				e.addSuppressed(ex);
+			}
 			throw e;
 		}
 	}
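
For context: the change above runs the dispatch, and if it fails, cancels the request while attaching any secondary failure to the primary exception via addSuppressed, so the original cause is never masked. A minimal standalone sketch of the idiom (hypothetical helper, not part of this patch):

	import java.util.concurrent.Callable;

	final class SuppressingCleanup {
		// Run the action; on failure, run the cleanup and attach any cleanup
		// failure to the primary exception instead of masking it.
		static <T> T callWithCleanup(Callable<T> action, AutoCloseable cleanup) throws Exception {
			try {
				return action.call();
			} catch (Exception primary) {
				try {
					cleanup.close();
				} catch (Exception secondary) {
					primary.addSuppressed(secondary);
				}
				throw primary;
			}
		}
	}
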
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
index cbcc3f7..e87a21c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
@@ -32,6 +32,9 @@ import java.util.List;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.IOUtils.closeAll;
 
 /**
  * Executes {@link ChannelStateWriteRequest}s in a separate thread. Any exception occurring during execution causes this
@@ -67,8 +70,15 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
 		} catch (Exception ex) {
 			thrown = ex;
 		} finally {
-			cleanupRequests();
-			dispatcher.fail(thrown == null ? new CancellationException() : thrown);
+			try {
+				closeAll(
+					this::cleanupRequests,
+					() -> dispatcher.fail(thrown == null ? new CancellationException() : thrown)
+				);
+			} catch (Exception e) {
+				//noinspection NonAtomicOperationOnVolatileField
+				thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+			}
 		}
 		LOG.debug("loop terminated");
 	}
@@ -87,14 +97,12 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
 		}
 	}
 
-	private void cleanupRequests() {
+	private void cleanupRequests() throws Exception {
 		Throwable cause = thrown == null ? new CancellationException() : thrown;
 		List<ChannelStateWriteRequest> drained = new ArrayList<>();
 		deque.drainTo(drained);
 		LOG.info("discarding {} drained requests", drained.size());
-		for (ChannelStateWriteRequest request : drained) {
-			request.cancel(cause);
-		}
+		closeAll(drained.stream().<AutoCloseable>map(request -> () -> request.cancel(cause)).collect(Collectors.toList()));
 	}
 
 	@Override
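
Mapping each drained request to an AutoCloseable and delegating to IOUtils.closeAll (above) guarantees that every request gets cancelled even if some cancellations throw. A sketch of the aggregation semantics such a closeAll provides (an assumption about the utility's behavior; the actual Flink implementation may differ in details):

	final class CloseAllSketch {
		static void closeAll(Iterable<AutoCloseable> closeables) throws Exception {
			Exception firstFailure = null;
			for (AutoCloseable closeable : closeables) {
				try {
					closeable.close();
				} catch (Exception e) {
					// keep closing the rest; remember the first failure, suppress later ones
					if (firstFailure == null) {
						firstFailure = e;
					} else {
						firstFailure.addSuppressed(e);
					}
				}
			}
			if (firstFailure != null) {
				throw firstFailure;
			}
		}
	}
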
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index e19b1e2..5dad559 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.state.InputChannelStateHandle;
 import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
+import org.apache.flink.util.CloseableIterator;
 
 import java.io.Closeable;
 import java.util.Collection;
@@ -99,11 +100,10 @@ public interface ChannelStateWriter extends Closeable {
 	 *                    It is intended to be used for incremental snapshots.
 	 *                    If no data is passed, it is ignored.
 	 * @param data zero or more <b>data</b> buffers ordered by their sequence numbers
-	 * @throws IllegalArgumentException if one or more passed buffers {@link Buffer#isBuffer()  isn't a buffer}
 	 * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_RESTORED
 	 * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_UNKNOWN
 	 */
-	void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, Buffer... data) throws IllegalArgumentException;
+	void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> data);
 
 	/**
 	 * Add in-flight buffers from the {@link org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition}.
@@ -161,7 +161,7 @@ public interface ChannelStateWriter extends Closeable {
 		}
 
 		@Override
-		public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, Buffer... data) {
+		public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> data) {
 		}
 
 		@Override
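
Call sites migrate from varargs buffers to a CloseableIterator that owns the buffers' life cycle. A hedged usage sketch (the class, method, and variable names below are hypothetical; ofElements, taking the recycler first and the buffers after, appears in the test changes of this patch):

	import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
	import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
	import org.apache.flink.runtime.io.network.buffer.Buffer;
	import org.apache.flink.util.CloseableIterator;

	final class AddInputDataSketch {
		// hypothetical call site: writer, channelInfo and buffers come from the task
		static void persistInFlightBuffers(ChannelStateWriter writer, long checkpointId, InputChannelInfo channelInfo, Buffer... buffers) {
			writer.addInputData(
				checkpointId,
				channelInfo,
				ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
				CloseableIterator.ofElements(Buffer::recycleBuffer, buffers));
		}
	}
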
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 412a9f5..b6fa588 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -103,10 +104,9 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 	}
 
 	@Override
-	public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, Buffer... data) {
-		LOG.debug("add input data, checkpoint id: {}, channel: {}, startSeqNum: {}, num buffers: {}",
-			checkpointId, info, startSeqNum, data == null ? 0 : data.length);
-		enqueue(write(checkpointId, info, checkBufferType(data)), false);
+	public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> iterator) {
+		LOG.debug("add input data, checkpoint id: {}, channel: {}, startSeqNum: {}", checkpointId, info, startSeqNum);
+		enqueue(write(checkpointId, info, iterator), false);
 	}
 
 	@Override
@@ -168,8 +168,13 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 				executor.submit(request);
 			}
 		} catch (Exception e) {
-			request.cancel(e);
-			throw new RuntimeException("unable to send request to worker", e);
+			RuntimeException wrapped = new RuntimeException("unable to send request to worker", e);
+			try {
+				request.cancel(e);
+			} catch (Exception cancelException) {
+				wrapped.addSuppressed(cancelException);
+			}
+			throw wrapped;
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
index 5de5467..343c6f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
@@ -22,17 +22,21 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.NextRecordResponse;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.util.CloseableIterator;
 
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.UTFDataFormatException;
 import java.nio.ByteBuffer;
-import java.util.Optional;
 
 import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
 import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER;
 import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
 import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
 
 final class NonSpanningWrapper implements DataInputView {
 
@@ -69,13 +73,13 @@ final class NonSpanningWrapper implements DataInputView {
 		this.limit = limit;
 	}
 
-	Optional<MemorySegment> getUnconsumedSegment() {
+	CloseableIterator<Buffer> getUnconsumedSegment() {
 		if (!hasRemaining()) {
-			return Optional.empty();
+			return CloseableIterator.empty();
 		}
-		MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(remaining());
-		segment.copyTo(position, target, 0, remaining());
-		return Optional.of(target);
+		MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(remaining());
+		this.segment.copyTo(position, segment, 0, remaining());
+		return singleBufferIterator(segment);
 	}
 
 	boolean hasRemaining() {
@@ -359,4 +363,10 @@ final class NonSpanningWrapper implements DataInputView {
 		return recordLength <= remaining();
 	}
 
+	static CloseableIterator<Buffer> singleBufferIterator(MemorySegment target) {
+		return CloseableIterator.ofElement(
+			new NetworkBuffer(target, FreeingBufferRecycler.INSTANCE, DATA_BUFFER, target.size()),
+			Buffer::recycleBuffer);
+	}
+
 }
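
singleBufferIterator hands ownership of the copied segment to whoever consumes the iterator. A sketch of the expected consumption pattern, assuming the close callback recycles only buffers the caller did not take:

	import org.apache.flink.runtime.io.network.buffer.Buffer;
	import org.apache.flink.util.CloseableIterator;

	final class ConsumeSketch {
		static void drainAndClose(CloseableIterator<Buffer> unconsumed) throws Exception {
			try {
				while (unconsumed.hasNext()) {
					Buffer buffer = unconsumed.next();
					// ownership moves to the consumer here; recycling stands in
					// for handing the buffer to the channel state writer
					buffer.recycleBuffer();
				}
			} finally {
				unconsumed.close(); // recycles anything not consumed above
			}
		}
	}
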
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
index 4f4d621..07ff5ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.CloseableIterator;
 
 import java.io.IOException;
-import java.util.Optional;
 
 /**
  * Interface for turning sequences of memory segments into records.
@@ -71,5 +71,5 @@ public interface RecordDeserializer<T extends IOReadableWritable> {
 	 * <p>Note that the unconsumed buffer might be empty if the whole buffer was already consumed
 	 * before and no partial length or data remains at the end of the buffer.
 	 */
-	Optional<Buffer> getUnconsumedBuffer() throws IOException;
+	CloseableIterator<Buffer> getUnconsumedBuffer() throws IOException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
index 430f0db..18ea6cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -23,6 +23,8 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.StringUtils;
 
 import java.io.BufferedInputStream;
@@ -34,11 +36,11 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
 import java.util.Arrays;
-import java.util.Optional;
 import java.util.Random;
 
 import static java.lang.Math.max;
 import static java.lang.Math.min;
+import static org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.singleBufferIterator;
 import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
 import static org.apache.flink.util.FileUtils.writeCompletely;
 import static org.apache.flink.util.IOUtils.closeAllQuietly;
@@ -165,15 +167,15 @@ final class SpanningWrapper {
 		}
 	}
 
-	Optional<MemorySegment> getUnconsumedSegment() throws IOException {
+	CloseableIterator<Buffer> getUnconsumedSegment() throws IOException {
 		if (isReadingLength()) {
-			return Optional.of(copyLengthBuffer());
+			return singleBufferIterator(copyLengthBuffer());
 		} else if (isAboveSpillingThreshold()) {
 			throw new UnsupportedOperationException("Unaligned checkpoints currently do not support spilled records.");
 		} else if (recordLength == -1) {
-			return Optional.empty(); // no remaining partial length or data
+			return CloseableIterator.empty(); // no remaining partial length or data
 		} else {
-			return Optional.of(copyDataBuffer());
+			return singleBufferIterator(copyDataBuffer());
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 75e6b0b..2d4c24c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -21,18 +21,15 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.util.CloseableIterator;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.IOException;
-import java.util.Optional;
 
 import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
 import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER;
 import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
-import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
 
 /**
  * @param <T> The type of the record to be deserialized.
@@ -76,14 +73,8 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 	}
 
 	@Override
-	public Optional<Buffer> getUnconsumedBuffer() throws IOException {
-		final Optional<MemorySegment> unconsumedSegment;
-		if (nonSpanningWrapper.hasRemaining()) {
-			unconsumedSegment = nonSpanningWrapper.getUnconsumedSegment();
-		} else {
-			unconsumedSegment = spanningWrapper.getUnconsumedSegment();
-		}
-		return unconsumedSegment.map(segment -> new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, DATA_BUFFER, segment.size()));
+	public CloseableIterator<Buffer> getUnconsumedBuffer() throws IOException {
+		return nonSpanningWrapper.hasRemaining() ? nonSpanningWrapper.getUnconsumedSegment() : spanningWrapper.getUnconsumedSegment();
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 6db81e9..4e1f260 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.util.CloseableIterator;
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
@@ -207,7 +208,7 @@ public class RemoteInputChannel extends InputChannel {
 				checkpointId,
 				channelInfo,
 				ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
-				inflightBuffers.toArray(new Buffer[0]));
+				CloseableIterator.fromList(inflightBuffers, Buffer::recycleBuffer));
 		}
 	}
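
CloseableIterator.fromList wraps the collected in-flight buffers so that closing the iterator recycles whatever was not consumed; that "remaining elements only" semantic is inferred from the tests below, which drain the iterator and then close it without double-recycling. A hypothetical usage:

	import java.util.List;

	import org.apache.flink.runtime.io.network.buffer.Buffer;
	import org.apache.flink.util.CloseableIterator;

	final class FromListSketch {
		static CloseableIterator<Buffer> snapshotInFlight(List<Buffer> inflightBuffers) {
			// close() applies the callback to buffers not yet consumed (inferred semantics)
			return CloseableIterator.fromList(inflightBuffers, Buffer::recycleBuffer);
		}
	}
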
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java
index f953c22..00c8ca7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java
@@ -17,8 +17,13 @@
 
 package org.apache.flink.runtime.checkpoint.channel;
 
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.util.CloseableIterator;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -83,7 +88,10 @@ public class ChannelStateWriteRequestDispatcherTest {
 	}
 
 	private static ChannelStateWriteRequest writeIn() {
-		return write(CHECKPOINT_ID, new InputChannelInfo(1, 1));
+		return write(CHECKPOINT_ID, new InputChannelInfo(1, 1), CloseableIterator.ofElement(
+			new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1), FreeingBufferRecycler.INSTANCE),
+			Buffer::recycleBuffer
+		));
 	}
 
 	private static ChannelStateWriteRequest writeOut() {
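
Tests that need a single in-memory data buffer could factor the construction above into a small helper; a sketch (hypothetical helper, constructor and factory signatures as used in this patch):

	import org.apache.flink.core.memory.MemorySegmentFactory;
	import org.apache.flink.runtime.io.network.buffer.Buffer;
	import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
	import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
	import org.apache.flink.util.CloseableIterator;

	final class TestBuffers {
		static CloseableIterator<Buffer> oneBuffer(int bytes) {
			Buffer buffer = new NetworkBuffer(
				MemorySegmentFactory.allocateUnpooledSegment(bytes),
				FreeingBufferRecycler.INSTANCE);
			return CloseableIterator.ofElement(buffer, Buffer::recycleBuffer);
		}
	}
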
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java
index 5aad9c6..a299b34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java
@@ -30,7 +30,6 @@ import java.util.concurrent.LinkedBlockingDeque;
 
 import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcher.NO_OP;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
index 44552e6..92a7e88 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
 
 import static org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory;
+import static org.apache.flink.util.CloseableIterator.ofElements;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
@@ -47,14 +48,16 @@ public class ChannelStateWriterImplTest {
 	private static final long CHECKPOINT_ID = 42L;
 
 	@Test(expected = IllegalArgumentException.class)
-	public void testAddEventBuffer() {
+	public void testAddEventBuffer() throws Exception {
+
 		NetworkBuffer dataBuf = getBuffer();
 		NetworkBuffer eventBuf = getBuffer();
 		eventBuf.setDataType(Buffer.DataType.EVENT_BUFFER);
-		ChannelStateWriterImpl writer = openWriter();
-		callStart(writer);
 		try {
-			writer.addInputData(CHECKPOINT_ID, new InputChannelInfo(1, 1), 1, eventBuf, dataBuf);
+			runWithSyncWorker(writer -> {
+				callStart(writer);
+				writer.addInputData(CHECKPOINT_ID, new InputChannelInfo(1, 1), 1, ofElements(Buffer::recycleBuffer, eventBuf, dataBuf));
+			});
 		} finally {
 			assertTrue(dataBuf.isRecycled());
 		}
@@ -285,7 +288,7 @@ public class ChannelStateWriterImplTest {
 	}
 
 	private void callAddInputData(ChannelStateWriter writer, NetworkBuffer... buffer) {
-		writer.addInputData(CHECKPOINT_ID, new InputChannelInfo(1, 1), 1, buffer);
+		writer.addInputData(CHECKPOINT_ID, new InputChannelInfo(1, 1), 1, ofElements(Buffer::recycleBuffer, buffer));
 	}
 
 	private void callAbort(ChannelStateWriter writer) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/CheckpointInProgressRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/CheckpointInProgressRequestTest.java
index 3617b8f..556bf49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/CheckpointInProgressRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/CheckpointInProgressRequestTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * {@link CheckpointInProgressRequest} test.
@@ -41,7 +42,11 @@ public class CheckpointInProgressRequestTest {
 		Thread[] threads = new Thread[barrier.getParties()];
 		for (int i = 0; i < barrier.getParties(); i++) {
 			threads[i] = new Thread(() -> {
-				request.cancel(new RuntimeException("test"));
+				try {
+					request.cancel(new RuntimeException("test"));
+				} catch (Exception e) {
+					fail(e.getMessage());
+				}
 				await(barrier);
 			});
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
index 5dcc00c..0a61066d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
@@ -19,6 +19,9 @@ package org.apache.flink.runtime.checkpoint.channel;
 
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.CloseableIterator;
+
+import static org.apache.flink.util.ExceptionUtils.rethrow;
 
 /**
  * A no-op implementation that performs basic checks of the contract, but does not actually write any data.
@@ -49,10 +52,12 @@ public class MockChannelStateWriter implements ChannelStateWriter {
 	}
 
 	@Override
-	public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, Buffer... data) {
+	public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> iterator) {
 		checkCheckpointId(checkpointId);
-		for (final Buffer buffer : data) {
-			buffer.recycleBuffer();
+		try {
+			iterator.close();
+		} catch (Exception e) {
+			rethrow(e);
 		}
 	}
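
ExceptionUtils.rethrow lets the override keep the interface's unchecked signature while the iterator's close() declares Exception. One common way such a helper can be written, shown as a sketch (Flink's actual implementation may instead wrap checked exceptions in a RuntimeException):

	final class RethrowSketch {
		// "Sneaky throw": rethrows any Throwable unwrapped and undeclared,
		// relying on generic erasure; T is inferred as RuntimeException at the call site.
		@SuppressWarnings("unchecked")
		static <T extends Throwable> void sneakyThrow(Throwable t) throws T {
			throw (T) t;
		}
	}
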
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java
index b53e37b..d0cfe3f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java
@@ -19,12 +19,15 @@ package org.apache.flink.runtime.checkpoint.channel;
 
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.CloseableIterator;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.LinkedListMultimap;
 import org.apache.flink.shaded.guava18.com.google.common.collect.ListMultimap;
 
 import java.util.Arrays;
 
+import static org.apache.flink.util.ExceptionUtils.rethrow;
+
 /**
  * A simple {@link ChannelStateWriter} for use in unit tests.
  */
@@ -54,9 +57,14 @@ public class RecordingChannelStateWriter extends MockChannelStateWriter {
 	}
 
 	@Override
-	public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, Buffer... data) {
+	public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> iterator) {
 		checkCheckpointId(checkpointId);
-		addedInput.putAll(info, Arrays.asList(data));
+		iterator.forEachRemaining(b -> addedInput.put(info, b));
+		try {
+			iterator.close();
+		} catch (Exception e) {
+			rethrow(e);
+		}
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index 183df10..e35b311 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.testutils.serialization.types.IntType;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
 import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
 import org.apache.flink.testutils.serialization.types.Util;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
@@ -46,7 +47,6 @@ import java.nio.channels.WritableByteChannel;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import java.util.Random;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
@@ -293,14 +293,15 @@ public class SpanningRecordSerializationTest extends TestLogger {
 		}
 	}
 
-	private static void assertUnconsumedBuffer(ByteArrayOutputStream expected, Optional<Buffer> actual) {
-		if (!actual.isPresent()) {
+	private static void assertUnconsumedBuffer(ByteArrayOutputStream expected, CloseableIterator<Buffer> actual) throws Exception {
+		if (!actual.hasNext()) {
 			Assert.assertEquals(expected.size(), 0);
+			return;
 		}
 
 		ByteBuffer expectedByteBuffer = ByteBuffer.wrap(expected.toByteArray());
-		ByteBuffer actualByteBuffer = actual.get().getNioBufferReadable();
+		ByteBuffer actualByteBuffer = actual.next().getNioBufferReadable();
 		Assert.assertEquals(expectedByteBuffer, actualByteBuffer);
+		actual.close();
 	}
 
 	private static void writeBuffer(ByteBuffer buffer, OutputStream stream) throws IOException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index abcf563..6f49b44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -66,6 +66,7 @@ import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.Test;
@@ -74,7 +75,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -94,6 +94,7 @@ import static org.apache.flink.runtime.io.network.partition.InputGateFairnessTes
 import static org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest.submitTasksAndWaitForResults;
 import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
 import static org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation;
+import static org.apache.flink.util.ExceptionUtils.rethrow;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
@@ -989,8 +990,15 @@ public class SingleInputGateTest extends InputGateTestBase {
 
 		inputChannel.spillInflightBuffers(0, new ChannelStateWriterImpl.NoOpChannelStateWriter() {
 			@Override
-			public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, Buffer... data) {
-				inflightBuffers.addAll(Arrays.asList(data));
+			public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> iterator) {
+				List<Buffer> list = new ArrayList<>();
+				iterator.forEachRemaining(list::add);
+				inflightBuffers.addAll(list);
+				try {
+					iterator.close();
+				} catch (Exception e) {
+					rethrow(e);
+				}
 			}
 		});
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
index 3f5e2cc..a77dbbf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
@@ -50,6 +50,7 @@ import static java.util.Collections.singletonMap;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
 import static org.apache.flink.runtime.checkpoint.channel.ChannelStateReader.ReadResult.NO_MORE_DATA;
 import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN;
+import static org.apache.flink.util.CloseableIterator.ofElements;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertArrayEquals;
 
@@ -102,7 +103,7 @@ public class ChannelPersistenceITCase {
 			writer.open();
 			writer.start(checkpointId, new CheckpointOptions(CHECKPOINT, new CheckpointStorageLocationReference("poly".getBytes())));
 			for (Map.Entry<InputChannelInfo, Buffer> e : icBuffers.entrySet()) {
-				writer.addInputData(checkpointId, e.getKey(), SEQUENCE_NUMBER_UNKNOWN, e.getValue());
+				writer.addInputData(checkpointId, e.getKey(), SEQUENCE_NUMBER_UNKNOWN, ofElements(Buffer::recycleBuffer, e.getValue()));
 			}
 			writer.finishInput(checkpointId);
 			for (Map.Entry<ResultSubpartitionInfo, Buffer> e : rsBuffers.entrySet()) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
index f98e83f..d39accf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
@@ -45,6 +45,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.util.CloseableIterator.ofElement;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -330,7 +331,7 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
 					currentReceivedCheckpointId,
 					channelInfo,
 					ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
-					buffer);
+					ofElement(buffer, Buffer::recycleBuffer));
 			} else {
 				buffer.recycleBuffer();
 			}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
index 07826c7..6723a2d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
@@ -212,12 +212,11 @@ public final class StreamTaskNetworkInput<T> implements StreamTaskInput<T> {
 			// Assumption for retrieving buffers = one concurrent checkpoint
 			RecordDeserializer<?> deserializer = recordDeserializers[channelIndex];
 			if (deserializer != null) {
-				deserializer.getUnconsumedBuffer().ifPresent(buffer ->
-					channelStateWriter.addInputData(
-						checkpointId,
-						channel.getChannelInfo(),
-						ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
-						buffer));
+				channelStateWriter.addInputData(
+					checkpointId,
+					channel.getChannelInfo(),
+					ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+					deserializer.getUnconsumedBuffer());
 			}
 
 			checkpointedInputGate.spillInflightBuffers(checkpointId, channelIndex, channelStateWriter);


[flink] 03/10: [FLINK-17547][task][hotfix] Extract SpanningWrapper from SpillingAdaptiveSpanningRecordDeserializer (static inner class). As it is, no logical changes.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5fd01eacd2e49673b0ff1532d0798844639569de
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed May 6 17:55:48 2020 +0200

    [FLINK-17547][task][hotfix] Extract SpanningWrapper
    from SpillingAdaptiveSpanningRecordDeserializer (static inner class).
    As it is, no logical changes.
---
 .../network/api/serialization/SpanningWrapper.java | 297 +++++++++++++++++++++
 ...SpillingAdaptiveSpanningRecordDeserializer.java | 278 -------------------
 2 files changed, 297 insertions(+), 278 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
new file mode 100644
index 0000000..e59363f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.serialization;
+
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.StringUtils;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.Random;
+
+final class SpanningWrapper {
+
+	private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiBytes
+
+	private final byte[] initialBuffer = new byte[1024];
+
+	private final String[] tempDirs;
+
+	private final Random rnd = new Random();
+
+	private final DataInputDeserializer serializationReadBuffer;
+
+	private final ByteBuffer lengthBuffer;
+
+	private FileChannel spillingChannel;
+
+	private byte[] buffer;
+
+	private int recordLength;
+
+	private int accumulatedRecordBytes;
+
+	private MemorySegment leftOverData;
+
+	private int leftOverStart;
+
+	private int leftOverLimit;
+
+	private File spillFile;
+
+	private DataInputViewStreamWrapper spillFileReader;
+
+	public SpanningWrapper(String[] tempDirs) {
+		this.tempDirs = tempDirs;
+
+		this.lengthBuffer = ByteBuffer.allocate(4);
+		this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
+
+		this.recordLength = -1;
+
+		this.serializationReadBuffer = new DataInputDeserializer();
+		this.buffer = initialBuffer;
+	}
+
+	void initializeWithPartialRecord(NonSpanningWrapper partial, int nextRecordLength) throws IOException {
+		// set the length and copy what is available to the buffer
+		this.recordLength = nextRecordLength;
+
+		final int numBytesChunk = partial.remaining();
+
+		if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
+			// create a spilling channel and put the data there
+			this.spillingChannel = createSpillingChannel();
+
+			ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
+			FileUtils.writeCompletely(this.spillingChannel, toWrite);
+		}
+		else {
+			// collect in memory
+			ensureBufferCapacity(nextRecordLength);
+			partial.segment.get(partial.position, buffer, 0, numBytesChunk);
+		}
+
+		this.accumulatedRecordBytes = numBytesChunk;
+	}
+
+	void initializeWithPartialLength(NonSpanningWrapper partial) throws IOException {
+		// copy what we have to the length buffer
+		partial.segment.get(partial.position, this.lengthBuffer, partial.remaining());
+	}
+
+	void addNextChunkFromMemorySegment(MemorySegment segment, int offset, int numBytes) throws IOException {
+		int segmentPosition = offset;
+		int segmentRemaining = numBytes;
+		// check where to go. if we have a partial length, we need to complete it first
+		if (this.lengthBuffer.position() > 0) {
+			int toPut = Math.min(this.lengthBuffer.remaining(), segmentRemaining);
+			segment.get(segmentPosition, this.lengthBuffer, toPut);
+			// did we complete the length?
+			if (this.lengthBuffer.hasRemaining()) {
+				return;
+			} else {
+				this.recordLength = this.lengthBuffer.getInt(0);
+
+				this.lengthBuffer.clear();
+				segmentPosition += toPut;
+				segmentRemaining -= toPut;
+				if (this.recordLength > THRESHOLD_FOR_SPILLING) {
+					this.spillingChannel = createSpillingChannel();
+				} else {
+					ensureBufferCapacity(this.recordLength);
+				}
+			}
+		}
+
+		// copy as much as we need or can for this next spanning record
+		int needed = this.recordLength - this.accumulatedRecordBytes;
+		int toCopy = Math.min(needed, segmentRemaining);
+
+		if (spillingChannel != null) {
+			// spill to file
+			ByteBuffer toWrite = segment.wrap(segmentPosition, toCopy);
+			FileUtils.writeCompletely(this.spillingChannel, toWrite);
+		} else {
+			segment.get(segmentPosition, buffer, this.accumulatedRecordBytes, toCopy);
+		}
+
+		this.accumulatedRecordBytes += toCopy;
+
+		if (toCopy < segmentRemaining) {
+			// there is more data in the segment
+			this.leftOverData = segment;
+			this.leftOverStart = segmentPosition + toCopy;
+			this.leftOverLimit = numBytes + offset;
+		}
+
+		if (accumulatedRecordBytes == recordLength) {
+			// we have the full record
+			if (spillingChannel == null) {
+				this.serializationReadBuffer.setBuffer(buffer, 0, recordLength);
+			}
+			else {
+				spillingChannel.close();
+
+				BufferedInputStream inStream = new BufferedInputStream(new FileInputStream(spillFile), 2 * 1024 * 1024);
+				this.spillFileReader = new DataInputViewStreamWrapper(inStream);
+			}
+		}
+	}
+
+	Optional<MemorySegment> getUnconsumedSegment() throws IOException {
+		// for the case of only partial length, no data
+		final int position = lengthBuffer.position();
+		if (position > 0) {
+			MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(position);
+			lengthBuffer.position(0);
+			segment.put(0, lengthBuffer, position);
+			return Optional.of(segment);
+		}
+
+		// for the case of full length, partial data in buffer
+		if (recordLength > THRESHOLD_FOR_SPILLING) {
+			throw new UnsupportedOperationException("Unaligned checkpoints currently do not support spilled " +
+				"records.");
+		} else if (recordLength != -1) {
+			int leftOverSize = leftOverLimit - leftOverStart;
+			int unconsumedSize = Integer.BYTES + accumulatedRecordBytes + leftOverSize;
+			DataOutputSerializer serializer = new DataOutputSerializer(unconsumedSize);
+			serializer.writeInt(recordLength);
+			serializer.write(buffer, 0, accumulatedRecordBytes);
+			if (leftOverData != null) {
+				serializer.write(leftOverData, leftOverStart, leftOverSize);
+			}
+			MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(unconsumedSize);
+			segment.put(0, serializer.getSharedBuffer(), 0, segment.size());
+			return Optional.of(segment);
+		}
+
+		// for the case of no remaining partial length or data
+		return Optional.empty();
+	}
+
+	void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) {
+		deserializer.clear();
+
+		if (leftOverData != null) {
+			deserializer.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit);
+		}
+	}
+
+	boolean hasFullRecord() {
+		return this.recordLength >= 0 && this.accumulatedRecordBytes >= this.recordLength;
+	}
+
+	int getNumGatheredBytes() {
+		return this.accumulatedRecordBytes + (this.recordLength >= 0 ? 4 : lengthBuffer.position());
+	}
+
+	public void clear() {
+		this.buffer = initialBuffer;
+		this.serializationReadBuffer.releaseArrays();
+
+		this.recordLength = -1;
+		this.lengthBuffer.clear();
+		this.leftOverData = null;
+		this.leftOverStart = 0;
+		this.leftOverLimit = 0;
+		this.accumulatedRecordBytes = 0;
+
+		if (spillingChannel != null) {
+			try {
+				spillingChannel.close();
+			}
+			catch (Throwable t) {
+				// ignore
+			}
+			spillingChannel = null;
+		}
+		if (spillFileReader != null) {
+			try {
+				spillFileReader.close();
+			}
+			catch (Throwable t) {
+				// ignore
+			}
+			spillFileReader = null;
+		}
+		if (spillFile != null) {
+			spillFile.delete();
+			spillFile = null;
+		}
+	}
+
+	public DataInputView getInputView() {
+		if (spillFileReader == null) {
+			return serializationReadBuffer;
+		}
+		else {
+			return spillFileReader;
+		}
+	}
+
+	private void ensureBufferCapacity(int minLength) {
+		if (buffer.length < minLength) {
+			byte[] newBuffer = new byte[Math.max(minLength, buffer.length * 2)];
+			System.arraycopy(buffer, 0, newBuffer, 0, accumulatedRecordBytes);
+			buffer = newBuffer;
+		}
+	}
+
+	@SuppressWarnings("resource")
+	private FileChannel createSpillingChannel() throws IOException {
+		if (spillFile != null) {
+			throw new IllegalStateException("Spilling file already exists.");
+		}
+
+		// try to find a unique file name for the spilling channel
+		int maxAttempts = 10;
+		for (int attempt = 0; attempt < maxAttempts; attempt++) {
+			String directory = tempDirs[rnd.nextInt(tempDirs.length)];
+			spillFile = new File(directory, randomString(rnd) + ".inputchannel");
+			if (spillFile.createNewFile()) {
+				return new RandomAccessFile(spillFile, "rw").getChannel();
+			}
+		}
+
+		throw new IOException(
+			"Could not find a unique file channel name in '" + Arrays.toString(tempDirs) +
+				"' for spilling large records during deserialization.");
+	}
+
+	private static String randomString(Random random) {
+		final byte[] bytes = new byte[20];
+		random.nextBytes(bytes);
+		return StringUtils.byteToHexString(bytes);
+	}
+}
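
ensureBufferCapacity (above) grows the in-memory record buffer geometrically, so accumulating a large spanning record costs amortized linear copying. The growth rule restated in isolation:

	final class GrowthSketch {
		// At least double the buffer, never below the requested minimum,
		// preserving the valid prefix of accumulated bytes.
		static byte[] grow(byte[] buffer, int minLength, int validBytes) {
			if (buffer.length >= minLength) {
				return buffer;
			}
			byte[] newBuffer = new byte[Math.max(minLength, buffer.length * 2)];
			System.arraycopy(buffer, 0, newBuffer, 0, validBytes);
			return newBuffer;
		}
	}
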
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 5003e78..f20fbc9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -19,29 +19,13 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputDeserializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.StringUtils;
 
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
-import java.util.Arrays;
 import java.util.Optional;
-import java.util.Random;
 
 /**
  * @param <T> The type of the record to be deserialized.
@@ -54,8 +38,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 					"(Value or Writable), check their serialization methods. If you are using a " +
 					"Kryo-serialized type, check the corresponding Kryo serializer.";
 
-	private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiBytes
-
 	private final NonSpanningWrapper nonSpanningWrapper;
 
 	private final SpanningWrapper spanningWrapper;
@@ -179,264 +161,4 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		return this.nonSpanningWrapper.remaining() > 0 || this.spanningWrapper.getNumGatheredBytes() > 0;
 	}
 
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	private static final class SpanningWrapper {
-
-		private final byte[] initialBuffer = new byte[1024];
-
-		private final String[] tempDirs;
-
-		private final Random rnd = new Random();
-
-		private final DataInputDeserializer serializationReadBuffer;
-
-		private final ByteBuffer lengthBuffer;
-
-		private FileChannel spillingChannel;
-
-		private byte[] buffer;
-
-		private int recordLength;
-
-		private int accumulatedRecordBytes;
-
-		private MemorySegment leftOverData;
-
-		private int leftOverStart;
-
-		private int leftOverLimit;
-
-		private File spillFile;
-
-		private DataInputViewStreamWrapper spillFileReader;
-
-		public SpanningWrapper(String[] tempDirs) {
-			this.tempDirs = tempDirs;
-
-			this.lengthBuffer = ByteBuffer.allocate(4);
-			this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
-
-			this.recordLength = -1;
-
-			this.serializationReadBuffer = new DataInputDeserializer();
-			this.buffer = initialBuffer;
-		}
-
-		private void initializeWithPartialRecord(NonSpanningWrapper partial, int nextRecordLength) throws IOException {
-			// set the length and copy what is available to the buffer
-			this.recordLength = nextRecordLength;
-
-			final int numBytesChunk = partial.remaining();
-
-			if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
-				// create a spilling channel and put the data there
-				this.spillingChannel = createSpillingChannel();
-
-				ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
-				FileUtils.writeCompletely(this.spillingChannel, toWrite);
-			}
-			else {
-				// collect in memory
-				ensureBufferCapacity(nextRecordLength);
-				partial.segment.get(partial.position, buffer, 0, numBytesChunk);
-			}
-
-			this.accumulatedRecordBytes = numBytesChunk;
-		}
-
-		private void initializeWithPartialLength(NonSpanningWrapper partial) throws IOException {
-			// copy what we have to the length buffer
-			partial.segment.get(partial.position, this.lengthBuffer, partial.remaining());
-		}
-
-		private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, int numBytes) throws IOException {
-			int segmentPosition = offset;
-			int segmentRemaining = numBytes;
-			// check where to go. if we have a partial length, we need to complete it first
-			if (this.lengthBuffer.position() > 0) {
-				int toPut = Math.min(this.lengthBuffer.remaining(), segmentRemaining);
-				segment.get(segmentPosition, this.lengthBuffer, toPut);
-				// did we complete the length?
-				if (this.lengthBuffer.hasRemaining()) {
-					return;
-				} else {
-					this.recordLength = this.lengthBuffer.getInt(0);
-
-					this.lengthBuffer.clear();
-					segmentPosition += toPut;
-					segmentRemaining -= toPut;
-					if (this.recordLength > THRESHOLD_FOR_SPILLING) {
-						this.spillingChannel = createSpillingChannel();
-					} else {
-						ensureBufferCapacity(this.recordLength);
-					}
-				}
-			}
-
-			// copy as much as we need or can for this next spanning record
-			int needed = this.recordLength - this.accumulatedRecordBytes;
-			int toCopy = Math.min(needed, segmentRemaining);
-
-			if (spillingChannel != null) {
-				// spill to file
-				ByteBuffer toWrite = segment.wrap(segmentPosition, toCopy);
-				FileUtils.writeCompletely(this.spillingChannel, toWrite);
-			} else {
-				segment.get(segmentPosition, buffer, this.accumulatedRecordBytes, toCopy);
-			}
-
-			this.accumulatedRecordBytes += toCopy;
-
-			if (toCopy < segmentRemaining) {
-				// there is more data in the segment
-				this.leftOverData = segment;
-				this.leftOverStart = segmentPosition + toCopy;
-				this.leftOverLimit = numBytes + offset;
-			}
-
-			if (accumulatedRecordBytes == recordLength) {
-				// we have the full record
-				if (spillingChannel == null) {
-					this.serializationReadBuffer.setBuffer(buffer, 0, recordLength);
-				}
-				else {
-					spillingChannel.close();
-
-					BufferedInputStream inStream = new BufferedInputStream(new FileInputStream(spillFile), 2 * 1024 * 1024);
-					this.spillFileReader = new DataInputViewStreamWrapper(inStream);
-				}
-			}
-		}
-
-		Optional<MemorySegment> getUnconsumedSegment() throws IOException {
-			// for the case of only partial length, no data
-			final int position = lengthBuffer.position();
-			if (position > 0) {
-				MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(position);
-				lengthBuffer.position(0);
-				segment.put(0, lengthBuffer, position);
-				return Optional.of(segment);
-			}
-
-			// for the case of full length, partial data in buffer
-			if (recordLength > THRESHOLD_FOR_SPILLING) {
-				throw new UnsupportedOperationException("Unaligned checkpoint currently do not support spilled " +
-					"records.");
-			} else if (recordLength != -1) {
-				int leftOverSize = leftOverLimit - leftOverStart;
-				int unconsumedSize = Integer.BYTES + accumulatedRecordBytes + leftOverSize;
-				DataOutputSerializer serializer = new DataOutputSerializer(unconsumedSize);
-				serializer.writeInt(recordLength);
-				serializer.write(buffer, 0, accumulatedRecordBytes);
-				if (leftOverData != null) {
-					serializer.write(leftOverData, leftOverStart, leftOverSize);
-				}
-				MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(unconsumedSize);
-				segment.put(0, serializer.getSharedBuffer(), 0, segment.size());
-				return Optional.of(segment);
-			}
-
-			// for the case of no remaining partial length or data
-			return Optional.empty();
-		}
-
-		private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) {
-			deserializer.clear();
-
-			if (leftOverData != null) {
-				deserializer.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit);
-			}
-		}
-
-		private boolean hasFullRecord() {
-			return this.recordLength >= 0 && this.accumulatedRecordBytes >= this.recordLength;
-		}
-
-		private int getNumGatheredBytes() {
-			return this.accumulatedRecordBytes + (this.recordLength >= 0 ? 4 : lengthBuffer.position());
-		}
-
-		public void clear() {
-			this.buffer = initialBuffer;
-			this.serializationReadBuffer.releaseArrays();
-
-			this.recordLength = -1;
-			this.lengthBuffer.clear();
-			this.leftOverData = null;
-			this.leftOverStart = 0;
-			this.leftOverLimit = 0;
-			this.accumulatedRecordBytes = 0;
-
-			if (spillingChannel != null) {
-				try {
-					spillingChannel.close();
-				}
-				catch (Throwable t) {
-					// ignore
-				}
-				spillingChannel = null;
-			}
-			if (spillFileReader != null) {
-				try {
-					spillFileReader.close();
-				}
-				catch (Throwable t) {
-					// ignore
-				}
-				spillFileReader = null;
-			}
-			if (spillFile != null) {
-				spillFile.delete();
-				spillFile = null;
-			}
-		}
-
-		public DataInputView getInputView() {
-			if (spillFileReader == null) {
-				return serializationReadBuffer;
-			}
-			else {
-				return spillFileReader;
-			}
-		}
-
-		private void ensureBufferCapacity(int minLength) {
-			if (buffer.length < minLength) {
-				byte[] newBuffer = new byte[Math.max(minLength, buffer.length * 2)];
-				System.arraycopy(buffer, 0, newBuffer, 0, accumulatedRecordBytes);
-				buffer = newBuffer;
-			}
-		}
-
-		@SuppressWarnings("resource")
-		private FileChannel createSpillingChannel() throws IOException {
-			if (spillFile != null) {
-				throw new IllegalStateException("Spilling file already exists.");
-			}
-
-			// try to find a unique file name for the spilling channel
-			int maxAttempts = 10;
-			for (int attempt = 0; attempt < maxAttempts; attempt++) {
-				String directory = tempDirs[rnd.nextInt(tempDirs.length)];
-				spillFile = new File(directory, randomString(rnd) + ".inputchannel");
-				if (spillFile.createNewFile()) {
-					return new RandomAccessFile(spillFile, "rw").getChannel();
-				}
-			}
-
-			throw new IOException(
-				"Could not find a unique file channel name in '" + Arrays.toString(tempDirs) +
-					"' for spilling large records during deserialization.");
-		}
-
-		private static String randomString(Random random) {
-			final byte[] bytes = new byte[20];
-			random.nextBytes(bytes);
-			return StringUtils.byteToHexString(bytes);
-		}
-	}
 }


[flink] 08/10: [FLINK-17547][task][hotfix] Move RefCountedFile to flink-core to use it in SpanningWrapper

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1379548cf96b1f59aff21d7797390b805a599c08
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu May 7 15:31:51 2020 +0200

    [FLINK-17547][task][hotfix] Move RefCountedFile to flink-core
    to use it in SpanningWrapper
---
 .../src/main/java/org/apache/flink/core/fs}/RefCountedFile.java       | 4 ++--
 .../src/main/java/org/apache/flink/util}/RefCounted.java              | 2 +-
 .../src/test/java/org/apache/flink/core/fs}/RefCountedFileTest.java   | 2 +-
 .../org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java | 1 +
 .../org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java | 1 +
 5 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java b/flink-core/src/main/java/org/apache/flink/core/fs/RefCountedFile.java
similarity index 96%
rename from flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java
rename to flink-core/src/main/java/org/apache/flink/core/fs/RefCountedFile.java
index 9675f09..7cbc47f 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/RefCountedFile.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.fs.s3.common.utils;
+package org.apache.flink.core.fs;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -43,7 +43,7 @@ public class RefCountedFile implements RefCounted {
 
 	protected boolean closed;
 
-	protected RefCountedFile(final File file) {
+	public RefCountedFile(final File file) {
 		this.file = checkNotNull(file);
 		this.references = new AtomicInteger(1);
 		this.closed = false;
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCounted.java b/flink-core/src/main/java/org/apache/flink/util/RefCounted.java
similarity index 96%
rename from flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCounted.java
rename to flink-core/src/main/java/org/apache/flink/util/RefCounted.java
index 84b0fa0..33496a0 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCounted.java
+++ b/flink-core/src/main/java/org/apache/flink/util/RefCounted.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.fs.s3.common.utils;
+package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
 
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/RefCountedFileTest.java
similarity index 98%
rename from flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
rename to flink-core/src/test/java/org/apache/flink/core/fs/RefCountedFileTest.java
index 217f4e1..58ca29a 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/RefCountedFileTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.fs.s3.common.utils;
+package org.apache.flink.core.fs;
 
 import org.junit.Assert;
 import org.junit.Rule;
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java
index d51e37e..a36175d 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java
@@ -20,6 +20,7 @@ package org.apache.flink.fs.s3.common.utils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.util.RefCounted;
 
 import java.io.File;
 import java.io.IOException;
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java
index 94b8527..bcb0057 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java
@@ -19,6 +19,7 @@
 package org.apache.flink.fs.s3.common.utils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RefCountedFile;
 import org.apache.flink.util.IOUtils;
 
 import java.io.File;

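For context on the class being moved: RefCountedFile pairs a File with an
AtomicInteger reference count that starts at 1 (see the constructor in the diff
above; this commit also makes it public). A usage sketch, assuming the
retain()/release() pair declared by the RefCounted interface, with release()
returning true once the last reference is gone and the file has been deleted:

	import org.apache.flink.core.fs.RefCountedFile;

	import java.io.File;
	import java.io.IOException;

	// Sketch only: guarding a shared temp file with RefCountedFile.
	// Assumes retain()/release() semantics as described in the lead-in.
	public class RefCountedFileDemo {
		public static void main(String[] args) throws IOException {
			File tmp = File.createTempFile("spill", ".tmp");
			RefCountedFile shared = new RefCountedFile(tmp); // count = 1

			shared.retain();                      // second owner, count = 2
			System.out.println(shared.release()); // false: one owner still left
			System.out.println(shared.release()); // true (assumed): count hit 0, file deleted
		}
	}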

[flink] 02/10: [FLINK-17547][task][hotfix] Extract NonSpanningWrapper from SpillingAdaptiveSpanningRecordDeserializer (static inner class). Extracted as-is; no logical changes.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 39f5f1b0f09c37400ba113fdf33f90a832de5f0d
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed May 6 17:54:05 2020 +0200

    [FLINK-17547][task][hotfix] Extract NonSpanningWrapper from
    SpillingAdaptiveSpanningRecordDeserializer (static inner class).
    Extracted as-is; no logical changes.
---
 .../api/serialization/NonSpanningWrapper.java      | 296 +++++++++++++++++++++
 ...SpillingAdaptiveSpanningRecordDeserializer.java | 271 -------------------
 2 files changed, 296 insertions(+), 271 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
new file mode 100644
index 0000000..bab50fa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.serialization;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.util.Optional;
+
+final class NonSpanningWrapper implements DataInputView {
+
+	MemorySegment segment;
+
+	private int limit;
+
+	int position;
+
+	private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
+	private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
+
+	int remaining() {
+		return this.limit - this.position;
+	}
+
+	void clear() {
+		this.segment = null;
+		this.limit = 0;
+		this.position = 0;
+	}
+
+	void initializeFromMemorySegment(MemorySegment seg, int position, int leftOverLimit) {
+		this.segment = seg;
+		this.position = position;
+		this.limit = leftOverLimit;
+	}
+
+	Optional<MemorySegment> getUnconsumedSegment() {
+		if (remaining() == 0) {
+			return Optional.empty();
+		}
+		MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(remaining());
+		segment.copyTo(position, target, 0, remaining());
+		return Optional.of(target);
+	}
+
+	// -------------------------------------------------------------------------------------------------------------
+	//                                       DataInput specific methods
+	// -------------------------------------------------------------------------------------------------------------
+
+	@Override
+	public final void readFully(byte[] b) throws IOException {
+		readFully(b, 0, b.length);
+	}
+
+	@Override
+	public final void readFully(byte[] b, int off, int len) throws IOException {
+		if (off < 0 || len < 0 || off + len > b.length) {
+			throw new IndexOutOfBoundsException();
+		}
+
+		this.segment.get(this.position, b, off, len);
+		this.position += len;
+	}
+
+	@Override
+	public final boolean readBoolean() throws IOException {
+		return readByte() == 1;
+	}
+
+	@Override
+	public final byte readByte() throws IOException {
+		return this.segment.get(this.position++);
+	}
+
+	@Override
+	public final int readUnsignedByte() throws IOException {
+		return readByte() & 0xff;
+	}
+
+	@Override
+	public final short readShort() throws IOException {
+		final short v = this.segment.getShortBigEndian(this.position);
+		this.position += 2;
+		return v;
+	}
+
+	@Override
+	public final int readUnsignedShort() throws IOException {
+		final int v = this.segment.getShortBigEndian(this.position) & 0xffff;
+		this.position += 2;
+		return v;
+	}
+
+	@Override
+	public final char readChar() throws IOException  {
+		final char v = this.segment.getCharBigEndian(this.position);
+		this.position += 2;
+		return v;
+	}
+
+	@Override
+	public final int readInt() throws IOException {
+		final int v = this.segment.getIntBigEndian(this.position);
+		this.position += 4;
+		return v;
+	}
+
+	@Override
+	public final long readLong() throws IOException {
+		final long v = this.segment.getLongBigEndian(this.position);
+		this.position += 8;
+		return v;
+	}
+
+	@Override
+	public final float readFloat() throws IOException {
+		return Float.intBitsToFloat(readInt());
+	}
+
+	@Override
+	public final double readDouble() throws IOException {
+		return Double.longBitsToDouble(readLong());
+	}
+
+	@Override
+	public final String readLine() throws IOException {
+		final StringBuilder bld = new StringBuilder(32);
+
+		try {
+			int b;
+			while ((b = readUnsignedByte()) != '\n') {
+				if (b != '\r') {
+					bld.append((char) b);
+				}
+			}
+		}
+		catch (EOFException ignored) {}
+
+		if (bld.length() == 0) {
+			return null;
+		}
+
+		// trim a trailing carriage return
+		int len = bld.length();
+		if (len > 0 && bld.charAt(len - 1) == '\r') {
+			bld.setLength(len - 1);
+		}
+		return bld.toString();
+	}
+
+	@Override
+	public final String readUTF() throws IOException {
+		final int utflen = readUnsignedShort();
+
+		final byte[] bytearr;
+		final char[] chararr;
+
+		if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) {
+			bytearr = new byte[utflen];
+			this.utfByteBuffer = bytearr;
+		} else {
+			bytearr = this.utfByteBuffer;
+		}
+		if (this.utfCharBuffer == null || this.utfCharBuffer.length < utflen) {
+			chararr = new char[utflen];
+			this.utfCharBuffer = chararr;
+		} else {
+			chararr = this.utfCharBuffer;
+		}
+
+		int c, char2, char3;
+		int count = 0;
+		int chararrCount = 0;
+
+		readFully(bytearr, 0, utflen);
+
+		while (count < utflen) {
+			c = (int) bytearr[count] & 0xff;
+			if (c > 127) {
+				break;
+			}
+			count++;
+			chararr[chararrCount++] = (char) c;
+		}
+
+		while (count < utflen) {
+			c = (int) bytearr[count] & 0xff;
+			switch (c >> 4) {
+			case 0:
+			case 1:
+			case 2:
+			case 3:
+			case 4:
+			case 5:
+			case 6:
+			case 7:
+				count++;
+				chararr[chararrCount++] = (char) c;
+				break;
+			case 12:
+			case 13:
+				count += 2;
+				if (count > utflen) {
+					throw new UTFDataFormatException("malformed input: partial character at end");
+				}
+				char2 = (int) bytearr[count - 1];
+				if ((char2 & 0xC0) != 0x80) {
+					throw new UTFDataFormatException("malformed input around byte " + count);
+				}
+				chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+				break;
+			case 14:
+				count += 3;
+				if (count > utflen) {
+					throw new UTFDataFormatException("malformed input: partial character at end");
+				}
+				char2 = (int) bytearr[count - 2];
+				char3 = (int) bytearr[count - 1];
+				if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+					throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+				}
+				chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
+				break;
+			default:
+				throw new UTFDataFormatException("malformed input around byte " + count);
+			}
+		}
+		// The number of chars produced may be less than utflen
+		return new String(chararr, 0, chararrCount);
+	}
+
+	@Override
+	public final int skipBytes(int n) throws IOException {
+		if (n < 0) {
+			throw new IllegalArgumentException();
+		}
+
+		int toSkip = Math.min(n, remaining());
+		this.position += toSkip;
+		return toSkip;
+	}
+
+	@Override
+	public void skipBytesToRead(int numBytes) throws IOException {
+		int skippedBytes = skipBytes(numBytes);
+
+		if (skippedBytes < numBytes){
+			throw new EOFException("Could not skip " + numBytes + " bytes.");
+		}
+	}
+
+	@Override
+	public int read(byte[] b, int off, int len) throws IOException {
+		if (b == null){
+			throw new NullPointerException("Byte array b cannot be null.");
+		}
+
+		if (off < 0){
+			throw new IllegalArgumentException("The offset off cannot be negative.");
+		}
+
+		if (len < 0){
+			throw new IllegalArgumentException("The length len cannot be negative.");
+		}
+
+		int toRead = Math.min(len, remaining());
+		this.segment.get(this.position, b, off, toRead);
+		this.position += toRead;
+
+		return toRead;
+	}
+
+	@Override
+	public int read(byte[] b) throws IOException {
+		return read(b, 0, b.length);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 346bdfc..5003e78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -32,12 +32,10 @@ import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.StringUtils;
 
 import java.io.BufferedInputStream;
-import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.io.UTFDataFormatException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
@@ -184,275 +182,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 
 	// -----------------------------------------------------------------------------------------------------------------
 
-	private static final class NonSpanningWrapper implements DataInputView {
-
-		private MemorySegment segment;
-
-		private int limit;
-
-		private int position;
-
-		private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
-		private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
-
-		int remaining() {
-			return this.limit - this.position;
-		}
-
-		void clear() {
-			this.segment = null;
-			this.limit = 0;
-			this.position = 0;
-		}
-
-		void initializeFromMemorySegment(MemorySegment seg, int position, int leftOverLimit) {
-			this.segment = seg;
-			this.position = position;
-			this.limit = leftOverLimit;
-		}
-
-		Optional<MemorySegment> getUnconsumedSegment() {
-			if (remaining() == 0) {
-				return Optional.empty();
-			}
-			MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(remaining());
-			segment.copyTo(position, target, 0, remaining());
-			return Optional.of(target);
-		}
-
-		// -------------------------------------------------------------------------------------------------------------
-		//                                       DataInput specific methods
-		// -------------------------------------------------------------------------------------------------------------
-
-		@Override
-		public final void readFully(byte[] b) throws IOException {
-			readFully(b, 0, b.length);
-		}
-
-		@Override
-		public final void readFully(byte[] b, int off, int len) throws IOException {
-			if (off < 0 || len < 0 || off + len > b.length) {
-				throw new IndexOutOfBoundsException();
-			}
-
-			this.segment.get(this.position, b, off, len);
-			this.position += len;
-		}
-
-		@Override
-		public final boolean readBoolean() throws IOException {
-			return readByte() == 1;
-		}
-
-		@Override
-		public final byte readByte() throws IOException {
-			return this.segment.get(this.position++);
-		}
-
-		@Override
-		public final int readUnsignedByte() throws IOException {
-			return readByte() & 0xff;
-		}
-
-		@Override
-		public final short readShort() throws IOException {
-			final short v = this.segment.getShortBigEndian(this.position);
-			this.position += 2;
-			return v;
-		}
-
-		@Override
-		public final int readUnsignedShort() throws IOException {
-			final int v = this.segment.getShortBigEndian(this.position) & 0xffff;
-			this.position += 2;
-			return v;
-		}
-
-		@Override
-		public final char readChar() throws IOException  {
-			final char v = this.segment.getCharBigEndian(this.position);
-			this.position += 2;
-			return v;
-		}
-
-		@Override
-		public final int readInt() throws IOException {
-			final int v = this.segment.getIntBigEndian(this.position);
-			this.position += 4;
-			return v;
-		}
-
-		@Override
-		public final long readLong() throws IOException {
-			final long v = this.segment.getLongBigEndian(this.position);
-			this.position += 8;
-			return v;
-		}
-
-		@Override
-		public final float readFloat() throws IOException {
-			return Float.intBitsToFloat(readInt());
-		}
-
-		@Override
-		public final double readDouble() throws IOException {
-			return Double.longBitsToDouble(readLong());
-		}
-
-		@Override
-		public final String readLine() throws IOException {
-			final StringBuilder bld = new StringBuilder(32);
-
-			try {
-				int b;
-				while ((b = readUnsignedByte()) != '\n') {
-					if (b != '\r') {
-						bld.append((char) b);
-					}
-				}
-			}
-			catch (EOFException ignored) {}
-
-			if (bld.length() == 0) {
-				return null;
-			}
-
-			// trim a trailing carriage return
-			int len = bld.length();
-			if (len > 0 && bld.charAt(len - 1) == '\r') {
-				bld.setLength(len - 1);
-			}
-			return bld.toString();
-		}
-
-		@Override
-		public final String readUTF() throws IOException {
-			final int utflen = readUnsignedShort();
-
-			final byte[] bytearr;
-			final char[] chararr;
-
-			if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) {
-				bytearr = new byte[utflen];
-				this.utfByteBuffer = bytearr;
-			} else {
-				bytearr = this.utfByteBuffer;
-			}
-			if (this.utfCharBuffer == null || this.utfCharBuffer.length < utflen) {
-				chararr = new char[utflen];
-				this.utfCharBuffer = chararr;
-			} else {
-				chararr = this.utfCharBuffer;
-			}
-
-			int c, char2, char3;
-			int count = 0;
-			int chararrCount = 0;
-
-			readFully(bytearr, 0, utflen);
-
-			while (count < utflen) {
-				c = (int) bytearr[count] & 0xff;
-				if (c > 127) {
-					break;
-				}
-				count++;
-				chararr[chararrCount++] = (char) c;
-			}
-
-			while (count < utflen) {
-				c = (int) bytearr[count] & 0xff;
-				switch (c >> 4) {
-				case 0:
-				case 1:
-				case 2:
-				case 3:
-				case 4:
-				case 5:
-				case 6:
-				case 7:
-					count++;
-					chararr[chararrCount++] = (char) c;
-					break;
-				case 12:
-				case 13:
-					count += 2;
-					if (count > utflen) {
-						throw new UTFDataFormatException("malformed input: partial character at end");
-					}
-					char2 = (int) bytearr[count - 1];
-					if ((char2 & 0xC0) != 0x80) {
-						throw new UTFDataFormatException("malformed input around byte " + count);
-					}
-					chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
-					break;
-				case 14:
-					count += 3;
-					if (count > utflen) {
-						throw new UTFDataFormatException("malformed input: partial character at end");
-					}
-					char2 = (int) bytearr[count - 2];
-					char3 = (int) bytearr[count - 1];
-					if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
-						throw new UTFDataFormatException("malformed input around byte " + (count - 1));
-					}
-					chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
-					break;
-				default:
-					throw new UTFDataFormatException("malformed input around byte " + count);
-				}
-			}
-			// The number of chars produced may be less than utflen
-			return new String(chararr, 0, chararrCount);
-		}
-
-		@Override
-		public final int skipBytes(int n) throws IOException {
-			if (n < 0) {
-				throw new IllegalArgumentException();
-			}
-
-			int toSkip = Math.min(n, remaining());
-			this.position += toSkip;
-			return toSkip;
-		}
-
-		@Override
-		public void skipBytesToRead(int numBytes) throws IOException {
-			int skippedBytes = skipBytes(numBytes);
-
-			if (skippedBytes < numBytes){
-				throw new EOFException("Could not skip " + numBytes + " bytes.");
-			}
-		}
-
-		@Override
-		public int read(byte[] b, int off, int len) throws IOException {
-			if (b == null){
-				throw new NullPointerException("Byte array b cannot be null.");
-			}
-
-			if (off < 0){
-				throw new IllegalArgumentException("The offset off cannot be negative.");
-			}
-
-			if (len < 0){
-				throw new IllegalArgumentException("The length len cannot be negative.");
-			}
-
-			int toRead = Math.min(len, remaining());
-			this.segment.get(this.position, b, off, toRead);
-			this.position += toRead;
-
-			return toRead;
-		}
-
-		@Override
-		public int read(byte[] b) throws IOException {
-			return read(b, 0, b.length);
-		}
-	}
-
 	// -----------------------------------------------------------------------------------------------------------------
 
 	private static final class SpanningWrapper {

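One detail of the extracted class worth calling out: getUnconsumedSegment()
defensively copies the unread tail into a fresh unpooled segment, so the original
(possibly pooled) buffer can be recycled independently of the snapshot. A minimal
standalone illustration of that copy-out, using only the MemorySegment calls that
appear in the diff above (the demo class itself is invented for the example):

	import org.apache.flink.core.memory.MemorySegment;
	import org.apache.flink.core.memory.MemorySegmentFactory;

	// Illustration only: snapshotting the unread remainder of a segment,
	// mirroring NonSpanningWrapper#getUnconsumedSegment.
	public class UnconsumedSegmentDemo {
		public static void main(String[] args) {
			MemorySegment source = MemorySegmentFactory.wrap(new byte[]{1, 2, 3, 4, 5});
			int position = 2; // bytes 0..1 already consumed
			int remaining = source.size() - position;

			// Copy the tail into a fresh unpooled segment so the original
			// buffer can be released without invalidating the snapshot.
			MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(remaining);
			source.copyTo(position, target, 0, remaining);

			System.out.println(target.get(0)); // prints 3
		}
	}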

[flink] 04/10: [FLINK-17547][task][hotfix] Fix compiler warnings in NonSpanningWrapper

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c6bdeb4b5b4c608510c98a91e19f5facdad28ed0
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue May 12 11:23:39 2020 +0200

    [FLINK-17547][task][hotfix] Fix compiler warnings in NonSpanningWrapper
---
 .../api/serialization/NonSpanningWrapper.java      | 53 ++++++++++------------
 1 file changed, 25 insertions(+), 28 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
index bab50fa..6d9602f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
@@ -67,12 +67,12 @@ final class NonSpanningWrapper implements DataInputView {
 	// -------------------------------------------------------------------------------------------------------------
 
 	@Override
-	public final void readFully(byte[] b) throws IOException {
+	public final void readFully(byte[] b) {
 		readFully(b, 0, b.length);
 	}
 
 	@Override
-	public final void readFully(byte[] b, int off, int len) throws IOException {
+	public final void readFully(byte[] b, int off, int len) {
 		if (off < 0 || len < 0 || off + len > b.length) {
 			throw new IndexOutOfBoundsException();
 		}
@@ -82,78 +82,75 @@ final class NonSpanningWrapper implements DataInputView {
 	}
 
 	@Override
-	public final boolean readBoolean() throws IOException {
+	public final boolean readBoolean() {
 		return readByte() == 1;
 	}
 
 	@Override
-	public final byte readByte() throws IOException {
+	public final byte readByte() {
 		return this.segment.get(this.position++);
 	}
 
 	@Override
-	public final int readUnsignedByte() throws IOException {
+	public final int readUnsignedByte() {
 		return readByte() & 0xff;
 	}
 
 	@Override
-	public final short readShort() throws IOException {
+	public final short readShort() {
 		final short v = this.segment.getShortBigEndian(this.position);
 		this.position += 2;
 		return v;
 	}
 
 	@Override
-	public final int readUnsignedShort() throws IOException {
+	public final int readUnsignedShort() {
 		final int v = this.segment.getShortBigEndian(this.position) & 0xffff;
 		this.position += 2;
 		return v;
 	}
 
 	@Override
-	public final char readChar() throws IOException  {
+	public final char readChar()  {
 		final char v = this.segment.getCharBigEndian(this.position);
 		this.position += 2;
 		return v;
 	}
 
 	@Override
-	public final int readInt() throws IOException {
+	public final int readInt() {
 		final int v = this.segment.getIntBigEndian(this.position);
 		this.position += 4;
 		return v;
 	}
 
 	@Override
-	public final long readLong() throws IOException {
+	public final long readLong() {
 		final long v = this.segment.getLongBigEndian(this.position);
 		this.position += 8;
 		return v;
 	}
 
 	@Override
-	public final float readFloat() throws IOException {
+	public final float readFloat() {
 		return Float.intBitsToFloat(readInt());
 	}
 
 	@Override
-	public final double readDouble() throws IOException {
+	public final double readDouble() {
 		return Double.longBitsToDouble(readLong());
 	}
 
 	@Override
-	public final String readLine() throws IOException {
+	public final String readLine() {
 		final StringBuilder bld = new StringBuilder(32);
 
-		try {
-			int b;
-			while ((b = readUnsignedByte()) != '\n') {
-				if (b != '\r') {
-					bld.append((char) b);
-				}
+		int b;
+		while ((b = readUnsignedByte()) != '\n') {
+			if (b != '\r') {
+				bld.append((char) b);
 			}
 		}
-		catch (EOFException ignored) {}
 
 		if (bld.length() == 0) {
 			return null;
@@ -168,7 +165,7 @@ final class NonSpanningWrapper implements DataInputView {
 	}
 
 	@Override
-	public final String readUTF() throws IOException {
+	public final String readUTF() throws UTFDataFormatException {
 		final int utflen = readUnsignedShort();
 
 		final byte[] bytearr;
@@ -222,7 +219,7 @@ final class NonSpanningWrapper implements DataInputView {
 				if (count > utflen) {
 					throw new UTFDataFormatException("malformed input: partial character at end");
 				}
-				char2 = (int) bytearr[count - 1];
+				char2 = bytearr[count - 1];
 				if ((char2 & 0xC0) != 0x80) {
 					throw new UTFDataFormatException("malformed input around byte " + count);
 				}
@@ -233,8 +230,8 @@ final class NonSpanningWrapper implements DataInputView {
 				if (count > utflen) {
 					throw new UTFDataFormatException("malformed input: partial character at end");
 				}
-				char2 = (int) bytearr[count - 2];
-				char3 = (int) bytearr[count - 1];
+				char2 = bytearr[count - 2];
+				char3 = bytearr[count - 1];
 				if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
 					throw new UTFDataFormatException("malformed input around byte " + (count - 1));
 				}
@@ -249,7 +246,7 @@ final class NonSpanningWrapper implements DataInputView {
 	}
 
 	@Override
-	public final int skipBytes(int n) throws IOException {
+	public final int skipBytes(int n) {
 		if (n < 0) {
 			throw new IllegalArgumentException();
 		}
@@ -260,7 +257,7 @@ final class NonSpanningWrapper implements DataInputView {
 	}
 
 	@Override
-	public void skipBytesToRead(int numBytes) throws IOException {
+	public void skipBytesToRead(int numBytes) throws EOFException {
 		int skippedBytes = skipBytes(numBytes);
 
 		if (skippedBytes < numBytes){
@@ -269,7 +266,7 @@ final class NonSpanningWrapper implements DataInputView {
 	}
 
 	@Override
-	public int read(byte[] b, int off, int len) throws IOException {
+	public int read(byte[] b, int off, int len) {
 		if (b == null){
 			throw new NullPointerException("Byte array b cannot be null.");
 		}
@@ -290,7 +287,7 @@ final class NonSpanningWrapper implements DataInputView {
 	}
 
 	@Override
-	public int read(byte[] b) throws IOException {
+	public int read(byte[] b) {
 		return read(b, 0, b.length);
 	}
 }
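
These signature changes compile because a Java override may declare fewer (or
narrower) checked exceptions than the method it overrides; NonSpanningWrapper
reads from an in-memory segment, so most of its methods cannot actually raise an
IOException. A minimal, self-contained illustration of that language rule (the
Reader/InMemoryReader names below are invented for the example):

	import java.io.IOException;

	// An override may drop checked exceptions declared by the overridden method,
	// which is exactly what the warning fixes above rely on.
	interface Reader {
		int readInt() throws IOException;
	}

	class InMemoryReader implements Reader {
		private final int[] data = {42};
		private int pos;

		@Override
		public int readInt() { // no 'throws IOException': purely in-memory
			return data[pos++];
		}
	}

	public class NarrowedThrowsDemo {
		public static void main(String[] args) throws IOException {
			Reader r = new InMemoryReader();
			System.out.println(r.readInt()); // prints 42
		}
	}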