Posted to commits@flink.apache.org by se...@apache.org on 2019/05/10 23:34:13 UTC

[flink] 14/14: [FLINK-12070] [network] Change Bounded Blocking Subpartition Implementation to Memory Mapped Files

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a7cf24383be9f310fb5ccc5a032721421fa45791
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Apr 25 13:09:40 2019 +0200

    [FLINK-12070] [network] Change Bounded Blocking Subpartition Implementation to Memory Mapped Files
    
    This commit consists of multiple steps (originally individual commits) that were squashed into
    one commit after review, to form a self-contained (compiling, test-passing) commit.
    
    Part 1: Move tests that were specific to pipelined (and buffer-storing) implementations into the
        specific test classes.
    
        Several test assumptions were tied to specific implementations of the partitions rather than
        to required behavior:
    
          - The availability of statistics after disposing a partition is not necessary and requires extra
            effort to guarantee in certain implementations.
          - The fact that the number of bytes in a partition only updates on consumption seems wrong and
            can only apply to "consume once" implementations. This should not be assumed in a test base.
    
    Part 2: Make "release parent releases readers" test specific to pipelined partitions.
    
        For pipelined partitions, the release call on the SubPartition causes the reader (view) to be released immediately.
        For bounded partitions, this is not required or even desirable, because an overly eager release can cause
        segmentation faults in the case of direct byte buffers and memory mapped files.
    
    Part 3: Remove old SpillableSubpartition and SpillableSubpartitionView
    
    Part 4: Move code specific to pipelined subpartitions into PipelinedSubpartition class.
    
    Part 5: Implement new BoundedBlockingSubpartition
    
    Part 6: Remove no longer applicable memory release test for blocking partitions
    
    Part 7: Add tests for BoundedBlockingSubpartition
---
 .../flink/core/memory/MemorySegmentFactory.java    |  14 +
 .../runtime/io/network/buffer/BufferConsumer.java  |  11 +-
 .../partition/BoundedBlockingSubpartition.java     | 274 +++++++
 .../BoundedBlockingSubpartitionReader.java         | 144 ++++
 .../io/network/partition/BufferToByteBuffer.java   | 130 ++++
 .../io/network/partition/MemoryMappedBuffers.java  | 279 +++++++
 .../runtime/io/network/partition/PageSizeUtil.java | 113 +++
 .../network/partition/PipelinedSubpartition.java   |  70 ++
 .../io/network/partition/ResultPartition.java      |  38 +-
 .../io/network/partition/ResultSubpartition.java   |  83 +--
 .../network/partition/SpillableSubpartition.java   | 312 --------
 .../partition/SpillableSubpartitionView.java       | 280 --------
 .../network/partition/SpilledSubpartitionView.java | 303 --------
 .../io/network/buffer/BufferBuilderTestUtils.java  |  29 +
 .../partition/BoundedBlockingSubpartitionTest.java | 105 +++
 .../BoundedBlockingSubpartitionWriteReadTest.java  | 199 +++++
 .../network/partition/BufferToByteBufferTest.java  |  80 +++
 .../network/partition/MemoryMappedBuffersTest.java | 162 +++++
 .../partition/PipelinedSubpartitionTest.java       |  45 ++
 .../io/network/partition/ResultPartitionTest.java  |   5 -
 .../partition/SpillableSubpartitionTest.java       | 800 ---------------------
 .../io/network/partition/SubpartitionTestBase.java | 109 ++-
 22 files changed, 1778 insertions(+), 1807 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index 48b9a20..9a20e0e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -116,4 +116,18 @@ public final class MemorySegmentFactory {
 		return new HybridMemorySegment(memory, owner);
 	}
 
+	/**
+	 * Creates a memory segment that wraps the off-heap memory backing the given ByteBuffer.
+	 * Note that the ByteBuffer needs to be a <i>direct ByteBuffer</i>.
+	 *
+	 * <p>This method is intended to be used for components which pool memory and create
+	 * memory segments around long-lived memory regions.
+	 *
+	 * @param memory The byte buffer with the off-heap memory to be represented by the memory segment.
+	 * @return A new memory segment representing the given off-heap memory.
+	 */
+	public static MemorySegment wrapOffHeapMemory(ByteBuffer memory) {
+		return new HybridMemorySegment(memory);
+	}
+
 }
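
For illustration, a minimal sketch of how the new factory method can be used (the buffer size and values are arbitrary):

    ByteBuffer direct = ByteBuffer.allocateDirect(4096);
    MemorySegment segment = MemorySegmentFactory.wrapOffHeapMemory(direct);

    segment.putInt(0, 42);
    int value = segment.getInt(0); // 42, read back from the direct buffer's memory
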
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
index affcc74..b58a627 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
@@ -61,9 +61,16 @@ public class BufferConsumer implements Closeable {
 	 * Constructs {@link BufferConsumer} instance with static content.
 	 */
 	public BufferConsumer(MemorySegment memorySegment, BufferRecycler recycler, boolean isBuffer) {
+		this(memorySegment, recycler, memorySegment.size(), isBuffer);
+	}
+
+	/**
+	 * Constructs {@link BufferConsumer} instance with static content of a certain size.
+	 */
+	public BufferConsumer(MemorySegment memorySegment, BufferRecycler recycler, int size, boolean isBuffer) {
 		this(new NetworkBuffer(checkNotNull(memorySegment), checkNotNull(recycler), isBuffer),
-			() -> -memorySegment.size(),
-			0);
+				() -> -size,
+				0);
 		checkState(memorySegment.size() > 0);
 		checkState(isFinished(), "BufferConsumer with static size must be finished after construction!");
 	}
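
A minimal sketch of the new constructor, which exposes only the first `size` bytes of the segment as static, finished content (allocation via an unpooled segment is an assumption for the example; sizes are arbitrary):

    MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(4096);
    BufferConsumer consumer = new BufferConsumer(
            segment, FreeingBufferRecycler.INSTANCE, 1234, true);

    // the consumer is finished on construction; build() yields the static content
    Buffer buffer = consumer.build();
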
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
new file mode 100644
index 0000000..76c7a2d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of the ResultSubpartition for a bounded result transferred
+ * in a blocking manner: the result is first fully produced, then consumed,
+ * possibly multiple times.
+ *
+ * <p>The implementation creates a temporary memory mapped file, writes all buffers to the
+ * mapped memory, and serves the result from that same memory. The kernel backs the mapped
+ * region with physical memory and file space incrementally as new pages are filled.
+ *
+ * <h2>Important Notes on Thread Safety</h2>
+ *
+ * <p>This class does not synchronize every buffer access. It assumes the threading model of the
+ * Flink network stack and is not thread-safe beyond that.
+ *
+ * <p>This class assumes a single writer thread that adds buffers, flushes, and finishes the write
+ * phase. That same thread is also assumed to perform the partition release, if the release happens
+ * during the write phase.
+ *
+ * <p>The implementation supports multiple concurrent readers, but assumes a single
+ * thread per reader. That same thread must also release the reader. In particular, after the reader
+ * was released, no buffers obtained from this reader may be accessed any more, or segmentation
+ * faults might occur.
+ *
+ * <p>The method calls to create readers, dispose readers, and dispose the partition are
+ * thread-safe vis-a-vis each other.
+ */
+final class BoundedBlockingSubpartition extends ResultSubpartition {
+
+	/** This lock guards the creation of readers and disposal of the memory mapped file. */
+	private final Object lock = new Object();
+
+	/** The current buffer, may be filled further over time. */
+	@Nullable
+	private BufferConsumer currentBuffer;
+
+	/** The memory that we store the data in, via a memory mapped file. */
+	private final MemoryMappedBuffers memory;
+
+	/** All created and not yet released readers. */
+	@GuardedBy("lock")
+	private final Set<BoundedBlockingSubpartitionReader> readers;
+
+	/** Counter for the number of data buffers (not events!) written. */
+	private int numDataBuffersWritten;
+
+	/** The counter for the number of data buffers and events. */
+	private int numBuffersAndEventsWritten;
+
+	/** Flag indicating whether the writing has finished and this is now available for read. */
+	private boolean isFinished;
+
+	/** Flag indicating whether the subpartition has been released. */
+	private boolean isReleased;
+
+	/**
+	 * Common constructor.
+	 */
+	public BoundedBlockingSubpartition(
+			int index,
+			ResultPartition parent,
+			Path filePath) throws IOException {
+
+		this(index, parent, MemoryMappedBuffers.create(filePath));
+	}
+
+	/**
+	 * Constructor for testing, to pass in custom MemoryMappedBuffers.
+	 */
+	@VisibleForTesting
+	BoundedBlockingSubpartition(
+			int index,
+			ResultPartition parent,
+			MemoryMappedBuffers memory) throws IOException {
+
+		super(index, parent);
+
+		this.memory = checkNotNull(memory);
+		this.readers = new HashSet<>();
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks if writing is finished.
+	 * Readers cannot be created until writing is finished, and no further writes can happen after that.
+	 */
+	public boolean isFinished() {
+		return isFinished;
+	}
+
+	@Override
+	public boolean isReleased() {
+		return isReleased;
+	}
+
+	@Override
+	public boolean add(BufferConsumer bufferConsumer) throws IOException {
+		if (isFinished()) {
+			bufferConsumer.close();
+			return false;
+		}
+
+		flushCurrentBuffer();
+		currentBuffer = bufferConsumer;
+		return true;
+	}
+
+	@Override
+	public void flush() {
+		// unfortunately, the signature of flush does not allow for any exceptions, so we
+		// need to do this discouraged pattern of runtime exception wrapping
+		try {
+			flushCurrentBuffer();
+		}
+		catch (IOException e) {
+			throw new FlinkRuntimeException(e.getMessage(), e);
+		}
+	}
+
+	private void flushCurrentBuffer() throws IOException {
+		if (currentBuffer != null) {
+			writeAndCloseBufferConsumer(currentBuffer);
+			currentBuffer = null;
+		}
+	}
+
+	private void writeAndCloseBufferConsumer(BufferConsumer bufferConsumer) throws IOException {
+		try {
+			final Buffer buffer = bufferConsumer.build();
+			try {
+				memory.writeBuffer(buffer);
+
+				numBuffersAndEventsWritten++;
+				if (buffer.isBuffer()) {
+					numDataBuffersWritten++;
+				}
+			}
+			finally {
+				buffer.recycleBuffer();
+			}
+		}
+		finally {
+			bufferConsumer.close();
+		}
+	}
+
+	@Override
+	public void finish() throws IOException {
+		checkState(!isReleased, "data partition already released");
+		checkState(!isFinished, "data partition already finished");
+
+		isFinished = true;
+		flushCurrentBuffer();
+		writeAndCloseBufferConsumer(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE));
+		memory.finishWrite();
+	}
+
+	@Override
+	public void release() throws IOException {
+		synchronized (lock) {
+			if (isReleased) {
+				return;
+			}
+
+			isReleased = true;
+			isFinished = true; // so that further writes fail fast
+
+			checkReaderReferencesAndDispose();
+		}
+	}
+
+	@Override
+	public ResultSubpartitionView createReadView(BufferAvailabilityListener availability) throws IOException {
+		synchronized (lock) {
+			checkState(!isReleased, "data partition already released");
+			checkState(isFinished, "writing of blocking partition not yet finished");
+
+			availability.notifyDataAvailable();
+
+			final MemoryMappedBuffers.BufferSlicer memoryReader = memory.getFullBuffers();
+			final BoundedBlockingSubpartitionReader reader = new BoundedBlockingSubpartitionReader(
+					this, memoryReader, numDataBuffersWritten);
+			readers.add(reader);
+			return reader;
+		}
+	}
+
+	void releaseReaderReference(BoundedBlockingSubpartitionReader reader) throws IOException {
+		synchronized (lock) {
+			if (readers.remove(reader) && isReleased) {
+				checkReaderReferencesAndDispose();
+			}
+		}
+	}
+
+	@GuardedBy("lock")
+	private void checkReaderReferencesAndDispose() throws IOException {
+		assert Thread.holdsLock(lock);
+
+		// To avoid lingering memory mapped files (large resource footprint), we don't
+		// wait for GC to unmap the files, but use a Netty utility to directly unmap the file.
+		// To avoid segmentation faults, we need to wait until all readers have been released.
+
+		if (readers.isEmpty()) {
+			memory.close();
+		}
+	}
+
+	// ------------------------------ legacy ----------------------------------
+
+	@Override
+	public int releaseMemory() throws IOException {
+		return 0;
+	}
+
+	// ---------------------------- statistics --------------------------------
+
+	@Override
+	public int unsynchronizedGetNumberOfQueuedBuffers() {
+		return 0;
+	}
+
+	@Override
+	protected long getTotalNumberOfBuffers() {
+		return numBuffersAndEventsWritten;
+	}
+
+	@Override
+	protected long getTotalNumberOfBytes() {
+		return memory.getSize();
+	}
+
+	int getBuffersInBacklog() {
+		return numDataBuffersWritten;
+	}
+}
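
The intended call sequence, as a sketch (obtaining the subpartition, the buffer consumers, and the availability listener is elided here):

    // write phase: a single writer thread
    subpartition.add(bufferConsumer);   // takes over the consumer; flushes the previous one
    subpartition.finish();              // appends EndOfPartitionEvent and seals the memory

    // read phase: multiple views are possible, each driven by a single thread
    ResultSubpartitionView view = subpartition.createReadView(listener);

    // eventually: release the partition; the mapped file is unmapped and deleted
    // once the last reader reference is gone
    subpartition.release();
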
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
new file mode 100644
index 0000000..d6c6834
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.MemoryMappedBuffers.BufferSlicer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The reader (read view) of a BoundedBlockingSubpartition.
+ */
+final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView {
+
+	/** The result subpartition that we read. */
+	private final BoundedBlockingSubpartition parent;
+
+	/** The next buffer (look ahead). Null once the data is depleted or reader is disposed. */
+	@Nullable
+	private Buffer nextBuffer;
+
+	/** The reader/decoder for the memory mapped region with the data we currently read from.
+	 * Null once the reader is empty or disposed. */
+	@Nullable
+	private BufferSlicer memory;
+
+	/** The remaining number of data buffers (not events) in the result. */
+	private int dataBufferBacklog;
+
+	/** Flag indicating whether this reader has been released. */
+	private boolean isReleased;
+
+	/**
+	 * Creates a reader for the given subpartition, consuming the given memory region.
+	 */
+	BoundedBlockingSubpartitionReader(
+			BoundedBlockingSubpartition parent,
+			BufferSlicer memory,
+			int numDataBuffers) {
+
+		checkArgument(numDataBuffers >= 0);
+
+		this.parent = checkNotNull(parent);
+		this.memory = checkNotNull(memory);
+		this.dataBufferBacklog = numDataBuffers;
+
+		this.nextBuffer = memory.sliceNextBuffer();
+	}
+
+	@Nullable
+	@Override
+	public BufferAndBacklog getNextBuffer() throws IOException {
+		final Buffer current = nextBuffer; // copy reference to stack
+
+		if (current == null) {
+			// as per contract, we must return null when the reader is empty,
+			// but also in case the reader is disposed (rather than throwing an exception)
+			return null;
+		}
+		if (current.isBuffer()) {
+			dataBufferBacklog--;
+		}
+
+		assert memory != null;
+		nextBuffer = memory.sliceNextBuffer();
+
+		return BufferAndBacklog.fromBufferAndLookahead(current, nextBuffer, dataBufferBacklog);
+	}
+
+	@Override
+	public void notifyDataAvailable() {
+		throw new IllegalStateException("No data should become available on a blocking partition during consumption.");
+	}
+
+	@Override
+	public void notifySubpartitionConsumed() throws IOException {
+		parent.onConsumedSubpartition();
+	}
+
+	@Override
+	public void releaseAllResources() throws IOException {
+		// it is not a problem if this method executes multiple times
+		isReleased = true;
+
+		// nulling these fields makes the read methods fail fast
+		nextBuffer = null;
+		memory = null;
+
+		// Notify the parent that this one is released. This allows the parent to
+		// eventually release all resources (when all readers are done and the
+		// parent is disposed).
+		parent.releaseReaderReference(this);
+	}
+
+	@Override
+	public boolean isReleased() {
+		return isReleased;
+	}
+
+	@Override
+	public boolean nextBufferIsEvent() {
+		return nextBuffer != null && !nextBuffer.isBuffer();
+	}
+
+	@Override
+	public boolean isAvailable() {
+		return nextBuffer != null;
+	}
+
+	@Override
+	public Throwable getFailureCause() {
+		// no failure can occur after this reader has been created
+		return null;
+	}
+
+	@Override
+	public String toString() {
+		return String.format("Blocking Subpartition Reader: ID=%s, index=%d",
+				parent.parent.getPartitionId(),
+				parent.index);
+	}
+}
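
A sketch of the consumption loop against such a view (assuming a view obtained from createReadView; the buffer() accessor of BufferAndBacklog is assumed from the surrounding network stack API):

    ResultSubpartition.BufferAndBacklog next;
    while ((next = view.getNextBuffer()) != null) {
        Buffer buffer = next.buffer();
        try {
            // process the buffer; it must not be accessed after the view is released
        } finally {
            buffer.recycleBuffer();
        }
    }
    view.releaseAllResources();
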
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferToByteBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferToByteBuffer.java
new file mode 100644
index 0000000..4cdf41a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferToByteBuffer.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Writes a sequence of buffers into a ByteBuffer and reads them back.
+ * This class handles the headers, length encoding, and memory slicing.
+ */
+final class BufferToByteBuffer {
+
+	// all fields and methods below here have package-private access to avoid bridge
+	// methods when accessing them from the nested classes
+
+	static final int HEADER_LENGTH = 8;
+
+	static final int HEADER_VALUE_IS_BUFFER = 0;
+
+	static final int HEADER_VALUE_IS_EVENT = 1;
+
+	static ByteBuffer checkAndConfigureByteBuffer(ByteBuffer buffer) {
+		checkArgument(buffer.position() == 0);
+		checkArgument(buffer.capacity() > 8);
+		checkArgument(buffer.limit() == buffer.capacity());
+
+		return buffer.order(ByteOrder.nativeOrder());
+	}
+
+	// ------------------------------------------------------------------------
+
+	static final class Writer {
+
+		private final ByteBuffer memory;
+
+		Writer(ByteBuffer memory) {
+			this.memory = checkAndConfigureByteBuffer(memory);
+		}
+
+		public boolean writeBuffer(Buffer buffer) {
+			final ByteBuffer memory = this.memory;
+			final int bufferSize = buffer.getSize();
+
+			if (memory.remaining() < bufferSize + HEADER_LENGTH) {
+				return false;
+			}
+
+			memory.putInt(buffer.isBuffer() ? HEADER_VALUE_IS_BUFFER : HEADER_VALUE_IS_EVENT);
+			memory.putInt(bufferSize);
+			memory.put(buffer.getNioBufferReadable());
+			return true;
+		}
+
+		public ByteBuffer complete() {
+			memory.flip();
+			return memory;
+		}
+
+		public int getNumBytes() {
+			return memory.position();
+		}
+	}
+
+	static final class Reader {
+
+		private final ByteBuffer memory;
+
+		Reader(ByteBuffer memory) {
+			this.memory = checkAndConfigureByteBuffer(memory);
+		}
+
+		@Nullable
+		public Buffer sliceNextBuffer() {
+			final ByteBuffer memory = this.memory;
+			final int remaining = memory.remaining();
+
+			// we only check for the regular case where the data is exhausted;
+			// all other cases can only occur if our write logic is wrong and will already throw
+			// buffer underflow exceptions, which cause the read to fail.
+			if (remaining == 0) {
+				return null;
+			}
+
+			final int header = memory.getInt();
+			final int size = memory.getInt();
+
+			memory.limit(memory.position() + size);
+			ByteBuffer buf = memory.slice();
+			memory.position(memory.limit());
+			memory.limit(memory.capacity());
+
+			MemorySegment memorySegment = MemorySegmentFactory.wrapOffHeapMemory(buf);
+			Buffer buffer = new NetworkBuffer(memorySegment, FreeingBufferRecycler.INSTANCE);
+			buffer.setSize(size);
+
+			if (header == HEADER_VALUE_IS_EVENT) {
+				buffer.tagAsEvent();
+			}
+
+			return buffer;
+		}
+	}
+}
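
A round-trip sketch (from within the same package, since the classes are package-private): each record is framed as an 8 byte header (an is-buffer/is-event flag plus the length) followed by the payload, and the Reader must be given a slice so that limit == capacity holds:

    ByteBuffer memory = ByteBuffer.allocateDirect(64 * 1024);
    BufferToByteBuffer.Writer writer = new BufferToByteBuffer.Writer(memory);

    writer.writeBuffer(someBuffer);           // returns false once the region is full
    ByteBuffer completed = writer.complete();

    BufferToByteBuffer.Reader reader = new BufferToByteBuffer.Reader(completed.slice());
    Buffer read = reader.sliceNextBuffer();   // null once the data is exhausted
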
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffers.java
new file mode 100644
index 0000000..6bb031e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffers.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferToByteBuffer.Writer;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * This class is largely a workaround for the fact that a memory mapped region in Java cannot
+ * be larger than 2GB (== signed 32 bit int max value). The class takes {@link Buffer Buffers} and
+ * writes them to several memory mapped regions, using the {@link BufferToByteBuffer}
+ * class.
+ *
+ * <h2>Usage</h2>
+ *
+ * <p>The class assumes in the first phase that data is written by repeatedly calling
+ * {@link #writeBuffer(Buffer)}. That puts the data into the memory region of the memory
+ * mapped file. After writing, one must call {@link #finishWrite()}.
+ *
+ * <p>After that, the class can produce multiple {@link BufferSlicer} instances to re-read
+ * the data from the memory regions. Multiple slicers can read concurrently, but each slicer
+ * should be read from by a single thread.
+ *
+ * <p>Eventually, the resources must be disposed via {@link #close()}. After that,
+ * no reading can happen any more.
+ *
+ * <h2>Important!</h2>
+ *
+ * <p>This class performs absolutely no synchronization and relies on single threaded access
+ * or externally synchronized access. Concurrent access around disposal may cause
+ * segmentation faults!
+ *
+ * <p>This class performs only limited sanity checks and assumes correct use by {@link BoundedBlockingSubpartition}
+ * and {@link BoundedBlockingSubpartitionReader}, such as writing first and reading afterwards.
+ * Not obeying these contracts results in NullPointerExceptions.
+ */
+class MemoryMappedBuffers implements Closeable {
+
+	/** Memory mappings should be at the granularity of page sizes, for efficiency. */
+	private static final int PAGE_SIZE = PageSizeUtil.getSystemPageSizeOrConservativeMultiple();
+
+	/** The encoder to the current memory mapped region we are writing to.
+	 * This value is null once writing has finished or the buffers are disposed. */
+	@Nullable
+	private BufferToByteBuffer.Writer currentBuffer;
+
+	/** All memory mapped regions that are already full (completed). */
+	private final ArrayList<ByteBuffer> fullBuffers;
+
+	/** The file channel backing the memory mapped file. */
+	private final FileChannel file;
+
+	/** The path of the memory mapped file. */
+	private final Path filePath;
+
+	/** The offset where the next mapped region should start. */
+	private long nextMappingOffset;
+
+	/** The size of each mapped region. */
+	private final long mappingSize;
+
+	MemoryMappedBuffers(
+			Path filePath,
+			FileChannel fileChannel,
+			int maxSizePerByteBuffer) throws IOException {
+
+		this.filePath = filePath;
+		this.file = fileChannel;
+		this.mappingSize = alignSize(maxSizePerByteBuffer);
+		this.fullBuffers = new ArrayList<>(4);
+
+		rollOverToNextBuffer();
+	}
+
+	void writeBuffer(Buffer buffer) throws IOException {
+		assert currentBuffer != null;
+
+		if (currentBuffer.writeBuffer(buffer)) {
+			return;
+		}
+
+		rollOverToNextBuffer();
+
+		if (!currentBuffer.writeBuffer(buffer)) {
+			throwTooLargeBuffer(buffer);
+		}
+	}
+
+	BufferSlicer getFullBuffers() {
+		assert currentBuffer == null;
+
+		final List<ByteBuffer> buffers = fullBuffers.stream()
+				.map(ByteBuffer::slice)
+				.collect(Collectors.toList());
+
+		return new BufferSlicer(buffers);
+	}
+
+	/**
+	 * Finishes the current region and prevents further writes.
+	 * After calling this method, further calls to {@link #writeBuffer(Buffer)} will fail.
+	 */
+	void finishWrite() throws IOException {
+		assert currentBuffer != null;
+
+		fullBuffers.add(currentBuffer.complete());
+		currentBuffer = null; // fail further writes fast
+		file.close(); // won't map further regions from now on
+	}
+
+	/**
+	 * Unmaps the file from memory and deletes the file.
+	 * After calling this method, access to any ByteBuffer obtained from this instance
+	 * will cause a segmentation fault.
+	 */
+	public void close() throws IOException {
+		IOUtils.closeQuietly(file); // in case we dispose before finishing writes
+
+		for (ByteBuffer bb : fullBuffers) {
+			PlatformDependent.freeDirectBuffer(bb);
+		}
+		fullBuffers.clear();
+
+		if (currentBuffer != null) {
+			PlatformDependent.freeDirectBuffer(currentBuffer.complete());
+			currentBuffer = null;
+		}
+
+		// To make this compatible with all versions of Windows, we must wait with
+		// deleting the file until it is unmapped.
+		// See also https://stackoverflow.com/questions/11099295/file-flag-delete-on-close-and-memory-mapped-files/51649618#51649618
+
+		Files.delete(filePath);
+	}
+
+	/**
+	 * Gets the number of bytes of all written data (including the metadata in the buffer headers).
+	 */
+	long getSize() {
+		long size = 0L;
+		for (ByteBuffer bb : fullBuffers) {
+			size += bb.remaining();
+		}
+		if (currentBuffer != null) {
+			size += currentBuffer.getNumBytes();
+		}
+		return size;
+	}
+
+	private void rollOverToNextBuffer() throws IOException {
+		if (currentBuffer != null) {
+			// we need to remember the original buffers, not any slices:
+			// slices have no cleaner, which we need in order to trigger explicit unmapping
+			fullBuffers.add(currentBuffer.complete());
+		}
+
+		final ByteBuffer mapped = file.map(MapMode.READ_WRITE, nextMappingOffset, mappingSize);
+		currentBuffer = new Writer(mapped);
+		nextMappingOffset += mappingSize;
+	}
+
+	private void throwTooLargeBuffer(Buffer buffer) throws IOException {
+		throw new IOException(String.format(
+				"The buffer (%d bytes) is larger than the maximum size of a memory buffer (%d bytes)",
+				buffer.getSize(), mappingSize));
+	}
+
+	/**
+	 * Rounds the size down to the next multiple of the {@link #PAGE_SIZE}.
+	 * We need to round down here to not exceed the original maximum size value.
+	 * Otherwise, values like INT_MAX would round up to overflow the valid maximum
+	 * size of a memory mapping region in Java.
+	 */
+	private static int alignSize(int maxRegionSize) {
+		checkArgument(maxRegionSize >= PAGE_SIZE);
+		return maxRegionSize - (maxRegionSize % PAGE_SIZE);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Reader
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The "reader" for the memory region. It slices a sequence of buffers from the
+	 * sequence of mapped ByteBuffers.
+	 */
+	static final class BufferSlicer {
+
+		/** The reader/decoder for the memory mapped region with the data we currently read from.
+		 * At most 2GB large. Further regions may be in the {@link #furtherData} field. */
+		private BufferToByteBuffer.Reader data;
+
+		/** Further byte buffers, to handle cases where there is more data than fits into
+		 * one mapped byte buffer (2GB = Integer.MAX_VALUE). */
+		private final Iterator<ByteBuffer> furtherData;
+
+		BufferSlicer(Iterable<ByteBuffer> data) {
+			this.furtherData = data.iterator();
+			this.data = new BufferToByteBuffer.Reader(furtherData.next());
+		}
+
+		@Nullable
+		public Buffer sliceNextBuffer() {
+			// should only be null once empty or disposed, in which case this method
+			// should not be called any more
+			assert data != null;
+
+			final Buffer next = data.sliceNextBuffer();
+			if (next != null) {
+				return next;
+			}
+
+			if (!furtherData.hasNext()) {
+				return null;
+			}
+
+			data = new BufferToByteBuffer.Reader(furtherData.next());
+			return sliceNextBuffer();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Factories
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates new MemoryMappedBuffers, creating a memory mapped file at the given path.
+	 */
+	public static MemoryMappedBuffers create(Path memMappedFilePath) throws IOException {
+		return createWithRegionSize(memMappedFilePath, Integer.MAX_VALUE);
+	}
+
+	/**
+	 * Creates new MemoryMappedBuffers, creating a memory mapped file at the given path.
+	 * Each mapped region (= ByteBuffer) will be of the given size.
+	 */
+	public static MemoryMappedBuffers createWithRegionSize(Path memMappedFilePath, int regionSize) throws IOException {
+		final FileChannel fileChannel = FileChannel.open(memMappedFilePath,
+				StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
+
+		return new MemoryMappedBuffers(memMappedFilePath, fileChannel, regionSize);
+	}
+}
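
A usage sketch of the overall lifecycle (same package; `tempDirectory` is a hypothetical location, and the target path must not exist yet, since the file is opened with CREATE_NEW):

    Path path = tempDirectory.resolve("partition.data");
    MemoryMappedBuffers memory = MemoryMappedBuffers.createWithRegionSize(path, 64 * 1024 * 1024);

    memory.writeBuffer(buffer);   // repeat for all buffers
    memory.finishWrite();         // seals the region, closes the file channel

    MemoryMappedBuffers.BufferSlicer slicer = memory.getFullBuffers();
    for (Buffer b = slicer.sliceNextBuffer(); b != null; b = slicer.sliceNextBuffer()) {
        // consume b; no buffer may be accessed after close() below
    }

    memory.close();   // unmaps the regions and deletes the file
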
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java
new file mode 100644
index 0000000..1ce1a76
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+import org.apache.flink.shaded.netty4.io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess;
+
+import sun.misc.Unsafe;
+
+import javax.annotation.Nullable;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+/**
+ * Utility for accessing the system page size.
+ */
+public final class PageSizeUtil {
+
+	/** Value indicating an unknown page size. */
+	public static final int PAGE_SIZE_UNKNOWN = -1;
+
+	/** The default page size on most systems. */
+	public static final int DEFAULT_PAGE_SIZE = 4 * 1024;
+
+	/** A conservative fallback value (64 KiB) that should be a multiple of the page size even
+	 * in uncommon cases of server installations with larger-than-usual page sizes. */
+	public static final int CONSERVATIVE_PAGE_SIZE_MULTIPLE = 64 * 1024;
+
+	/**
+	 * Tries to get the system page size. If the page size cannot be determined, this
+	 * returns -1.
+	 *
+	 * <p>This internally relies on the presence of "unsafe" and the resolution via some
+	 * Netty utilities.
+	 */
+	public static int getSystemPageSize() {
+		try {
+			return PageSizeUtilInternal.getSystemPageSize();
+		}
+		catch (Throwable t) {
+			ExceptionUtils.rethrowIfFatalError(t);
+			return PAGE_SIZE_UNKNOWN;
+		}
+	}
+
+	/**
+	 * Tries to get the system page size. If the page size cannot be determined, this
+	 * returns the {@link #DEFAULT_PAGE_SIZE}.
+	 */
+	public static int getSystemPageSizeOrDefault() {
+		final int pageSize = getSystemPageSize();
+		return pageSize == PAGE_SIZE_UNKNOWN ? DEFAULT_PAGE_SIZE : pageSize;
+	}
+
+	/**
+	 * Tries to get the system page size. If the page size cannot be determined, this
+	 * returns the {@link #CONSERVATIVE_PAGE_SIZE_MULTIPLE}.
+	 */
+	public static int getSystemPageSizeOrConservativeMultiple() {
+		final int pageSize = getSystemPageSize();
+		return pageSize == PAGE_SIZE_UNKNOWN ? CONSERVATIVE_PAGE_SIZE_MULTIPLE : pageSize;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** This class is not meant to be instantiated. */
+	private PageSizeUtil() {}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * All unsafe related code must be in a separate class, so that loading the outer class
+	 * does not implicitly try to resolve the unsafe class.
+	 */
+	@SuppressWarnings("all")
+	private static final class PageSizeUtilInternal {
+
+		static int getSystemPageSize() {
+			Unsafe unsafe = unsafe();
+			return unsafe == null ? PAGE_SIZE_UNKNOWN : unsafe.pageSize();
+		}
+
+		@Nullable
+		private static Unsafe unsafe() {
+			if (PlatformDependent.hasUnsafe()) {
+				return (Unsafe) AccessController.doPrivileged(
+						(PrivilegedAction<Object>) () -> UnsafeAccess.UNSAFE);
+			}
+			else {
+				return null;
+			}
+		}
+	}
+}
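
A small sketch of how the fallback composes with the alignment done in MemoryMappedBuffers:

    int pageSize = PageSizeUtil.getSystemPageSizeOrConservativeMultiple();

    // round a requested maximum region size down to a page multiple, mirroring
    // MemoryMappedBuffers#alignSize (rounding up could overflow the maximum
    // size of a mapped region in Java)
    int maxRegionSize = Integer.MAX_VALUE;
    int aligned = maxRegionSize - (maxRegionSize % pageSize);
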
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index fe27d97..7394e6e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -30,6 +31,7 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -56,6 +58,13 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 	// ------------------------------------------------------------------------
 
+	/** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
+	private final ArrayDeque<BufferConsumer> buffers = new ArrayDeque<>();
+
+	/** The number of non-event buffers currently in this subpartition. */
+	@GuardedBy("buffers")
+	private int buffersInBacklog;
+
 	/** The read view to consume this subpartition. */
 	private PipelinedSubpartitionView readView;
 
@@ -68,6 +77,12 @@ class PipelinedSubpartition extends ResultSubpartition {
 	/** Flag indicating whether the subpartition has been released. */
 	private volatile boolean isReleased;
 
+	/** The total number of buffers (both data and event buffers). */
+	private long totalNumberOfBuffers;
+
+	/** The total number of bytes (both data and event buffers). */
+	private long totalNumberOfBytes;
+
 	// ------------------------------------------------------------------------
 
 	PipelinedSubpartition(int index, ResultPartition parent) {
@@ -300,6 +315,61 @@ class PipelinedSubpartition extends ResultSubpartition {
 		}
 	}
 
+	@Override
+	protected long getTotalNumberOfBuffers() {
+		return totalNumberOfBuffers;
+	}
+
+	@Override
+	protected long getTotalNumberOfBytes() {
+		return totalNumberOfBytes;
+	}
+
+	Throwable getFailureCause() {
+		return parent.getFailureCause();
+	}
+
+	private void updateStatistics(BufferConsumer buffer) {
+		totalNumberOfBuffers++;
+	}
+
+	private void updateStatistics(Buffer buffer) {
+		totalNumberOfBytes += buffer.getSize();
+	}
+
+	@GuardedBy("buffers")
+	private void decreaseBuffersInBacklogUnsafe(boolean isBuffer) {
+		assert Thread.holdsLock(buffers);
+		if (isBuffer) {
+			buffersInBacklog--;
+		}
+	}
+
+	/**
+	 * Increases the number of non-event buffers by one after adding a non-event
+	 * buffer into this subpartition.
+	 */
+	@GuardedBy("buffers")
+	private void increaseBuffersInBacklog(BufferConsumer buffer) {
+		assert Thread.holdsLock(buffers);
+
+		if (buffer != null && buffer.isBuffer()) {
+			buffersInBacklog++;
+		}
+	}
+
+	/**
+	 * Gets the number of non-event buffers in this subpartition.
+	 *
+	 * <p><strong>Beware:</strong> This method should only be used in tests in non-concurrent access
+	 * scenarios since it does not make any concurrency guarantees.
+	 */
+	@SuppressWarnings("FieldAccessNotGuarded")
+	@VisibleForTesting
+	public int getBuffersInBacklog() {
+		return buffersInBacklog;
+	}
+
 	private boolean shouldNotifyDataAvailable() {
 		// Notify only when we added first finished buffer.
 		return readView != null && !flushRequested && getNumberOfFinishedBuffers() == 1;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 24ce27e..1ff1ec5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -32,6 +32,8 @@ import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -150,10 +152,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 		// Create the subpartitions.
 		switch (partitionType) {
 			case BLOCKING:
-				for (int i = 0; i < subpartitions.length; i++) {
-					subpartitions[i] = new SpillableSubpartition(i, this, ioManager);
-				}
-
+				initializeBoundedBlockingPartitions(subpartitions, this, ioManager);
 				break;
 
 			case PIPELINED:
@@ -466,4 +465,35 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 			hasNotifiedPipelinedConsumers = true;
 		}
 	}
+
+	private static void initializeBoundedBlockingPartitions(
+			ResultSubpartition[] subpartitions,
+			ResultPartition parent,
+			IOManager ioManager) {
+
+		int i = 0;
+		try {
+			for (; i < subpartitions.length; i++) {
+				subpartitions[i] = new BoundedBlockingSubpartition(
+						i, parent, ioManager.createChannel().getPathFile().toPath());
+			}
+		}
+		catch (IOException e) {
+			// undo all the work so that a failed constructor does not leave any resources
+			// in need of disposal
+			releasePartitionsQuietly(subpartitions, i);
+
+			// this is not good, we should not be forced to wrap this in a runtime exception.
+			// the fact that the ResultPartition and Task constructors (which call this) do not tolerate any exceptions
+			// is incompatible with eager initialization of resources (RAII).
+			throw new FlinkRuntimeException(e);
+		}
+	}
+
+	private static void releasePartitionsQuietly(ResultSubpartition[] partitions, int until) {
+		for (int i = 0; i < until; i++) {
+			final ResultSubpartition subpartition = partitions[i];
+			ExceptionUtils.suppressExceptions(subpartition::release);
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index 58a1402..920ce8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -22,10 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 
-import javax.annotation.concurrent.GuardedBy;
-
 import java.io.IOException;
-import java.util.ArrayDeque;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -40,41 +37,19 @@ public abstract class ResultSubpartition {
 	/** The parent partition this subpartition belongs to. */
 	protected final ResultPartition parent;
 
-	/** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
-	protected final ArrayDeque<BufferConsumer> buffers = new ArrayDeque<>();
-
-	/** The number of non-event buffers currently in this subpartition. */
-	@GuardedBy("buffers")
-	private int buffersInBacklog;
-
 	// - Statistics ----------------------------------------------------------
 
-	/** The total number of buffers (both data and event buffers). */
-	private long totalNumberOfBuffers;
-
-	/** The total number of bytes (both data and event buffers). */
-	private long totalNumberOfBytes;
-
 	public ResultSubpartition(int index, ResultPartition parent) {
 		this.index = index;
 		this.parent = parent;
 	}
 
-	protected void updateStatistics(BufferConsumer buffer) {
-		totalNumberOfBuffers++;
-	}
-
-	protected void updateStatistics(Buffer buffer) {
-		totalNumberOfBytes += buffer.getSize();
-	}
-
-	protected long getTotalNumberOfBuffers() {
-		return totalNumberOfBuffers;
-	}
+	/**
+	 * Gets the total numbers of buffers (data buffers plus events).
+	 */
+	protected abstract long getTotalNumberOfBuffers();
 
-	protected long getTotalNumberOfBytes() {
-		return totalNumberOfBytes;
-	}
+	protected abstract long getTotalNumberOfBytes();
 
 	/**
 	 * Notifies the parent partition about a consumed {@link ResultSubpartitionView}.
@@ -83,10 +58,6 @@ public abstract class ResultSubpartition {
 		parent.onConsumedSubpartition(index);
 	}
 
-	protected Throwable getFailureCause() {
-		return parent.getFailureCause();
-	}
-
 	/**
 	 * Adds the given buffer.
 	 *
@@ -123,9 +94,7 @@ public abstract class ResultSubpartition {
 	 * scenarios since it does not make any concurrency guarantees.
 	 */
 	@VisibleForTesting
-	public int getBuffersInBacklog() {
-		return buffersInBacklog;
-	}
+	abstract int getBuffersInBacklog();
 
 	/**
 	 * Makes a best effort to get the current size of the queue.
@@ -134,38 +103,6 @@ public abstract class ResultSubpartition {
 	 */
 	public abstract int unsynchronizedGetNumberOfQueuedBuffers();
 
-	/**
-	 * Decreases the number of non-event buffers by one after fetching a non-event
-	 * buffer from this subpartition (for access by the subpartition views).
-	 *
-	 * @return backlog after the operation
-	 */
-	public int decreaseBuffersInBacklog(Buffer buffer) {
-		synchronized (buffers) {
-			return decreaseBuffersInBacklogUnsafe(buffer != null && buffer.isBuffer());
-		}
-	}
-
-	protected int decreaseBuffersInBacklogUnsafe(boolean isBuffer) {
-		assert Thread.holdsLock(buffers);
-		if (isBuffer) {
-			buffersInBacklog--;
-		}
-		return buffersInBacklog;
-	}
-
-	/**
-	 * Increases the number of non-event buffers by one after adding a non-event
-	 * buffer into this subpartition.
-	 */
-	protected void increaseBuffersInBacklog(BufferConsumer buffer) {
-		assert Thread.holdsLock(buffers);
-
-		if (buffer != null && buffer.isBuffer()) {
-			buffersInBacklog++;
-		}
-	}
-
 	// ------------------------------------------------------------------------
 
 	/**
@@ -201,6 +138,14 @@ public abstract class ResultSubpartition {
 		public boolean nextBufferIsEvent() {
 			return nextBufferIsEvent;
 		}
+
+		public static BufferAndBacklog fromBufferAndLookahead(Buffer current, Buffer lookahead, int backlog) {
+			return new BufferAndBacklog(
+					current,
+					lookahead != null,
+					backlog,
+					lookahead != null && !lookahead.isBuffer());
+		}
 	}
 
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
deleted file mode 100644
index 9f696ad..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A spillable sub partition starts out in-memory and spills to disk if asked
- * to do so.
- *
- * <p>Buffers for the partition come from a {@link BufferPool}. The buffer pool
- * is also responsible to trigger the release of the buffers if it needs them
- * back. At this point, the spillable sub partition will write all in-memory
- * buffers to disk. All added buffers after that point directly go to disk.
- *
- * <p>This partition type is used for {@link ResultPartitionType#BLOCKING}
- * results that are fully produced before they can be consumed. At the point
- * when they are consumed, the buffers are (i) all in-memory, (ii) currently
- * being spilled to disk, or (iii) completely spilled to disk. Depending on
- * this state, different reader variants are returned (see
- * {@link SpillableSubpartitionView} and {@link SpilledSubpartitionView}).
- *
- * <p>Since the network buffer pool size for outgoing partitions is usually
- * quite small, e.g. via the {@link TaskManagerOptions#NETWORK_BUFFERS_PER_CHANNEL}
- * and {@link TaskManagerOptions#NETWORK_EXTRA_BUFFERS_PER_GATE} parameters
- * for bounded channels or from the default values of
- * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
- * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}, and
- * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, most spillable partitions
- * will be spilled for real-world data sets.
- *
- * <p>Note on thread safety. Synchronizing on {@code buffers} is used to synchronize
- * writes and reads. Synchronizing on {@code this} is used against concurrent
- * {@link #add(BufferConsumer)} and clean ups {@link #release()} / {@link #finish()} which
- * also are touching {@code spillWriter}. Since we do not want to block reads during
- * spilling, we need those two synchronization. Probably this model could be simplified.
- */
-class SpillableSubpartition extends ResultSubpartition {
-
-	private static final Logger LOG = LoggerFactory.getLogger(SpillableSubpartition.class);
-
-	/** The I/O manager used for spilling buffers to disk. */
-	private final IOManager ioManager;
-
-	/** The writer used for spilling. As long as this is null, we are in-memory. */
-	private BufferFileWriter spillWriter;
-
-	/** Flag indicating whether the subpartition has been finished. */
-	private boolean isFinished;
-
-	/** Flag indicating whether the subpartition has been released. */
-	private volatile boolean isReleased;
-
-	/** The read view to consume this subpartition. */
-	private ResultSubpartitionView readView;
-
-	SpillableSubpartition(int index, ResultPartition parent, IOManager ioManager) {
-		super(index, parent);
-
-		this.ioManager = checkNotNull(ioManager);
-	}
-
-	@Override
-	public synchronized boolean add(BufferConsumer bufferConsumer) throws IOException {
-		return add(bufferConsumer, false);
-	}
-
-	private boolean add(BufferConsumer bufferConsumer, boolean forceFinishRemainingBuffers)
-			throws IOException {
-		checkNotNull(bufferConsumer);
-
-		synchronized (buffers) {
-			if (isFinished || isReleased) {
-				bufferConsumer.close();
-				return false;
-			}
-
-			buffers.add(bufferConsumer);
-			// The number of buffers are needed later when creating
-			// the read views. If you ever remove this line here,
-			// make sure to still count the number of buffers.
-			updateStatistics(bufferConsumer);
-			increaseBuffersInBacklog(bufferConsumer);
-
-			if (spillWriter != null) {
-				spillFinishedBufferConsumers(forceFinishRemainingBuffers);
-			}
-		}
-		return true;
-	}
-
-	@Override
-	public void flush() {
-		synchronized (buffers) {
-			if (readView != null) {
-				readView.notifyDataAvailable();
-			}
-		}
-	}
-
-	@Override
-	public synchronized void finish() throws IOException {
-		synchronized (buffers) {
-			if (add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true)) {
-				isFinished = true;
-			}
-
-			flush();
-		}
-
-		// If we are spilling/have spilled, wait for the writer to finish
-		if (spillWriter != null) {
-			spillWriter.close();
-		}
-		LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
-	}
-
-	@Override
-	public synchronized void release() throws IOException {
-		// view reference accessible outside the lock, but assigned inside the locked scope
-		final ResultSubpartitionView view;
-
-		synchronized (buffers) {
-			if (isReleased) {
-				return;
-			}
-
-			// Release all available buffers
-			for (BufferConsumer buffer : buffers) {
-				buffer.close();
-			}
-			buffers.clear();
-
-			view = readView;
-
-			// No consumer yet, we are responsible to clean everything up. If
-			// one is available, the view is responsible is to clean up (see
-			// below).
-			if (view == null) {
-
-				// TODO This can block until all buffers are written out to
-				// disk if a spill is in-progress before deleting the file.
-				// It is possibly called from the Netty event loop threads,
-				// which can bring down the network.
-				if (spillWriter != null) {
-					spillWriter.closeAndDelete();
-				}
-			}
-
-			isReleased = true;
-		}
-
-		LOG.debug("{}: Released {}.", parent.getOwningTaskName(), this);
-
-		if (view != null) {
-			view.releaseAllResources();
-		}
-	}
-
-	@Override
-	public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
-		synchronized (buffers) {
-			if (!isFinished) {
-				throw new IllegalStateException("Subpartition has not been finished yet, " +
-					"but blocking subpartitions can only be consumed after they have " +
-					"been finished.");
-			}
-
-			if (readView != null) {
-				throw new IllegalStateException("Subpartition is being or already has been " +
-					"consumed, but we currently allow subpartitions to only be consumed once.");
-			}
-
-			if (spillWriter != null) {
-				readView = new SpilledSubpartitionView(
-					this,
-					parent.getBufferProvider().getMemorySegmentSize(),
-					spillWriter,
-					getTotalNumberOfBuffers(),
-					availabilityListener);
-			} else {
-				readView = new SpillableSubpartitionView(
-					this,
-					buffers,
-					ioManager,
-					parent.getBufferProvider().getMemorySegmentSize(),
-					availabilityListener);
-			}
-			return readView;
-		}
-	}
-
-	@Override
-	public int releaseMemory() throws IOException {
-		synchronized (buffers) {
-			ResultSubpartitionView view = readView;
-
-			if (view != null && view.getClass() == SpillableSubpartitionView.class) {
-				// If there is a spillable view, it's the responsibility of the
-				// view to release memory.
-				SpillableSubpartitionView spillableView = (SpillableSubpartitionView) view;
-				return spillableView.releaseMemory();
-			} else if (spillWriter == null) {
-				// No view and in-memory => spill to disk
-				spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel());
-
-				int numberOfBuffers = buffers.size();
-				long spilledBytes = spillFinishedBufferConsumers(isFinished);
-				int spilledBuffers = numberOfBuffers - buffers.size();
-
-				LOG.debug("{}: Spilling {} bytes ({} buffers} for sub partition {} of {}.",
-					parent.getOwningTaskName(), spilledBytes, spilledBuffers, index, parent.getPartitionId());
-
-				return spilledBuffers;
-			}
-		}
-
-		// Else: We have already spilled and don't hold any buffers
-		return 0;
-	}
-
-	@VisibleForTesting
-	long spillFinishedBufferConsumers(boolean forceFinishRemainingBuffers) throws IOException {
-		long spilledBytes = 0;
-
-		while (!buffers.isEmpty()) {
-			BufferConsumer bufferConsumer = buffers.getFirst();
-			Buffer buffer = bufferConsumer.build();
-			updateStatistics(buffer);
-			int bufferSize = buffer.getSize();
-			spilledBytes += bufferSize;
-
-			// NOTE we may be in the process of finishing the subpartition where any buffer should
-			// be treated as if it was finished!
-			if (bufferConsumer.isFinished() || forceFinishRemainingBuffers) {
-				if (bufferSize > 0) {
-					spillWriter.writeBlock(buffer);
-				} else {
-					// If we skip a buffer for the spill writer, we need to adapt the backlog accordingly
-					decreaseBuffersInBacklog(buffer);
-					buffer.recycleBuffer();
-				}
-				bufferConsumer.close();
-				buffers.poll();
-			} else {
-				// If there is already data, we need to spill it anyway, since we do not get this
-				// slice from the buffer consumer again during the next build.
-				// BEWARE: by doing so, we increase the actual number of buffers in the spill writer!
-				if (bufferSize > 0) {
-					spillWriter.writeBlock(buffer);
-					increaseBuffersInBacklog(bufferConsumer);
-				} else {
-					buffer.recycleBuffer();
-				}
-
-				return spilledBytes;
-			}
-		}
-		return spilledBytes;
-	}
-
-	@Override
-	public boolean isReleased() {
-		return isReleased;
-	}
-
-	@Override
-	public int unsynchronizedGetNumberOfQueuedBuffers() {
-		// since we do not synchronize, the size may actually be lower than 0!
-		return Math.max(buffers.size(), 0);
-	}
-
-	@Override
-	public String toString() {
-		return String.format("SpillableSubpartition#%d [%d number of buffers (%d bytes)," +
-				"%d number of buffers in backlog, finished? %s, read view? %s, spilled? %s]",
-			index, getTotalNumberOfBuffers(), getTotalNumberOfBytes(),
-			getBuffersInBacklog(), isFinished, readView != null, spillWriter != null);
-	}
-
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
deleted file mode 100644
index 65790d7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-class SpillableSubpartitionView implements ResultSubpartitionView {
-
-	private static final Logger LOG = LoggerFactory.getLogger(SpillableSubpartitionView.class);
-
-	/** The subpartition this view belongs to. */
-	private final SpillableSubpartition parent;
-
-	/** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
-	private final ArrayDeque<BufferConsumer> buffers;
-
-	/** IO manager if we need to spill (for spilled case). */
-	private final IOManager ioManager;
-
-	/** Size of memory segments (for spilled case). */
-	private final int memorySegmentSize;
-
-	/**
-	 * The buffer availability listener. As long as the data is in memory,
-	 * notifications happen on a per-buffer basis, since spilling may happen
-	 * after a notification has been sent out.
-	 */
-	private final BufferAvailabilityListener listener;
-
-	private final AtomicBoolean isReleased = new AtomicBoolean(false);
-
-	/** Remember the number of buffers this view was created with. */
-	private final long numBuffers;
-
-	/**
-	 * The next buffer to hand out. Every time this is set to a non-null value,
-	 * a listener notification happens.
-	 */
-	private BufferConsumer nextBuffer;
-
-	private volatile SpilledSubpartitionView spilledView;
-
-	SpillableSubpartitionView(
-		SpillableSubpartition parent,
-		ArrayDeque<BufferConsumer> buffers,
-		IOManager ioManager,
-		int memorySegmentSize,
-		BufferAvailabilityListener listener) {
-
-		this.parent = checkNotNull(parent);
-		this.buffers = checkNotNull(buffers);
-		this.ioManager = checkNotNull(ioManager);
-		this.memorySegmentSize = memorySegmentSize;
-		this.listener = checkNotNull(listener);
-
-		synchronized (buffers) {
-			numBuffers = buffers.size();
-			nextBuffer = buffers.poll();
-		}
-
-		if (nextBuffer != null) {
-			listener.notifyDataAvailable();
-		}
-	}
-
-	int releaseMemory() throws IOException {
-		synchronized (buffers) {
-			if (spilledView != null || nextBuffer == null) {
-				// Already spilled or nothing in-memory
-				return 0;
-			} else {
-				// We don't touch the next buffer, because a notification has
-				// already been sent for it. It will only be recycled once it
-				// has been consumed.
-
-				// Create the spill writer and write all buffers to disk
-				BufferFileWriter spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel());
-
-				long spilledBytes = 0;
-
-				int numBuffers = buffers.size();
-				for (int i = 0; i < numBuffers; i++) {
-					try (BufferConsumer bufferConsumer = buffers.remove()) {
-						Buffer buffer = bufferConsumer.build();
-						checkState(bufferConsumer.isFinished(), "BufferConsumer must be finished before " +
-							"spilling. Otherwise we would not be able to simply remove it from the queue. This should " +
-							"be guaranteed by creating ResultSubpartitionView only once Subpartition isFinished.");
-						parent.updateStatistics(buffer);
-						spilledBytes += buffer.getSize();
-						spillWriter.writeBlock(buffer);
-					}
-				}
-
-				spilledView = new SpilledSubpartitionView(
-					parent,
-					memorySegmentSize,
-					spillWriter,
-					numBuffers,
-					listener);
-
-				LOG.debug("Spilling {} bytes for sub partition {} of {}.",
-					spilledBytes,
-					parent.index,
-					parent.parent.getPartitionId());
-
-				return numBuffers;
-			}
-		}
-	}
-
-	@Nullable
-	@Override
-	public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException {
-		Buffer current = null;
-		boolean nextBufferIsEvent = false;
-		int newBacklog = 0; // this is always correct if current is non-null!
-		boolean isMoreAvailable = false;
-
-		synchronized (buffers) {
-			if (isReleased.get()) {
-				return null;
-			} else if (nextBuffer != null) {
-				current = nextBuffer.build();
-				checkState(nextBuffer.isFinished(),
-					"We can only read from SpillableSubpartition after it was finished");
-
-				newBacklog = parent.decreaseBuffersInBacklogUnsafe(nextBuffer.isBuffer());
-				nextBuffer.close();
-				nextBuffer = buffers.poll();
-
-				if (nextBuffer != null) {
-					nextBufferIsEvent = !nextBuffer.isBuffer();
-					isMoreAvailable = true;
-				}
-
-				parent.updateStatistics(current);
-				// if we are spilled (but still process a non-spilled nextBuffer), we don't know the
-				// state of nextBufferIsEvent or whether more buffers are available
-				if (spilledView == null) {
-					return new BufferAndBacklog(current, isMoreAvailable, newBacklog, nextBufferIsEvent);
-				}
-			}
-		} // else: spilled
-
-		SpilledSubpartitionView spilled = spilledView;
-		if (spilled != null) {
-			if (current != null) {
-				return new BufferAndBacklog(current, spilled.isAvailable(), newBacklog, spilled.nextBufferIsEvent());
-			} else {
-				return spilled.getNextBuffer();
-			}
-		} else {
-			throw new IllegalStateException("No in-memory buffers available, but also nothing spilled.");
-		}
-	}
-
-	@Override
-	public void notifyDataAvailable() {
-		// We do the availability listener notification one by one
-	}
-
-	@Override
-	public void releaseAllResources() throws IOException {
-		if (isReleased.compareAndSet(false, true)) {
-			SpilledSubpartitionView spilled = spilledView;
-			if (spilled != null) {
-				spilled.releaseAllResources();
-			}
-			// we are never giving this buffer out in getNextBuffer(), so we need to clean it up
-			synchronized (buffers) {
-				if (nextBuffer != null) {
-					nextBuffer.close();
-					nextBuffer = null;
-				}
-			}
-		}
-	}
-
-	@Override
-	public void notifySubpartitionConsumed() throws IOException {
-		SpilledSubpartitionView spilled = spilledView;
-		if (spilled != null) {
-			spilled.notifySubpartitionConsumed();
-		} else {
-			parent.onConsumedSubpartition();
-		}
-	}
-
-	@Override
-	public boolean isReleased() {
-		SpilledSubpartitionView spilled = spilledView;
-		if (spilled != null) {
-			return spilled.isReleased();
-		} else {
-			return parent.isReleased() || isReleased.get();
-		}
-	}
-
-	@Override
-	public boolean nextBufferIsEvent() {
-		synchronized (buffers) {
-			if (isReleased.get()) {
-				return false;
-			} else if (nextBuffer != null) {
-				return !nextBuffer.isBuffer();
-			}
-		} // else: spilled
-
-		checkState(spilledView != null, "No in-memory buffers available, but also nothing spilled.");
-
-		return spilledView.nextBufferIsEvent();
-	}
-
-	@Override
-	public boolean isAvailable() {
-		synchronized (buffers) {
-			if (nextBuffer != null) {
-				return true;
-			}
-			else if (spilledView == null) {
-				return false;
-			}
-		} // else: spilled
-
-		return spilledView.isAvailable();
-	}
-
-	@Override
-	public Throwable getFailureCause() {
-		SpilledSubpartitionView spilled = spilledView;
-		if (spilled != null) {
-			return spilled.getFailureCause();
-		} else {
-			return parent.getFailureCause();
-		}
-	}
-
-	@Override
-	public String toString() {
-		boolean hasSpilled = spilledView != null;
-
-		return String.format("SpillableSubpartitionView(index: %d, buffers: %d, spilled? %b) of ResultPartition %s",
-			parent.index,
-			numBuffers,
-			hasSpilled,
-			parent.parent.getPartitionId());
-	}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
deleted file mode 100644
index f941e20..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
-import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
-import org.apache.flink.runtime.util.event.NotificationListener;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Reader for a spilled sub partition.
- *
- * <p>The partition availability listener is notified about available buffers
- * only when the spilling is done. Spilling happens asynchronously; if it is
- * still in progress, we delay the notification until the spilling completes.
- *
- * <p>Reads of the spilled file are done synchronously.
- */
-class SpilledSubpartitionView implements ResultSubpartitionView, NotificationListener {
-
-	private static final Logger LOG = LoggerFactory.getLogger(SpilledSubpartitionView.class);
-
-	/** The subpartition this view belongs to. */
-	private final SpillableSubpartition parent;
-
-	/** Writer for spills. */
-	private final BufferFileWriter spillWriter;
-
-	/** The synchronous file reader to do the actual I/O. */
-	@GuardedBy("this")
-	private final BufferFileReader fileReader;
-
-	/** The buffer pool to read data into. */
-	private final SpillReadBufferPool bufferPool;
-
-	/** Buffer availability listener. */
-	private final BufferAvailabilityListener availabilityListener;
-
-	/** The total number of spilled buffers. */
-	private final long numberOfSpilledBuffers;
-
-	/** Flag indicating whether all resources have been released. */
-	private AtomicBoolean isReleased = new AtomicBoolean();
-
-	/** The next buffer to hand out. */
-	@GuardedBy("this")
-	private Buffer nextBuffer;
-
-	/** Flag indicating whether a spill is still in progress. */
-	private volatile boolean isSpillInProgress = true;
-
-	SpilledSubpartitionView(
-		SpillableSubpartition parent,
-		int memorySegmentSize,
-		BufferFileWriter spillWriter,
-		long numberOfSpilledBuffers,
-		BufferAvailabilityListener availabilityListener) throws IOException {
-
-		this.parent = checkNotNull(parent);
-		this.bufferPool = new SpillReadBufferPool(2, memorySegmentSize);
-		this.spillWriter = checkNotNull(spillWriter);
-		this.fileReader = new SynchronousBufferFileReader(spillWriter.getChannelID(), false);
-		checkArgument(numberOfSpilledBuffers >= 0);
-		this.numberOfSpilledBuffers = numberOfSpilledBuffers;
-		this.availabilityListener = checkNotNull(availabilityListener);
-
-		// Check whether async spilling is still in progress. If not, this returns
-		// false and we can notify our availability listener about all available buffers.
-		// Otherwise, we notify only when the spill writer callback happens.
-		if (!spillWriter.registerAllRequestsProcessedListener(this)) {
-			isSpillInProgress = false;
-			availabilityListener.notifyDataAvailable();
-			LOG.debug("No spilling in progress. Notified about {} available buffers.", numberOfSpilledBuffers);
-		} else {
-			LOG.debug("Spilling in progress. Waiting with notification about {} available buffers.", numberOfSpilledBuffers);
-		}
-	}
-
-	/**
-	 * This is the callback method for the spill writer. If a spill is still
-	 * in progress when this view is created, we wait until this method is called
-	 * before we notify the availability listener.
-	 */
-	@Override
-	public void onNotification() {
-		isSpillInProgress = false;
-		availabilityListener.notifyDataAvailable();
-		LOG.debug("Finished spilling. Notified about {} available buffers.", numberOfSpilledBuffers);
-	}
-
-	@Nullable
-	@Override
-	public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException {
-		if (isSpillInProgress) {
-			return null;
-		}
-
-		Buffer current;
-		boolean nextBufferIsEvent;
-		synchronized (this) {
-			if (nextBuffer == null) {
-				current = requestAndFillBuffer();
-			} else {
-				current = nextBuffer;
-			}
-			nextBuffer = requestAndFillBuffer();
-			nextBufferIsEvent = nextBuffer != null && !nextBuffer.isBuffer();
-		}
-
-		if (current == null) {
-			return null;
-		}
-
-		int newBacklog = parent.decreaseBuffersInBacklog(current);
-		return new BufferAndBacklog(current, newBacklog > 0 || nextBufferIsEvent, newBacklog, nextBufferIsEvent);
-	}
-
-	@Nullable
-	private Buffer requestAndFillBuffer() throws IOException, InterruptedException {
-		assert Thread.holdsLock(this);
-
-		if (fileReader.hasReachedEndOfFile()) {
-			return null;
-		}
-		// TODO This is fragile as we implicitly expect that multiple calls to
-		// this method don't happen before recycling buffers returned earlier.
-		Buffer buffer = bufferPool.requestBufferBlocking();
-		fileReader.readInto(buffer);
-		return buffer;
-	}
-
-	@Override
-	public void notifyDataAvailable() {
-		// We do the availability listener notification either directly on
-		// construction of this view (when everything has been spilled) or
-		// as soon as spilling is done and we are notified about it in the
-		// #onNotification callback.
-	}
-
-	@Override
-	public void notifySubpartitionConsumed() throws IOException {
-		parent.onConsumedSubpartition();
-	}
-
-	@Override
-	public void releaseAllResources() throws IOException {
-		if (isReleased.compareAndSet(false, true)) {
-			// TODO This can block until all buffers are written out to
-			// disk if a spill is in-progress before deleting the file.
-			// It is possibly called from the Netty event loop threads,
-			// which can bring down the network.
-			spillWriter.closeAndDelete();
-
-			synchronized (this) {
-				fileReader.close();
-				if (nextBuffer != null) {
-					nextBuffer.recycleBuffer();
-					nextBuffer = null;
-				}
-			}
-
-			bufferPool.destroy();
-		}
-	}
-
-	@Override
-	public boolean isReleased() {
-		return parent.isReleased() || isReleased.get();
-	}
-
-	@Override
-	public boolean nextBufferIsEvent() {
-		synchronized (this) {
-			if (nextBuffer == null) {
-				try {
-					nextBuffer = requestAndFillBuffer();
-				} catch (Exception e) {
-					// we can ignore this here (we will get it again once getNextBuffer() is called)
-					return false;
-				}
-			}
-			return nextBuffer != null && !nextBuffer.isBuffer();
-		}
-	}
-
-	@Override
-	public synchronized boolean isAvailable() {
-		if (nextBuffer != null) {
-			return true;
-		}
-		return !fileReader.hasReachedEndOfFile();
-	}
-
-	@Override
-	public Throwable getFailureCause() {
-		return parent.getFailureCause();
-	}
-
-	@Override
-	public String toString() {
-		return String.format("SpilledSubpartitionView(index: %d, buffers: %d) of ResultPartition %s",
-			parent.index,
-			numberOfSpilledBuffers,
-			parent.parent.getPartitionId());
-	}
-
-	/**
-	 * A buffer pool to provide buffer to read the file into.
-	 *
-	 * <p>This pool ensures that a consuming input gate makes progress in all cases, even when all
-	 * buffers of the input gate buffer pool have been requested by remote input channels.
-	 */
-	private static class SpillReadBufferPool implements BufferRecycler {
-
-		private final Queue<Buffer> buffers;
-
-		private boolean isDestroyed;
-
-		SpillReadBufferPool(int numberOfBuffers, int memorySegmentSize) {
-			this.buffers = new ArrayDeque<>(numberOfBuffers);
-
-			synchronized (buffers) {
-				for (int i = 0; i < numberOfBuffers; i++) {
-					buffers.add(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledOffHeapMemory(
-						memorySegmentSize, null), this));
-				}
-			}
-		}
-
-		@Override
-		public void recycle(MemorySegment memorySegment) {
-			synchronized (buffers) {
-				if (isDestroyed) {
-					memorySegment.free();
-				} else {
-					buffers.add(new NetworkBuffer(memorySegment, this));
-					buffers.notifyAll();
-				}
-			}
-		}
-
-		private Buffer requestBufferBlocking() throws InterruptedException {
-			synchronized (buffers) {
-				while (true) {
-					if (isDestroyed) {
-						return null;
-					}
-
-					Buffer buffer = buffers.poll();
-
-					if (buffer != null) {
-						return buffer;
-					}
-					// Else: wait for a buffer
-					buffers.wait();
-				}
-			}
-		}
-
-		private void destroy() {
-			synchronized (buffers) {
-				isDestroyed = true;
-				buffers.notifyAll();
-			}
-		}
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
index 9706a86..7a68368 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.runtime.io.network.buffer;
 
+import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Utility class for creating non-pooled {@link BufferBuilder}s.
@@ -83,4 +86,30 @@ public class BufferBuilderTestUtils {
 			FreeingBufferRecycler.INSTANCE,
 			false);
 	}
+
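+	/** Creates a Buffer whose readable part holds {@code numInts} little-endian ints, ascending from {@code nextValue}. */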
+	public static Buffer buildBufferWithAscendingInts(int bufferSize, int numInts, int nextValue) {
+		final MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
+		for (int i = 0; i < numInts; i++) {
+			seg.putIntLittleEndian(4 * i, nextValue++);
+		}
+
+		return new NetworkBuffer(seg, MemorySegment::free, true, 4 * numInts);
+	}
+
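+	/** Asserts that the buffer's readable contents are {@code numInts} little-endian ints, ascending from {@code nextValue}. */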
+	public static void validateBufferWithAscendingInts(Buffer buffer, int numInts, int nextValue) {
+		final ByteBuffer bb = buffer.getNioBufferReadable().order(ByteOrder.LITTLE_ENDIAN);
+
+		for (int i = 0; i < numInts; i++) {
+			assertEquals(nextValue++, bb.getInt());
+		}
+	}
+
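+	/** Creates a buffer whose contents do not matter, for tests that only care about size and lifecycle. */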
+	public static Buffer buildSomeBuffer() {
+		return buildSomeBuffer(1024);
+	}
+
+	public static Buffer buildSomeBuffer(int size) {
+		final MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(size);
+		return new NetworkBuffer(seg, MemorySegment::free, true, size);
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
new file mode 100644
index 0000000..5051b90
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Behavior tests for the {@link BoundedBlockingSubpartition} and the
+ * {@link BoundedBlockingSubpartitionReader}.
+ *
+ * <p>Full read / write tests for the partition and the reader are in
+ * {@link BoundedBlockingSubpartitionWriteReadTest}.
+ */
+public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase {
+
+	@ClassRule
+	public static final TemporaryFolder TMP_DIR = new TemporaryFolder();
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testCreateReaderBeforeFinished() throws Exception {
+		final ResultSubpartition partition = createSubpartition();
+
+		try {
+			partition.createReadView(new NoOpBufferAvailablityListener());
+			fail("expected exception");
+		}
+		catch (IllegalStateException ignored) {}
+
+		partition.release();
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	ResultSubpartition createSubpartition() throws Exception {
+		final ResultPartition resultPartition = PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING);
+		return new BoundedBlockingSubpartition(0, resultPartition, tmpPath());
+	}
+
+	@Override
+	ResultSubpartition createFailingWritesSubpartition() throws Exception {
+		final ResultPartition resultPartition = PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING);
+
+		return new BoundedBlockingSubpartition(
+				0,
+				resultPartition,
+				FailingMemory.create());
+	}
+
+	// ------------------------------------------------------------------------
+
+	static Path tmpPath() throws IOException {
+		return new File(TMP_DIR.newFolder(), "subpartition").toPath();
+	}
+
+	// ------------------------------------------------------------------------
+
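+	/**
+	 * A {@link MemoryMappedBuffers} variant whose writes always fail, used to
+	 * exercise the subpartition's error handling for failed writes.
+	 */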
+	private static class FailingMemory extends MemoryMappedBuffers {
+
+		FailingMemory(Path path, FileChannel fc) throws IOException {
+			super(path, fc, Integer.MAX_VALUE);
+		}
+
+		@Override
+		void writeBuffer(Buffer buffer) throws IOException {
+			throw new IOException("test");
+		}
+
+		static FailingMemory create() throws IOException {
+			Path p = tmpPath();
+			FileChannel fc = FileChannel.open(p, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
+			return new FailingMemory(p, fc);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
new file mode 100644
index 0000000..46dbb05
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that write data to a BoundedBlockingSubpartition and read it back,
+ * both sequentially and with multiple consumer threads in parallel.
+ */
+public class BoundedBlockingSubpartitionWriteReadTest {
+
+	@ClassRule
+	public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+	@Test
+	public void testWriteAndReadData() throws Exception {
+		final int numLongs = 15_000_000; // roughly 115 MiBytes
+
+		// setup
+		final BoundedBlockingSubpartition subpartition = createAndFillPartition(numLongs);
+
+		// test & check
+		final ResultSubpartitionView reader = subpartition.createReadView(() -> {});
+		readLongs(reader, numLongs, subpartition.getBuffersInBacklog());
+
+		// cleanup
+		reader.releaseAllResources();
+		subpartition.release();
+	}
+
+	@Test
+	public void testRead10ConsumersSequential() throws Exception {
+		final int numLongs = 10_000_000;
+
+		// setup
+		final BoundedBlockingSubpartition subpartition = createAndFillPartition(numLongs);
+
+		// test & check
+		for (int i = 0; i < 10; i++) {
+			final ResultSubpartitionView reader = subpartition.createReadView(() -> {});
+			readLongs(reader, numLongs, subpartition.getBuffersInBacklog());
+			reader.releaseAllResources();
+		}
+
+		// cleanup
+		subpartition.release();
+	}
+
+	@Test
+	public void testRead10ConsumersConcurrent() throws Exception {
+		final int numLongs = 15_000_000;
+
+		// setup
+		final BoundedBlockingSubpartition subpartition = createAndFillPartition(numLongs);
+
+		// test
+		final LongReader[] readerThreads = createSubpartitionLongReaders(
+				subpartition, 10, numLongs, subpartition.getBuffersInBacklog());
+		for (CheckedThread t : readerThreads) {
+			t.start();
+		}
+
+		// check
+		for (CheckedThread t : readerThreads) {
+			t.sync(); // this propagates assertion errors out from the threads
+		}
+
+		// cleanup
+		subpartition.release();
+	}
+
+	// ------------------------------------------------------------------------
+	//  common test passes
+	// ------------------------------------------------------------------------
+
+	private static void readLongs(ResultSubpartitionView reader, long numLongs, int numBuffers) throws Exception {
+		BufferAndBacklog next;
+		long expectedNextLong = 0L;
+		int nextExpectedBacklog = numBuffers - 1;
+
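+		// read data buffers until the end-of-partition event, the only non-data buffer here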
+		while ((next = reader.getNextBuffer()) != null && next.buffer().isBuffer()) {
+			assertTrue(next.isMoreAvailable());
+			assertEquals(nextExpectedBacklog, next.buffersInBacklog());
+
+			ByteBuffer buffer = next.buffer().getNioBufferReadable();
+			while (buffer.hasRemaining()) {
+				assertEquals(expectedNextLong++, buffer.getLong());
+			}
+
+			nextExpectedBacklog--;
+		}
+
+		assertEquals(numLongs, expectedNextLong);
+		assertEquals(-1, nextExpectedBacklog);
+	}
+
+	// ------------------------------------------------------------------------
+	//  utils
+	// ------------------------------------------------------------------------
+
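+	/** Fills the partition with {@code nums} ascending big-endian longs, one 1 MiByte buffer at a time. */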
+	private static void writeLongs(BoundedBlockingSubpartition partition, long nums) throws IOException {
+		final MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(1024 * 1024);
+
+		long l = 0;
+		while (nums > 0) {
+			int pos = 0;
+			for (; nums > 0 && pos <= memory.size() - 8; pos += 8) {
+				memory.putLongBigEndian(pos, l++);
+				nums--;
+			}
+
+			partition.add(new BufferConsumer(memory, (ignored) -> {}, pos, true));
+
+			// we need to flush after every buffer, as long as the add() contract is that
+			// buffers are immediately added and can be filled further after that (for low-latency
+			// streaming data exchanges)
+			partition.flush();
+		}
+	}
+
+	private static BoundedBlockingSubpartition createAndFillPartition(long numLongs) throws IOException {
+		BoundedBlockingSubpartition subpartition = createSubpartition();
+		writeLongs(subpartition, numLongs);
+		subpartition.finish();
+		return subpartition;
+	}
+
+	private static BoundedBlockingSubpartition createSubpartition() throws IOException {
+		return new BoundedBlockingSubpartition(
+				0,
+				PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING),
+				new File(TMP_FOLDER.newFolder(), "partitiondata").toPath());
+	}
+
+	private static LongReader[] createSubpartitionLongReaders(
+			BoundedBlockingSubpartition subpartition,
+			int numReaders,
+			int numLongs,
+			int numBuffers) throws IOException {
+
+		final LongReader[] readerThreads = new LongReader[numReaders];
+		for (int i = 0; i < numReaders; i++) {
+			ResultSubpartitionView reader = subpartition.createReadView(() -> {});
+			readerThreads[i] = new LongReader(reader, numLongs, numBuffers);
+		}
+		return readerThreads;
+	}
+
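+	/** A {@link CheckedThread} that reads and validates all longs from one subpartition view. */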
+	private static final class LongReader extends CheckedThread {
+
+		private final ResultSubpartitionView reader;
+
+		private final long numLongs;
+
+		private final int numBuffers;
+
+		LongReader(ResultSubpartitionView reader, long numLongs, int numBuffers) {
+			this.reader = reader;
+			this.numLongs = numLongs;
+			this.numBuffers = numBuffers;
+		}
+
+		@Override
+		public void go() throws Exception {
+			readLongs(reader, numLongs, numBuffers);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferToByteBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferToByteBufferTest.java
new file mode 100644
index 0000000..55fa496
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferToByteBufferTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
+import org.apache.flink.runtime.io.network.partition.BufferToByteBuffer.Reader;
+import org.apache.flink.runtime.io.network.partition.BufferToByteBuffer.Writer;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests for the {@link BufferToByteBuffer}.
+ */
+public class BufferToByteBufferTest {
+
+	@Test
+	public void testCompleteIsSameBufferAsOriginal() {
+		final ByteBuffer bb = ByteBuffer.allocateDirect(128);
+		final BufferToByteBuffer.Writer writer = new BufferToByteBuffer.Writer(bb);
+
+		final ByteBuffer result = writer.complete();
+
+		assertSame(bb, result);
+	}
+
+	@Test
+	public void testWriteReadMatchesCapacity() {
+		final ByteBuffer bb = ByteBuffer.allocateDirect(1200);
+		testWriteAndReadMultipleBuffers(bb, 100);
+	}
+
+	@Test
+	public void testWriteReadWithLeftoverCapacity() {
+		final ByteBuffer bb = ByteBuffer.allocateDirect(1177);
+		testWriteAndReadMultipleBuffers(bb, 100);
+	}
+
+	private void testWriteAndReadMultipleBuffers(ByteBuffer buffer, int numIntsPerBuffer) {
+		final Writer writer = new Writer(buffer);
+
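+		// write buffers (each with a small header) until the target ByteBuffer has no more room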
+		int numBuffers = 0;
+		while (writer.writeBuffer(BufferBuilderTestUtils.buildBufferWithAscendingInts(1024, numIntsPerBuffer, 0))) {
+			numBuffers++;
+		}
+
+		final ByteBuffer bb = writer.complete().slice();
+
+		final Reader reader = new Reader(bb);
+		Buffer buf;
+		while ((buf = reader.sliceNextBuffer()) != null) {
+			BufferBuilderTestUtils.validateBufferWithAscendingInts(buf, numIntsPerBuffer, 0);
+			numBuffers--;
+		}
+
+		assertEquals(0, numBuffers);
+	}
+
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffersTest.java
new file mode 100644
index 0000000..eec7dba
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffersTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
+import org.apache.flink.runtime.io.network.partition.MemoryMappedBuffers.BufferSlicer;
+
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link MemoryMappedBuffers} class.
+ */
+public class MemoryMappedBuffersTest {
+
+	@ClassRule
+	public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+	@Test
+	public void testWriteAndReadData() throws Exception {
+		testWriteAndReadData(10_000_000, Integer.MAX_VALUE);
+	}
+
+	@Test
+	public void testWriteAndReadDataAcrossRegions() throws Exception {
+		testWriteAndReadData(10_000_000, 1_276_347);
+	}
+
+	private static void testWriteAndReadData(int numInts, int regionSize) throws Exception {
+		try (MemoryMappedBuffers memory = MemoryMappedBuffers.createWithRegionSize(createTempPath(), regionSize)) {
+			final int numBuffers = writeInts(memory, numInts);
+			memory.finishWrite();
+
+			readInts(memory.getFullBuffers(), numBuffers, numInts);
+		}
+	}
+
+	@Test
+	public void returnNullAfterEmpty() throws Exception {
+		try (MemoryMappedBuffers memory = MemoryMappedBuffers.create(createTempPath())) {
+			memory.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer());
+			memory.finishWrite();
+
+			final BufferSlicer reader = memory.getFullBuffers();
+			assertNotNull(reader.sliceNextBuffer());
+
+			// check that multiple calls now return null
+			assertNull(reader.sliceNextBuffer());
+			assertNull(reader.sliceNextBuffer());
+			assertNull(reader.sliceNextBuffer());
+		}
+	}
+
+	@Test
+	public void testDeleteFileOnClose() throws Exception {
+		final Path path = createTempPath();
+		final MemoryMappedBuffers mmb = MemoryMappedBuffers.create(path);
+		assertTrue(Files.exists(path));
+
+		mmb.close();
+
+		assertFalse(Files.exists(path));
+	}
+
+	@Test
+	public void testGetSizeSingleRegion() throws Exception {
+		testGetSize(Integer.MAX_VALUE);
+	}
+
+	@Test
+	public void testGetSizeMultipleRegions() throws Exception {
+		testGetSize(100_000);
+	}
+
+	private static void testGetSize(int regionSize) throws Exception {
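+		// each written buffer is accounted for with its payload plus one serialization header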
+		final int bufferSize1 = 60_787;
+		final int bufferSize2 = 76_687;
+		final int expectedSize1 = bufferSize1 + BufferToByteBuffer.HEADER_LENGTH;
+		final int expectedSizeFinal = bufferSize1 + bufferSize2 + 2 * BufferToByteBuffer.HEADER_LENGTH;
+
+		try (MemoryMappedBuffers memory = MemoryMappedBuffers.createWithRegionSize(createTempPath(), regionSize)) {
+
+			memory.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer(bufferSize1));
+			assertEquals(expectedSize1, memory.getSize());
+
+			memory.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer(bufferSize2));
+			assertEquals(expectedSizeFinal, memory.getSize());
+
+			memory.finishWrite();
+			assertEquals(expectedSizeFinal, memory.getSize());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  utils
+	// ------------------------------------------------------------------------
+
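+	/** Writes ascending ints in 1 MiByte buffers and returns the number of buffers written. */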
+	private static int writeInts(MemoryMappedBuffers memory, int numInts) throws IOException {
+		final int bufferSize = 1024 * 1024; // 1 MiByte
+		final int numIntsInBuffer = bufferSize / 4;
+		int numBuffers = 0;
+
+		for (int nextValue = 0; nextValue < numInts; nextValue += numIntsInBuffer) {
+			Buffer buffer = BufferBuilderTestUtils.buildBufferWithAscendingInts(bufferSize, numIntsInBuffer, nextValue);
+			memory.writeBuffer(buffer);
+			numBuffers++;
+		}
+
+		return numBuffers;
+	}
+
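+	/** Reads the buffers back and validates the ascending ints; the writer may overshoot numInts to fill the last buffer. */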
+	private static void readInts(MemoryMappedBuffers.BufferSlicer memory, int numBuffersExpected, int numInts) throws IOException {
+		Buffer b;
+		int nextValue = 0;
+		int numBuffers = 0;
+
+		while ((b = memory.sliceNextBuffer()) != null) {
+			final int numIntsInBuffer = b.getSize() / 4;
+			BufferBuilderTestUtils.validateBufferWithAscendingInts(b, numIntsInBuffer, nextValue);
+			nextValue += numIntsInBuffer;
+			numBuffers++;
+		}
+
+		assertEquals(numBuffersExpected, numBuffers);
+		assertThat(nextValue, Matchers.greaterThanOrEqualTo(numInts));
+	}
+
+	private static Path createTempPath() throws IOException {
+		return new File(TMP_FOLDER.newFolder(), "subpartitiondata").toPath();
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 36cd353..ff15b42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
@@ -35,6 +36,7 @@ import org.apache.flink.util.function.CheckedSupplier;
 
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -78,6 +80,13 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		return new PipelinedSubpartition(0, parent);
 	}
 
+	@Override
+	ResultSubpartition createFailingWritesSubpartition() throws Exception {
+		// the tests that use this are currently not applicable to the PipelinedSubpartition
+		Assume.assumeTrue(false);
+		return null;
+	}
+
 	@Test
 	public void testIllegalReadViewRequest() throws Exception {
 		final PipelinedSubpartition subpartition = createSubpartition();
@@ -280,4 +289,40 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		assertEquals(2, partition.getTotalNumberOfBuffers());
 		assertEquals(0, partition.getTotalNumberOfBytes()); // buffer data is never consumed
 	}
+
+	@Test
+	public void testReleaseParent() throws Exception {
+		final ResultSubpartition partition = createSubpartition();
+		verifyViewReleasedAfterParentRelease(partition);
+	}
+
+	@Test
+	public void testReleaseParentAfterSpilled() throws Exception {
+		final ResultSubpartition partition = createSubpartition();
+		partition.releaseMemory();
+
+		verifyViewReleasedAfterParentRelease(partition);
+	}
+
+	private void verifyViewReleasedAfterParentRelease(ResultSubpartition partition) throws Exception {
+		// Add a bufferConsumer
+		BufferConsumer bufferConsumer = createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
+		partition.add(bufferConsumer);
+		partition.finish();
+
+		// Create the view
+		BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
+		ResultSubpartitionView view = partition.createReadView(listener);
+
+		// The added bufferConsumer and end-of-partition event
+		assertNotNull(view.getNextBuffer());
+		assertNotNull(view.getNextBuffer());
+
+		// Release the parent
+		assertFalse(view.isReleased());
+		partition.release();
+
+		// Verify that parent release is reflected at partition view
+		assertTrue(view.isReleased());
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index c911df7..5846d6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -202,11 +202,6 @@ public class ResultPartitionTest {
 	}
 
 	@Test
-	public void testReleaseMemoryOnBlockingPartition() throws Exception {
-		testReleaseMemory(ResultPartitionType.BLOCKING);
-	}
-
-	@Test
 	public void testReleaseMemoryOnPipelinedPartition() throws Exception {
 		testReleaseMemory(ResultPartitionType.PIPELINED);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
deleted file mode 100644
index 71dbc2b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ /dev/null
@@ -1,800 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter;
-import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.fillBufferBuilder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.nullable;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link SpillableSubpartition}.
- */
-public class SpillableSubpartitionTest extends SubpartitionTestBase {
-	private static final int BUFFER_DATA_SIZE = 4096;
-
-	@Rule
-	public ExpectedException exception = ExpectedException.none();
-
-	/** Executor service for concurrent produce/consume tests. */
-	private static final ExecutorService executorService = Executors.newCachedThreadPool();
-
-	/** Asynchronous I/O manager. */
-	private static IOManager ioManager;
-
-	@BeforeClass
-	public static void setup() {
-		ioManager = new IOManagerAsync();
-	}
-
-	@AfterClass
-	public static void shutdown() {
-		executorService.shutdownNow();
-		ioManager.shutdown();
-	}
-
-	@Override
-	SpillableSubpartition createSubpartition() {
-		return createSubpartition(ioManager);
-	}
-
-	private static SpillableSubpartition createSubpartition(IOManager ioManager) {
-		ResultPartition parent = mock(ResultPartition.class);
-		BufferProvider bufferProvider = mock(BufferProvider.class);
-		when(parent.getBufferProvider()).thenReturn(bufferProvider);
-		when(bufferProvider.getMemorySegmentSize()).thenReturn(32 * 1024);
-		return new SpillableSubpartition(0, parent, ioManager);
-	}
-
-	/**
-	 * Tests a fix for FLINK-2384.
-	 *
-	 * @see <a href="https://issues.apache.org/jira/browse/FLINK-2384">FLINK-2384</a>
-	 */
-	@Test
-	public void testConcurrentFinishAndReleaseMemory() throws Exception {
-		// Latches for blocking
-		final CountDownLatch doneLatch = new CountDownLatch(1);
-		final CountDownLatch blockLatch = new CountDownLatch(1);
-
-		// Blocking spill writer (blocks on the close call)
-		AsynchronousBufferFileWriter spillWriter = mock(AsynchronousBufferFileWriter.class);
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) throws Throwable {
-				blockLatch.countDown();
-				doneLatch.await();
-				return null;
-			}
-		}).when(spillWriter).close();
-
-		// Mock I/O manager returning the blocking spill writer
-		IOManager ioManager = mock(IOManager.class);
-		when(ioManager.createBufferFileWriter(nullable(FileIOChannel.ID.class)))
-			.thenReturn(spillWriter);
-
-		// The partition
-		final SpillableSubpartition partition = new SpillableSubpartition(
-			0, mock(ResultPartition.class), ioManager);
-
-		// Spill the partition initially (creates the spill writer)
-		assertEquals(0, partition.releaseMemory());
-
-		ExecutorService executor = Executors.newSingleThreadExecutor();
-
-		// Finish the partition (this blocks because of the mock blocking writer)
-		Future<Void> blockingFinish = executor.submit(new Callable<Void>() {
-			@Override
-			public Void call() throws Exception {
-				partition.finish();
-				return null;
-			}
-		});
-
-		// Ensure that the blocking call has been made
-		blockLatch.await();
-
-		// This call needs to go through. FLINK-2384 discovered a bug in
-		// which the finish call held a lock, leading to a deadlock when
-		// another operation happened on the partition.
-		partition.releaseMemory();
-
-		// Check that the finish call succeeded w/o problems as well to avoid
-		// false test successes.
-		doneLatch.countDown();
-		blockingFinish.get();
-	}
-
-	/**
-	 * Tests a fix for FLINK-2412.
-	 *
-	 * @see <a href="https://issues.apache.org/jira/browse/FLINK-2412">FLINK-2412</a>
-	 */
-	@Test
-	public void testReleasePartitionAndGetNext() throws Exception {
-		// Create partition and add some buffers
-		SpillableSubpartition partition = createSubpartition();
-
-		partition.finish();
-
-		// Create the read view
-		ResultSubpartitionView readView = spy(partition
-			.createReadView(new NoOpBufferAvailablityListener()));
-
-		// The released state check (of the parent) needs to be independent
-		// of the released state of the view.
-		doNothing().when(readView).releaseAllResources();
-
-		// Release the partition, but the view does not notice yet.
-		partition.release();
-
-		assertNull(readView.getNextBuffer());
-	}
-
-	/**
-	 * Tests that a spilled partition is correctly read back in via a spilled
-	 * read view.
-	 */
-	@Test
-	public void testConsumeSpilledPartition() throws Exception {
-		SpillableSubpartition partition = createSubpartition();
-
-		BufferConsumer bufferConsumer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-		BufferConsumer eventBufferConsumer =
-			EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1));
-		final int eventSize = eventBufferConsumer.getWrittenBytes();
-
-		partition.add(bufferConsumer.copy());
-		partition.add(bufferConsumer.copy());
-		partition.add(eventBufferConsumer);
-		partition.add(bufferConsumer);
-
-		assertEquals(4, partition.getTotalNumberOfBuffers());
-		assertEquals(3, partition.getBuffersInBacklog());
-		assertEquals(0, partition.getTotalNumberOfBytes()); // only updated when getting/releasing the buffers
-
-		assertFalse(bufferConsumer.isRecycled());
-		assertEquals(4, partition.releaseMemory());
-		// now the bufferConsumer may be freed, depending on the timing of the write operation
-		// -> let's do this check at the end of the test (to save some time)
-		// still same statistics
-		assertEquals(4, partition.getTotalNumberOfBuffers());
-		assertEquals(3, partition.getBuffersInBacklog());
-		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize, partition.getTotalNumberOfBytes());
-
-		partition.finish();
-		// + one EndOfPartitionEvent
-		assertEquals(5, partition.getTotalNumberOfBuffers());
-		assertEquals(3, partition.getBuffersInBacklog());
-		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes());
-
-		AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
-		SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
-
-		assertEquals(1, listener.getNumNotifications());
-		assertFalse(reader.nextBufferIsEvent()); // buffer
-
-		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
-		assertEquals(2, partition.getBuffersInBacklog());
-
-		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
-		assertEquals(1, partition.getBuffersInBacklog());
-
-		assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
-		assertEquals(1, partition.getBuffersInBacklog());
-
-		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
-		assertEquals(0, partition.getBuffersInBacklog());
-
-		assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
-		assertEquals(0, partition.getBuffersInBacklog());
-
-		// finally check that the bufferConsumer has been freed after a successful (or failed) write
-		final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
-		while (!bufferConsumer.isRecycled() && System.currentTimeMillis() < deadline) {
-			Thread.sleep(1);
-		}
-		assertTrue(bufferConsumer.isRecycled());
-	}
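
The deadline loop at the end of this test reappears below; generalized, it is a poll-until-condition-or-timeout helper. A hypothetical utility sketch (not part of the Flink test utilities):

    final class TestConditions {
        static void awaitCondition(java.util.function.BooleanSupplier condition, long timeoutMillis)
                throws InterruptedException {
            final long deadline = System.currentTimeMillis() + timeoutMillis;
            while (!condition.getAsBoolean() && System.currentTimeMillis() < deadline) {
                Thread.sleep(1); // poll cheaply until the condition holds or we time out
            }
        }
    }

With it, the check above would read TestConditions.awaitCondition(bufferConsumer::isRecycled, 30_000L), followed by the assertion.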
-
-	/**
-	 * Tests that a spilled partition is correctly read back in via a spilled read view. The
-	 * partition went into spilled state before adding buffers and the access pattern resembles
-	 * the actual use of {@link org.apache.flink.runtime.io.network.api.writer.RecordWriter}.
-	 */
-	@Test
-	public void testConsumeSpilledPartitionSpilledBeforeAdd() throws Exception {
-		SpillableSubpartition partition = createSubpartition();
-		assertEquals(0, partition.releaseMemory()); // <---- SPILL to disk
-
-		BufferBuilder[] bufferBuilders = new BufferBuilder[] {
-			createBufferBuilder(BUFFER_DATA_SIZE),
-			createBufferBuilder(BUFFER_DATA_SIZE),
-			createBufferBuilder(BUFFER_DATA_SIZE),
-			createBufferBuilder(BUFFER_DATA_SIZE)
-		};
-		BufferConsumer[] bufferConsumers = Arrays.stream(bufferBuilders).map(
-			BufferBuilder::createBufferConsumer
-		).toArray(BufferConsumer[]::new);
-
-		BufferConsumer eventBufferConsumer =
-			EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1));
-		final int eventSize = eventBufferConsumer.getWrittenBytes();
-
-		// note: only the newest buffer may be unfinished!
-		partition.add(bufferConsumers[0]);
-		fillBufferBuilder(bufferBuilders[0], BUFFER_DATA_SIZE).finish();
-		partition.add(bufferConsumers[1]);
-		fillBufferBuilder(bufferBuilders[1], BUFFER_DATA_SIZE).finish();
-		partition.add(eventBufferConsumer);
-		partition.add(bufferConsumers[2]);
-		bufferBuilders[2].finish(); // remains empty
-		partition.add(bufferConsumers[3]);
-		// last one: partially filled, unfinished
-		fillBufferBuilder(bufferBuilders[3], BUFFER_DATA_SIZE / 2);
-		// finished buffers only:
-		int expectedSize = BUFFER_DATA_SIZE * 2 + eventSize;
-
-		// now the bufferConsumer may be freed, depending on the timing of the write operation
-		// -> let's do this check at the end of the test (to save some time)
-		// still same statistics
-		assertEquals(5, partition.getTotalNumberOfBuffers());
-		assertEquals(3, partition.getBuffersInBacklog());
-		assertEquals(expectedSize, partition.getTotalNumberOfBytes());
-
-		partition.finish();
-		expectedSize += BUFFER_DATA_SIZE / 2; // previously unfinished buffer
-		expectedSize += 4; // + one EndOfPartitionEvent
-		assertEquals(6, partition.getTotalNumberOfBuffers());
-		assertEquals(3, partition.getBuffersInBacklog());
-		assertEquals(expectedSize, partition.getTotalNumberOfBytes());
-		Arrays.stream(bufferConsumers).forEach(bufferConsumer -> assertTrue(bufferConsumer.isRecycled()));
-
-		AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
-		SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
-
-		assertEquals(1, listener.getNumNotifications());
-		assertFalse(reader.nextBufferIsEvent()); // full buffer
-
-		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
-		assertEquals(2, partition.getBuffersInBacklog());
-
-		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
-		assertEquals(1, partition.getBuffersInBacklog());
-
-		assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
-		assertEquals(1, partition.getBuffersInBacklog());
-
-		assertNextBuffer(reader, BUFFER_DATA_SIZE / 2, true, 0, true, true);
-		assertEquals(0, partition.getBuffersInBacklog());
-
-		assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
-		assertEquals(0, partition.getBuffersInBacklog());
-
-		// close buffer consumers
-		Arrays.stream(bufferConsumers).forEach(bufferConsumer -> bufferConsumer.close());
-	}
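
Condensed, the producer-side access pattern exercised by this test (reusing the test utilities from above) looks as follows; the consumer is handed to the subpartition before the builder is filled, which is why only the newest buffer may be unfinished:

    BufferBuilder builder = createBufferBuilder(BUFFER_DATA_SIZE);
    partition.add(builder.createBufferConsumer()); // consumer is enqueued first
    fillBufferBuilder(builder, BUFFER_DATA_SIZE);  // the producer keeps appending afterwards
    builder.finish();                              // no further writes to this buffer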
-
-	/**
-	 * Tests that a spillable partition, which is spilled while already being
-	 * consumed, is correctly read back in.
-	 */
-	@Test
-	public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception {
-		SpillableSubpartition partition = createSubpartition();
-
-		BufferConsumer bufferConsumer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-		BufferConsumer eventBufferConsumer =
-			EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1));
-		final int eventSize = eventBufferConsumer.getWrittenBytes();
-
-		partition.add(bufferConsumer.copy());
-		partition.add(bufferConsumer.copy());
-		partition.add(eventBufferConsumer);
-		partition.add(bufferConsumer);
-		partition.finish();
-
-		assertEquals(5, partition.getTotalNumberOfBuffers());
-		assertEquals(3, partition.getBuffersInBacklog());
-		assertEquals(0, partition.getTotalNumberOfBytes()); // only updated when getting/spilling the buffers
-
-		AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
-		SpillableSubpartitionView reader = (SpillableSubpartitionView) partition.createReadView(listener);
-
-		// Initial notification
-		assertEquals(1, listener.getNumNotifications());
-		assertFalse(bufferConsumer.isRecycled());
-
-		assertFalse(reader.nextBufferIsEvent());
-
-		// first buffer (non-spilled)
-		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, false);
-		assertEquals(BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes()); // only updated when getting/spilling the buffers
-		assertEquals(2, partition.getBuffersInBacklog());
-		assertEquals(1, listener.getNumNotifications()); // since isMoreAvailable is set to true, no need for notification
-		assertFalse(bufferConsumer.isRecycled());
-
-		// Spill now
-		assertEquals(3, partition.releaseMemory());
-		assertFalse(bufferConsumer.isRecycled()); // still one in the reader!
-		// still same statistics:
-		assertEquals(5, partition.getTotalNumberOfBuffers());
-		assertEquals(2, partition.getBuffersInBacklog());
-		// only updated when getting/spilling the buffers but without the nextBuffer (kept in memory)
-		assertEquals(BUFFER_DATA_SIZE * 2 + eventSize + 4, partition.getTotalNumberOfBytes());
-
-		// wait for successfully spilling all buffers (before that we may not access any spilled buffer and cannot rely on isMoreAvailable!)
-		listener.awaitNotifications(2, 30_000);
-		// Spiller finished
-		assertEquals(2, listener.getNumNotifications());
-
-		// after consuming and releasing the next buffer, the bufferConsumer may be freed,
-		// depending on the timing of the last write operation
-		// -> retain once so that we can check below
-		Buffer buffer = bufferConsumer.build();
-		buffer.retainBuffer();
-
-		// second buffer (retained in SpillableSubpartition#nextBuffer)
-		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, false);
-		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics
-		assertEquals(1, partition.getBuffersInBacklog());
-
-		bufferConsumer.close(); // recycle the retained buffer from above (should be the last reference!)
-
-		// the event (spilled)
-		assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
-		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
-		assertEquals(1, partition.getBuffersInBacklog());
-
-		// last buffer (spilled)
-		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
-		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
-		assertEquals(0, partition.getBuffersInBacklog());
-
-		buffer.recycleBuffer();
-		assertTrue(buffer.isRecycled());
-
-		// End of partition
-		assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
-		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
-		assertEquals(0, partition.getBuffersInBacklog());
-
-		// finally check that the bufferConsumer has been freed after a successful (or failed) write
-		final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
-		while (!bufferConsumer.isRecycled() && System.currentTimeMillis() < deadline) {
-			Thread.sleep(1);
-		}
-		assertTrue(bufferConsumer.isRecycled());
-	}
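
The retain/recycle pairing in this test relies on the reference counting of Buffer: every retainBuffer() must be matched by a recycleBuffer(), and isRecycled() only turns true once the last reference has been returned. In isolation:

    Buffer buffer = bufferConsumer.build();
    buffer.retainBuffer();  // reference count +1, keeps the memory segment alive
    buffer.recycleBuffer(); // reference count -1; recycled once the count reaches zero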
-
-	/**
-	 * Tests {@link SpillableSubpartition#add(BufferConsumer)} with a spillable finished partition.
-	 */
-	@Test
-	public void testAddOnFinishedSpillablePartition() throws Exception {
-		testAddOnFinishedPartition(false);
-	}
-
-	/**
-	 * Tests {@link SpillableSubpartition#add(BufferConsumer)} with a spilled finished partition.
-	 */
-	@Test
-	public void testAddOnFinishedSpilledPartition() throws Exception {
-		testAddOnFinishedPartition(true);
-	}
-
-	/**
-	 * Tests {@link SpillableSubpartition#add(BufferConsumer)} with a finished partition.
-	 *
-	 * @param spilled
-	 * 		whether the partition should be spilled to disk (<tt>true</tt>) or not (<tt>false</tt>,
-	 * 		spillable).
-	 */
-	private void testAddOnFinishedPartition(boolean spilled) throws Exception {
-		SpillableSubpartition partition = createSubpartition();
-		if (spilled) {
-			assertEquals(0, partition.releaseMemory());
-		}
-		partition.finish();
-		// finish adds an EndOfPartitionEvent
-		assertEquals(1, partition.getTotalNumberOfBuffers());
-		// if not spilled, statistics are only updated when consuming the buffers
-		assertEquals(spilled ? 4 : 0, partition.getTotalNumberOfBytes());
-
-		BufferConsumer buffer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-		try {
-			partition.add(buffer);
-		} finally {
-			if (!buffer.isRecycled()) {
-				buffer.close();
-				Assert.fail("buffer not recycled");
-			}
-		}
-		// still same statistics
-		assertEquals(1, partition.getTotalNumberOfBuffers());
-		// if not spilled, statistics are only updated when consuming the buffers
-		assertEquals(spilled ? 4 : 0, partition.getTotalNumberOfBytes());
-	}
-
-	@Test
-	public void testAddOnReleasedSpillablePartition() throws Exception {
-		testAddOnReleasedPartition(false);
-	}
-
-	@Test
-	public void testAddOnReleasedSpilledPartition() throws Exception {
-		testAddOnReleasedPartition(true);
-	}
-
-	/**
-	 * Tests {@link SpillableSubpartition#add(BufferConsumer)} with a released partition.
-	 *
-	 * @param spilled
-	 * 		whether the partition should be spilled to disk (<tt>true</tt>) or not (<tt>false</tt>,
-	 * 		spillable).
-	 */
-	private void testAddOnReleasedPartition(boolean spilled) throws Exception {
-		SpillableSubpartition partition = createSubpartition();
-		partition.release();
-		if (spilled) {
-			assertEquals(0, partition.releaseMemory());
-		}
-
-		BufferConsumer buffer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-		boolean bufferRecycled;
-		try {
-			partition.add(buffer);
-		} finally {
-			bufferRecycled = buffer.isRecycled();
-			if (!bufferRecycled) {
-				buffer.close();
-			}
-		}
-		if (!bufferRecycled) {
-			Assert.fail("buffer not recycled");
-		}
-		assertEquals(0, partition.getTotalNumberOfBuffers());
-		assertEquals(0, partition.getTotalNumberOfBytes());
-	}
-
-	/**
-	 * Tests {@link SpillableSubpartition#add(BufferConsumer)} with a spilled partition whose
-	 * writer never completes the write request, to check that the buffer is not recycled
-	 * before the write finishes.
-	 */
-	@Test
-	public void testAddOnSpilledPartitionWithSlowWriter() throws Exception {
-		// simulate a slow writer by using a no-op write operation
-		IOManager ioManager = new IOManagerAsyncWithNoOpBufferFileWriter();
-		SpillableSubpartition partition = createSubpartition(ioManager);
-		assertEquals(0, partition.releaseMemory());
-
-		BufferConsumer buffer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-		boolean bufferRecycled;
-		try {
-			partition.add(buffer);
-		} finally {
-			ioManager.shutdown();
-			bufferRecycled = buffer.isRecycled();
-			if (!bufferRecycled) {
-				buffer.close();
-			}
-		}
-		if (bufferRecycled) {
-			Assert.fail("buffer recycled before the write operation completed");
-		}
-		assertEquals(1, partition.getTotalNumberOfBuffers());
-		assertEquals(BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes());
-	}
-
-	/**
-	 * Tests {@link SpillableSubpartition#releaseMemory()} with a spillable partition without a view
-	 * but with a writer that does not do any write, to check for correct buffer recycling.
-	 */
-	@Test
-	public void testReleaseOnSpillablePartitionWithoutViewWithSlowWriter() throws Exception {
-		testReleaseOnSpillablePartitionWithSlowWriter(false);
-	}
-
-	/**
-	 * Tests {@link SpillableSubpartition#releaseMemory()} with a spillable partition which has a
-	 * view associated with it and a writer that does not do any write, to check for correct
-	 * buffer recycling.
-	 */
-	@Test
-	public void testReleaseOnSpillablePartitionWithViewWithSlowWriter() throws Exception {
-		testReleaseOnSpillablePartitionWithSlowWriter(true);
-	}
-
-	/**
-	 * Tests {@link SpillableSubpartition#releaseMemory()} with a spillable partition which has a
-	 * writer that does not do any write, to check for correct buffer recycling.
-	 */
-	private void testReleaseOnSpillablePartitionWithSlowWriter(boolean createView) throws Exception {
-		// simulate a slow writer by using a no-op write operation
-		IOManager ioManager = new IOManagerAsyncWithNoOpBufferFileWriter();
-		SpillableSubpartition partition = createSubpartition(ioManager);
-
-		BufferConsumer buffer1 = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-		BufferConsumer buffer2 = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-		try {
-			// we need two buffers because the view will use one of them and not release it
-			partition.add(buffer1);
-			partition.add(buffer2);
-			assertFalse("buffer1 should not be recycled (still in the queue)", buffer1.isRecycled());
-			assertFalse("buffer2 should not be recycled (still in the queue)", buffer2.isRecycled());
-			assertEquals(2, partition.getTotalNumberOfBuffers());
-			assertEquals(0, partition.getTotalNumberOfBytes()); // only updated when buffers are consumed or spilled
-
-			if (createView) {
-				// Create a read view
-				partition.finish();
-				partition.createReadView(new NoOpBufferAvailablityListener());
-				assertEquals(0, partition.getTotalNumberOfBytes()); // only updated when buffers are consumed or spilled
-			}
-
-			// one instance of the buffers is placed in the view's nextBuffer and not released
-			// (if there is no view, there will be no additional EndOfPartitionEvent)
-			assertEquals(2, partition.releaseMemory());
-			assertFalse("buffer1 should not be recycled (advertised as nextBuffer)", buffer1.isRecycled());
-			assertFalse("buffer2 should not be recycled (not written yet)", buffer2.isRecycled());
-		} finally {
-			ioManager.shutdown();
-			if (!buffer1.isRecycled()) {
-				buffer1.close();
-			}
-			if (!buffer2.isRecycled()) {
-				buffer2.close();
-			}
-		}
-		// note: a view requires a finished partition which has an additional EndOfPartitionEvent
-		assertEquals(2 + (createView ? 1 : 0), partition.getTotalNumberOfBuffers());
-		// with a view, one buffer remains in nextBuffer and is not counted yet
-		assertEquals(BUFFER_DATA_SIZE + (createView ? 4 : BUFFER_DATA_SIZE), partition.getTotalNumberOfBytes());
-	}
-
-	/**
-	 * Tests {@link SpillableSubpartition#add(BufferConsumer)} with a spilled partition where adding the
-	 * write request fails with an exception.
-	 */
-	@Test
-	public void testAddOnSpilledPartitionWithFailingWriter() throws Exception {
-		IOManager ioManager = new IOManagerAsyncWithClosedBufferFileWriter();
-		SpillableSubpartition partition = createSubpartition(ioManager);
-		assertEquals(0, partition.releaseMemory());
-
-		exception.expect(IOException.class);
-
-		BufferConsumer buffer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-		boolean bufferRecycled;
-		try {
-			partition.add(buffer);
-		} finally {
-			ioManager.shutdown();
-			bufferRecycled = buffer.isRecycled();
-			if (!bufferRecycled) {
-				buffer.close();
-			}
-		}
-		if (!bufferRecycled) {
-			Assert.fail("buffer not recycled");
-		}
-		assertEquals(0, partition.getTotalNumberOfBuffers());
-		assertEquals(0, partition.getTotalNumberOfBytes());
-	}
-
-	/**
-	 * Tests cleanup of {@link SpillableSubpartition#release()} with a spillable partition and no
-	 * read view attached.
-	 */
-	@Test
-	public void testCleanupReleasedSpillablePartitionNoView() throws Exception {
-		testCleanupReleasedPartition(false, false);
-	}
-
-	/**
-	 * Tests cleanup of {@link SpillableSubpartition#release()} with a spillable partition and a
-	 * read view attached - [FLINK-8371].
-	 */
-	@Test
-	public void testCleanupReleasedSpillablePartitionWithView() throws Exception {
-		testCleanupReleasedPartition(false, true);
-	}
-
-	/**
-	 * Tests cleanup of {@link SpillableSubpartition#release()} with a spilled partition and no
-	 * read view attached.
-	 */
-	@Test
-	public void testCleanupReleasedSpilledPartitionNoView() throws Exception {
-		testCleanupReleasedPartition(true, false);
-	}
-
-	/**
-	 * Tests cleanup of {@link SpillableSubpartition#release()} with a spilled partition and a
-	 * read view attached.
-	 */
-	@Test
-	public void testCleanupReleasedSpilledPartitionWithView() throws Exception {
-		testCleanupReleasedPartition(true, true);
-	}
-
-	/**
-	 * Tests cleanup of {@link SpillableSubpartition#release()}.
-	 *
-	 * @param spilled
-	 * 		whether the partition should be spilled to disk (<tt>true</tt>) or not (<tt>false</tt>,
-	 * 		spillable)
-	 * @param createView
-	 * 		whether the partition should have a view attached to it (<tt>true</tt>) or not (<tt>false</tt>)
-	 */
-	private void testCleanupReleasedPartition(boolean spilled, boolean createView) throws Exception {
-		SpillableSubpartition partition = createSubpartition();
-
-		BufferConsumer buffer1 = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-		BufferConsumer buffer2 = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-		boolean buffer1Recycled;
-		boolean buffer2Recycled;
-		try {
-			partition.add(buffer1);
-			partition.add(buffer2);
-			// create the read view before spilling
-			// (tests both code paths since this view may then contain the spilled view)
-			ResultSubpartitionView view = null;
-			if (createView) {
-				partition.finish();
-				view = partition.createReadView(new NoOpBufferAvailablityListener());
-			}
-			if (spilled) {
-				// note: in case we create a view, one buffer will already reside in the view and
-				//       one EndOfPartitionEvent will be added instead (so overall the number of
-				//       buffers to spill is the same)
-				assertEquals(2, partition.releaseMemory());
-			}
-
-			partition.release();
-
-			assertTrue(partition.isReleased());
-			if (createView) {
-				assertTrue(view.isReleased());
-			}
-			assertTrue(buffer1.isRecycled());
-		} finally {
-			buffer1Recycled = buffer1.isRecycled();
-			if (!buffer1Recycled) {
-				buffer1.close();
-			}
-			buffer2Recycled = buffer2.isRecycled();
-			if (!buffer2Recycled) {
-				buffer2.close();
-			}
-		}
-		if (!buffer1Recycled) {
-			Assert.fail("buffer 1 not recycled");
-		}
-		if (!buffer2Recycled) {
-			Assert.fail("buffer 2 not recycled");
-		}
-		// note: in case we create a view, there will be an additional EndOfPartitionEvent
-		assertEquals(createView ? 3 : 2, partition.getTotalNumberOfBuffers());
-		if (spilled) {
-			// with a view, one buffer remains in nextBuffer and is not counted yet
-			assertEquals(BUFFER_DATA_SIZE + (createView ? 4 : BUFFER_DATA_SIZE),
-				partition.getTotalNumberOfBytes());
-		} else {
-			// non-spilled byte statistics are only updated when buffers are consumed
-			assertEquals(0, partition.getTotalNumberOfBytes());
-		}
-	}
-
-	/**
-	 * Tests the spilled-bytes and buffer counting of
-	 * {@link SpillableSubpartition#spillFinishedBufferConsumers}.
-	 */
-	@Test
-	public void testSpillFinishedBufferConsumersFull() throws Exception {
-		SpillableSubpartition partition = createSubpartition();
-		BufferBuilder bufferBuilder = createBufferBuilder(BUFFER_DATA_SIZE);
-
-		partition.add(bufferBuilder.createBufferConsumer());
-		assertEquals(0, partition.releaseMemory());
-		assertEquals(1, partition.getBuffersInBacklog());
-		// finally fill the buffer with some bytes
-		fillBufferBuilder(bufferBuilder, BUFFER_DATA_SIZE).finish();
-		assertEquals(BUFFER_DATA_SIZE, partition.spillFinishedBufferConsumers(false));
-		assertEquals(1, partition.getBuffersInBacklog());
-	}
-
-	/**
-	 * Tests the spilled-bytes and buffer counting of
-	 * {@link SpillableSubpartition#spillFinishedBufferConsumers} with partially filled buffers.
-	 */
-	@Test
-	public void testSpillFinishedBufferConsumersPartial() throws Exception {
-		SpillableSubpartition partition = createSubpartition();
-		BufferBuilder bufferBuilder = createBufferBuilder(BUFFER_DATA_SIZE * 2);
-
-		partition.add(bufferBuilder.createBufferConsumer());
-		fillBufferBuilder(bufferBuilder, BUFFER_DATA_SIZE);
-
-		assertEquals(0, partition.releaseMemory());
-		assertEquals(2, partition.getBuffersInBacklog()); // partial one spilled, buffer consumer still enqueued
-		// finally fill the buffer with some bytes
-		fillBufferBuilder(bufferBuilder, BUFFER_DATA_SIZE).finish();
-		assertEquals(BUFFER_DATA_SIZE, partition.spillFinishedBufferConsumers(false));
-		assertEquals(2, partition.getBuffersInBacklog());
-	}
-
-	/**
-	 * An {@link IOManagerAsync} that creates closed {@link BufferFileWriter} instances in its
-	 * {@link #createBufferFileWriter(FileIOChannel.ID)} method.
-	 *
-	 * <p>These {@link BufferFileWriter} objects will thus throw an exception when trying to add
-	 * write requests, e.g. by calling {@link BufferFileWriter#writeBlock(Object)}.
-	 */
-	private static class IOManagerAsyncWithClosedBufferFileWriter extends IOManagerAsync {
-		@Override
-		public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
-				throws IOException {
-			BufferFileWriter bufferFileWriter = super.createBufferFileWriter(channelID);
-			bufferFileWriter.close();
-			return bufferFileWriter;
-		}
-	}
-
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 7c083ad..13cab65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -27,10 +27,8 @@ import org.junit.Test;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
 
 /**
  * Basic subpartition behaviour tests.
@@ -40,11 +38,30 @@ public abstract class SubpartitionTestBase extends TestLogger {
 	/**
 	 * Return the subpartition to be tested.
 	 */
-	abstract ResultSubpartition createSubpartition();
+	abstract ResultSubpartition createSubpartition() throws Exception;
+
+	/**
+	 * Return the subpartition to be used for tests where write calls should fail.
+	 */
+	abstract ResultSubpartition createFailingWritesSubpartition() throws Exception;
 
 	// ------------------------------------------------------------------------
 
 	@Test
+	public void createReaderAfterDispose() throws Exception {
+		final ResultSubpartition subpartition = createSubpartition();
+		subpartition.release();
+
+		try {
+			subpartition.createReadView(() -> {});
+			fail("expected an exception");
+		}
+		catch (IllegalStateException e) {
+			// expected
+		}
+	}
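
A hypothetical sketch of a guard that satisfies this contract (an assumption for illustration, not the actual Flink implementation): a released subpartition rejects new readers with an IllegalStateException.

    class ReleaseGuardSketch {
        private final Object lock = new Object();
        private boolean isReleased;

        void release() {
            synchronized (lock) {
                isReleased = true;
            }
        }

        Object createReadView() {
            synchronized (lock) {
                if (isReleased) {
                    throw new IllegalStateException("subpartition already released");
                }
                return new Object(); // stand-in for the actual reader view
            }
        }
    }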
+
+	@Test
 	public void testAddAfterFinish() throws Exception {
 		final ResultSubpartition subpartition = createSubpartition();
 
@@ -52,14 +69,14 @@ public abstract class SubpartitionTestBase extends TestLogger {
 			subpartition.finish();
 			assertEquals(1, subpartition.getTotalNumberOfBuffers());
 			assertEquals(0, subpartition.getBuffersInBacklog());
-			assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated after consuming the buffers
 
 			BufferConsumer bufferConsumer = createFilledBufferConsumer(4096, 4096);
 
 			assertFalse(subpartition.add(bufferConsumer));
+			assertTrue(bufferConsumer.isRecycled());
+
 			assertEquals(1, subpartition.getTotalNumberOfBuffers());
 			assertEquals(0, subpartition.getBuffersInBacklog());
-			assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated after consuming the buffers
 		} finally {
 			if (subpartition != null) {
 				subpartition.release();
@@ -74,16 +91,11 @@ public abstract class SubpartitionTestBase extends TestLogger {
 		try {
 			subpartition.release();
 
-			assertEquals(0, subpartition.getTotalNumberOfBuffers());
-			assertEquals(0, subpartition.getBuffersInBacklog());
-			assertEquals(0, subpartition.getTotalNumberOfBytes());
-
 			BufferConsumer bufferConsumer = createFilledBufferConsumer(4096, 4096);
 
 			assertFalse(subpartition.add(bufferConsumer));
-			assertEquals(0, subpartition.getTotalNumberOfBuffers());
-			assertEquals(0, subpartition.getBuffersInBacklog());
-			assertEquals(0, subpartition.getTotalNumberOfBytes());
+			assertTrue(bufferConsumer.isRecycled());
+
 		} finally {
 			if (subpartition != null) {
 				subpartition.release();
@@ -92,38 +104,71 @@ public abstract class SubpartitionTestBase extends TestLogger {
 	}
 
 	@Test
-	public void testReleaseParent() throws Exception {
+	public void testReleasingReaderDoesNotReleasePartition() throws Exception {
 		final ResultSubpartition partition = createSubpartition();
-		verifyViewReleasedAfterParentRelease(partition);
+		partition.add(createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE));
+		partition.finish();
+
+		final ResultSubpartitionView reader = partition.createReadView(new NoOpBufferAvailablityListener());
+
+		assertFalse(partition.isReleased());
+		assertFalse(reader.isReleased());
+
+		reader.releaseAllResources();
+
+		assertTrue(reader.isReleased());
+		assertFalse(partition.isReleased());
+
+		partition.release();
 	}
 
 	@Test
-	public void testReleaseParentAfterSpilled() throws Exception {
+	public void testReleaseIsIdempotent() throws Exception {
 		final ResultSubpartition partition = createSubpartition();
-		partition.releaseMemory();
+		partition.add(createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE));
+		partition.finish();
 
-		verifyViewReleasedAfterParentRelease(partition);
+		partition.release();
+		partition.release();
+		partition.release();
 	}
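
Idempotent release is commonly achieved with a compare-and-set guard; a minimal hypothetical sketch (an assumption, not the actual implementation):

    class IdempotentReleaseSketch {
        private final java.util.concurrent.atomic.AtomicBoolean released =
                new java.util.concurrent.atomic.AtomicBoolean();

        void release() {
            if (released.compareAndSet(false, true)) {
                // free buffers / delete spill files; this body runs at most once,
                // so repeated release() calls are harmless no-ops
            }
        }
    }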
 
-	private void verifyViewReleasedAfterParentRelease(ResultSubpartition partition) throws Exception {
-		// Add a bufferConsumer
-		BufferConsumer bufferConsumer = createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
-		partition.add(bufferConsumer);
+	@Test
+	public void testReadAfterDispose() throws Exception {
+		final ResultSubpartition partition = createSubpartition();
+		partition.add(createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE));
 		partition.finish();
 
-		// Create the view
-		BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
-		ResultSubpartitionView view = partition.createReadView(listener);
+		final ResultSubpartitionView reader = partition.createReadView(new NoOpBufferAvailablityListener());
+		reader.releaseAllResources();
 
-		// The added bufferConsumer and end-of-partition event
-		assertNotNull(view.getNextBuffer());
-		assertNotNull(view.getNextBuffer());
+		// the reader must not throw an exception
+		reader.getNextBuffer();
 
-		// Release the parent
-		assertFalse(view.isReleased());
-		partition.release();
+		// ideally, we want this to be null, but the pipelined partition still serves data
+		// after dispose (which is unintuitive, but does not affect correctness)
+//		assertNull(reader.getNextBuffer());
+	}
+
+	@Test
+	public void testRecycleBufferAndConsumerOnFailure() throws Exception {
+		final ResultSubpartition subpartition = createFailingWritesSubpartition();
+		try {
+			final BufferConsumer consumer = BufferBuilderTestUtils.createFilledBufferConsumer(100);
+
+			try {
+				subpartition.add(consumer);
+				subpartition.flush();
+				fail("should fail with an exception");
+			}
+			catch (Exception ignored) {
+				// expected
+			}
 
-		// Verify that parent release is reflected at partition view
-		assertTrue(view.isReleased());
+			assertTrue(consumer.isRecycled());
+		}
+		finally {
+			subpartition.release();
+		}
 	}
 }
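
Finally, a hypothetical sketch of the failure path that testRecycleBufferAndConsumerOnFailure pins down (an assumption, not the actual subpartition code; BufferConsumer is the class used throughout these tests): a failed write must close the consumer so that its buffer is recycled rather than leaked.

    abstract class FailureRecycleSketch {
        boolean add(org.apache.flink.runtime.io.network.buffer.BufferConsumer consumer)
                throws java.io.IOException {
            try {
                write(consumer); // may throw, e.g. when the target channel is already closed
                return true;
            }
            catch (java.io.IOException e) {
                consumer.close(); // recycle eagerly; the buffer must not leak
                throw e;
            }
        }

        abstract void write(org.apache.flink.runtime.io.network.buffer.BufferConsumer consumer)
                throws java.io.IOException;
    }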