Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/05 14:21:54 UTC

[4/5] flink git commit: [FLINK-7499][io] also let AsynchronousBufferFileWriter#writeBlock() recycle the buffer in case of failures

[FLINK-7499][io] also let AsynchronousBufferFileWriter#writeBlock() recycle the buffer in case of failures

This fixes a double recycle in SpillableSubpartitionView and also makes sure
that the buffer is properly freed even if adding the (asynchronous) write
operation fails, covering code paths that did not perform this cleanup before.
This avoids duplicating the cleanup at every call site, and it is also more
consistent for the writer to take over responsibility for the given buffer even
when an exception is thrown.
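
A minimal sketch of the resulting writeBlock() contract (simplified from the
actual change in the diff below): the writer takes over ownership of the buffer
and recycles it itself when the asynchronous write request cannot be enqueued:

	@Override
	public void writeBlock(Buffer buffer) throws IOException {
		try {
			// if successfully added, the buffer will be recycled after the write operation
			addRequest(new BufferWriteRequest(this, buffer));
		} catch (Throwable t) {
			// the request was never enqueued, so nobody else will free the buffer
			buffer.recycle();
			ExceptionUtils.rethrowIOException(t);
		}
	}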

[FLINK-7499][io] complete the idiom of ResultSubpartition#add() taking over ownership of the buffer

The buffer is now always released exactly once and at the right time, and the
caller no longer needs to worry about releasing it if a called function throws
an exception.
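
As a rough before/after sketch of what this means for call sites (based on the
SpillableSubpartitionView change further down; surrounding code abbreviated):

	// before: every call site guarded the buffer itself, which is what caused the
	// double recycle in SpillableSubpartitionView once the completed write request
	// also recycled the buffer
	try {
		spillWriter.writeBlock(buffer);
	} finally {
		buffer.recycle();
	}

	// after: writeBlock() takes over ownership, so the call site reduces to
	spillWriter.writeBlock(buffer);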

This closes #4581.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/622daa44
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/622daa44
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/622daa44

Branch: refs/heads/master
Commit: 622daa447755b984644212f56c5540253a10c149
Parents: 79bcdff
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Nov 23 14:58:21 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Jan 5 15:18:25 2018 +0100

----------------------------------------------------------------------
 .../iomanager/AsynchronousBufferFileWriter.java |  20 +-
 .../BlockChannelWriterWithCallback.java         |   4 +-
 .../partition/PipelinedSubpartition.java        |   1 +
 .../io/network/partition/ResultPartition.java   |  12 +-
 .../network/partition/ResultSubpartition.java   |  12 +
 .../partition/SpillableSubpartition.java        |  16 +-
 .../partition/SpillableSubpartitionView.java    |   6 +-
 .../AsynchronousBufferFileWriterTest.java       |  30 +++
 .../IOManagerAsyncWithNoOpBufferFileWriter.java |  53 ++++
 .../network/partition/ResultPartitionTest.java  |  63 ++++-
 .../partition/SpillableSubpartitionTest.java    | 252 ++++++++++++++++++-
 .../partition/SpilledSubpartitionViewTest.java  |   4 +
 12 files changed, 449 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
index 14bb8f7..9a78d0a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.disk.iomanager;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.event.NotificationListener;
+import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
 
@@ -31,9 +32,26 @@ public class AsynchronousBufferFileWriter extends AsynchronousFileIOChannel<Buff
 		super(channelID, requestQueue, CALLBACK, true);
 	}
 
+	/**
+	 * Writes the given block asynchronously.
+	 *
+	 * @param buffer
+	 * 		the buffer to be written (will be recycled when done)
+	 *
+	 * @throws IOException
+	 * 		thrown if adding the write operation fails
+	 */
 	@Override
 	public void writeBlock(Buffer buffer) throws IOException {
-		addRequest(new BufferWriteRequest(this, buffer));
+		try {
+			// if successfully added, the buffer will be recycled after the write operation
+			addRequest(new BufferWriteRequest(this, buffer));
+		} catch (Throwable e) {
+			// if not added, we need to recycle here
+			buffer.recycle();
+			ExceptionUtils.rethrowIOException(e);
+		}
+
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
index f7618e4..5738787 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
@@ -26,8 +26,8 @@ public interface BlockChannelWriterWithCallback<T> extends FileIOChannel {
 	 * Writes the given block. The request may be executed synchronously, or asynchronously, depending
 	 * on the implementation.
 	 *
-	 * @param block The segment to be written.
+	 * @param block The segment to be written (transferring ownership to this writer).
 	 * @throws IOException Thrown, when the writer encounters an I/O error.
 	 */
 	void writeBlock(T block) throws IOException;
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index ed72b51..c1d6f13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -67,6 +67,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 		synchronized (buffers) {
 			if (isFinished || isReleased) {
+				buffer.recycle();
 				return false;
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index be050b3..ea2cca5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -265,6 +265,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 
 	@Override
 	public void writeBuffer(Buffer buffer, int subpartitionIndex) throws IOException {
+		checkNotNull(buffer);
 		boolean success = false;
 
 		try {
@@ -272,6 +273,8 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 
 			final ResultSubpartition subpartition = subpartitions[subpartitionIndex];
 
+			// retain for buffer use after add() but also to have a simple path for recycle()
+			buffer.retain();
 			synchronized (subpartition) {
 				success = subpartition.add(buffer);
 
@@ -279,14 +282,11 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 				totalNumberOfBuffers++;
 				totalNumberOfBytes += buffer.getSize();
 			}
-		}
-		finally {
+		} finally {
 			if (success) {
 				notifyPipelinedConsumers();
 			}
-			else {
-				buffer.recycle();
-			}
+			buffer.recycle();
 		}
 	}
 
@@ -462,7 +462,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 
 	// ------------------------------------------------------------------------
 
-	private void checkInProduceState() {
+	private void checkInProduceState() throws IllegalStateException {
 		checkState(!isFinished, "Partition already finished.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index 3b4e3c9..e73082a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -70,6 +70,18 @@ public abstract class ResultSubpartition {
 		return parent.getFailureCause();
 	}
 
+	/**
+	 * Adds the given buffer.
+	 *
+	 * <p>The request may be executed synchronously, or asynchronously, depending on the
+	 * implementation.
+	 *
+	 * @param buffer
+	 * 		the buffer to add (transferring ownership to this writer)
+	 *
+	 * @throws IOException
+	 * 		thrown in case of errors while adding the buffer
+	 */
 	abstract public boolean add(Buffer buffer) throws IOException;
 
 	abstract public void finish() throws IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 065de8e..4a8e165 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -92,6 +92,7 @@ class SpillableSubpartition extends ResultSubpartition {
 
 		synchronized (buffers) {
 			if (isFinished || isReleased) {
+				buffer.recycle();
 				return false;
 			}
 
@@ -107,10 +108,15 @@ class SpillableSubpartition extends ResultSubpartition {
 		}
 
 		// Didn't return early => go to disk
-		spillWriter.writeBlock(buffer);
-		synchronized (buffers) {
-			// See the note above, but only do this if the buffer was correctly added!
-			updateStatistics(buffer);
+		try {
+			// retain buffer for updateStatistics() below
+			spillWriter.writeBlock(buffer.retain());
+			synchronized (buffers) {
+				// See the note above, but only do this if the buffer was correctly added!
+				updateStatistics(buffer);
+			}
+		} finally {
+			buffer.recycle();
 		}
 
 		return true;
@@ -207,7 +213,7 @@ class SpillableSubpartition extends ResultSubpartition {
 			ResultSubpartitionView view = readView;
 
 			if (view != null && view.getClass() == SpillableSubpartitionView.class) {
-				// If there is a spilalble view, it's the responsibility of the
+				// If there is a spillable view, it's the responsibility of the
 				// view to release memory.
 				SpillableSubpartitionView spillableView = (SpillableSubpartitionView) view;
 				return spillableView.releaseMemory();

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index df8de54..6781902 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -108,11 +108,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 				for (int i = 0; i < numBuffers; i++) {
 					Buffer buffer = buffers.remove();
 					spilledBytes += buffer.getSize();
-					try {
-						spillWriter.writeBlock(buffer);
-					} finally {
-						buffer.recycle();
-					}
+					spillWriter.writeBlock(buffer);
 				}
 
 				spilledView = new SpilledSubpartitionView(

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
index 40f3e32..4c25e0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
@@ -18,13 +18,17 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.util.TestNotificationListener;
 
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
 import java.util.concurrent.Callable;
@@ -39,7 +43,12 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for {@link AsynchronousBufferFileWriter}.
+ */
 public class AsynchronousBufferFileWriterTest {
+	@Rule
+	public ExpectedException exception = ExpectedException.none();
 
 	private static final IOManager ioManager = new IOManagerAsync();
 
@@ -67,6 +76,27 @@ public class AsynchronousBufferFileWriterTest {
 	}
 
 	@Test
+	public void testAddWithFailingWriter() throws Exception {
+		AsynchronousBufferFileWriter writer =
+			new AsynchronousBufferFileWriter(ioManager.createChannel(), new RequestQueue<>());
+		writer.close();
+
+		exception.expect(IOException.class);
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			writer.writeBlock(buffer);
+		} finally {
+			if (!buffer.isRecycled()) {
+				buffer.recycle();
+				Assert.fail("buffer not recycled");
+			}
+			assertEquals("Shouln't increment number of outstanding requests.", 0, writer.getNumberOfOutstandingRequests());
+		}
+	}
+
+	@Test
 	public void testSubscribe() throws Exception {
 		final TestNotificationListener listener = new TestNotificationListener();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
new file mode 100644
index 0000000..363e02b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+
+/**
+ * An {@link IOManagerAsync} that creates {@link BufferFileWriter} instances which do nothing in their {@link BufferFileWriter#writeBlock(Object)} method.
+ *
+ * <p>Beware: the passed {@link Buffer} instances must be cleaned up manually!
+ */
+public class IOManagerAsyncWithNoOpBufferFileWriter extends IOManagerAsync {
+	@Override
+	public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+			throws IOException {
+		return new NoOpAsynchronousBufferFileWriter(channelID, getWriteRequestQueue(channelID));
+	}
+
+	/**
+	 * {@link BufferFileWriter} subclass with a no-op in {@link #writeBlock(Buffer)}.
+	 */
+	private static class NoOpAsynchronousBufferFileWriter extends AsynchronousBufferFileWriter {
+
+		private NoOpAsynchronousBufferFileWriter(
+				ID channelID,
+				RequestQueue<WriteRequest> requestQueue) throws IOException {
+			super(channelID, requestQueue);
+		}
+
+		@Override
+		public void writeBlock(Buffer buffer) throws IOException {
+			// do nothing
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 9fb7fd3..5d24b4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -20,25 +20,41 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+/**
+ * Tests for {@link ResultPartition}.
+ */
 public class ResultPartitionTest {
 
+	/** Asynchronous I/O manager. */
+	private static final IOManager ioManager = new IOManagerAsync();
+
+	@AfterClass
+	public static void shutdown() {
+		ioManager.shutdown();
+	}
+
 	/**
 	 * Tests the schedule or update consumers message sending behaviour depending on the relevant flags.
 	 */
@@ -49,7 +65,11 @@ public class ResultPartitionTest {
 			ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
 			ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, true);
 			partition.writeBuffer(TestBufferFactory.createBuffer(), 0);
-			verify(notifier, times(1)).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
+			verify(notifier, times(1))
+				.notifyPartitionConsumable(
+					eq(partition.getJobId()),
+					eq(partition.getPartitionId()),
+					any(TaskActions.class));
 		}
 
 		{
@@ -180,6 +200,45 @@ public class ResultPartitionTest {
 		assertTrue(buffer.isRecycled());
 	}
 
+	@Test
+	public void testAddOnPipelinedPartition() throws Exception {
+		testAddOnPartition(ResultPartitionType.PIPELINED);
+	}
+
+	@Test
+	public void testAddOnBlockingPartition() throws Exception {
+		testAddOnPartition(ResultPartitionType.BLOCKING);
+	}
+
+	/**
+	 * Tests {@link ResultPartition#writeBuffer(Buffer, int)} on a working partition.
+	 *
+	 * @param pipelined the result partition type to set up
+	 */
+	protected void testAddOnPartition(final ResultPartitionType pipelined)
+		throws Exception {
+		ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+		ResultPartition partition = createPartition(notifier, pipelined, true);
+		Buffer buffer = TestBufferFactory.createBuffer();
+		try {
+			// partition.add() adds the buffer without recycling it (if not spilling)
+			partition.writeBuffer(buffer, 0);
+			assertFalse("buffer should not be recycled (still in the queue)", buffer.isRecycled());
+		} finally {
+			if (!buffer.isRecycled()) {
+				buffer.recycle();
+			}
+			// should have been notified for pipelined partitions
+			if (pipelined.isPipelined()) {
+				verify(notifier, times(1))
+					.notifyPartitionConsumable(
+						eq(partition.getJobId()),
+						eq(partition.getPartitionId()),
+						any(TaskActions.class));
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static ResultPartition createPartition(
@@ -196,7 +255,7 @@ public class ResultPartitionTest {
 			1,
 			mock(ResultPartitionManager.class),
 			notifier,
-			mock(IOManager.class),
+			ioManager,
 			sendScheduleOrUpdateConsumersMessage);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 05a364d..c50b361 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -20,19 +20,26 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+
 import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.io.IOException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -56,12 +63,17 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for {@link SpillableSubpartition}.
+ */
 public class SpillableSubpartitionTest extends SubpartitionTestBase {
+	@Rule
+	public ExpectedException exception = ExpectedException.none();
 
-	/** Executor service for concurrent produce/consume tests */
-	private final static ExecutorService executorService = Executors.newCachedThreadPool();
+	/** Executor service for concurrent produce/consume tests. */
+	private static final ExecutorService executorService = Executors.newCachedThreadPool();
 
-	/** Asynchronous I/O manager */
+	/** Asynchronous I/O manager. */
 	private static final IOManager ioManager = new IOManagerAsync();
 
 	@AfterClass
@@ -72,6 +84,10 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 
 	@Override
 	SpillableSubpartition createSubpartition() {
+		return createSubpartition(ioManager);
+	}
+
+	private static SpillableSubpartition createSubpartition(IOManager ioManager) {
 		ResultPartition parent = mock(ResultPartition.class);
 		BufferProvider bufferProvider = mock(BufferProvider.class);
 		when(parent.getBufferProvider()).thenReturn(bufferProvider);
@@ -313,6 +329,218 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertTrue(buffer.isRecycled());
 	}
 
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+	 */
+	@Test
+	public void testAddOnFinishedSpillablePartition() throws Exception {
+		testAddOnFinishedPartition(false);
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled finished partition.
+	 */
+	@Test
+	public void testAddOnFinishedSpilledPartition() throws Exception {
+		testAddOnFinishedPartition(true);
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a finished partition.
+	 *
+	 * @param spilled
+	 * 		whether the partition should be spilled to disk (<tt>true</tt>) or not (<tt>false</tt>,
+	 * 		spillable).
+	 */
+	private void testAddOnFinishedPartition(boolean spilled) throws Exception {
+		SpillableSubpartition partition = createSubpartition();
+		if (spilled) {
+			assertEquals(0, partition.releaseMemory());
+		}
+		partition.finish();
+		// finish adds an EndOfPartitionEvent
+		assertEquals(1, partition.getTotalNumberOfBuffers());
+		assertEquals(4, partition.getTotalNumberOfBytes());
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			partition.add(buffer);
+		} finally {
+			if (!buffer.isRecycled()) {
+				buffer.recycle();
+				Assert.fail("buffer not recycled");
+			}
+			// still same statistics
+			assertEquals(1, partition.getTotalNumberOfBuffers());
+			assertEquals(4, partition.getTotalNumberOfBytes());
+		}
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable released partition.
+	 */
+	@Test
+	public void testAddOnReleasedSpillablePartition() throws Exception {
+		testAddOnReleasedPartition(false);
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled released partition.
+	 */
+	@Test
+	public void testAddOnReleasedSpilledPartition() throws Exception {
+		testAddOnReleasedPartition(true);
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a released partition.
+	 *
+	 * @param spilled
+	 * 		whether the partition should be spilled to disk (<tt>true</tt>) or not (<tt>false</tt>,
+	 * 		spillable).
+	 */
+	private void testAddOnReleasedPartition(boolean spilled) throws Exception {
+		SpillableSubpartition partition = createSubpartition();
+		partition.release();
+		if (spilled) {
+			assertEquals(0, partition.releaseMemory());
+		}
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			partition.add(buffer);
+		} finally {
+			if (!buffer.isRecycled()) {
+				buffer.recycle();
+				Assert.fail("buffer not recycled");
+			}
+			assertEquals(0, partition.getTotalNumberOfBuffers());
+			assertEquals(0, partition.getTotalNumberOfBytes());
+		}
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled partition where adding the
+	 * write request fails with an exception.
+	 */
+	@Test
+	public void testAddOnSpilledPartitionWithSlowWriter() throws Exception {
+		// simulate slow writer by a no-op write operation
+		IOManager ioManager = new IOManagerAsyncWithNoOpBufferFileWriter();
+		SpillableSubpartition partition = createSubpartition(ioManager);
+		assertEquals(0, partition.releaseMemory());
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			partition.add(buffer);
+		} finally {
+			ioManager.shutdown();
+			if (buffer.isRecycled()) {
+				Assert.fail("buffer recycled before the write operation completed");
+			}
+			buffer.recycle();
+			assertEquals(1, partition.getTotalNumberOfBuffers());
+			assertEquals(4096, partition.getTotalNumberOfBytes());
+		}
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#releaseMemory()} with a spillable partition without a view
+	 * but with a writer that does not do any write to check for correct buffer recycling.
+	 */
+	@Test
+	public void testReleaseOnSpillablePartitionWithoutViewWithSlowWriter() throws Exception {
+		testReleaseOnSpillablePartitionWithSlowWriter(false);
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#releaseMemory()} with a spillable partition which has a
+	 * view associated with it and a writer that does not do any write to check for correct buffer
+	 * recycling.
+	 */
+	@Test
+	public void testReleaseOnSpillablePartitionWithViewWithSlowWriter() throws Exception {
+		testReleaseOnSpillablePartitionWithSlowWriter(true);
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#releaseMemory()} with a spillable partition which has a
+	 * writer that does not do any write to check for correct buffer recycling.
+	 */
+	private void testReleaseOnSpillablePartitionWithSlowWriter(boolean createView) throws Exception {
+		// simulate slow writer by a no-op write operation
+		IOManager ioManager = new IOManagerAsyncWithNoOpBufferFileWriter();
+		SpillableSubpartition partition = createSubpartition(ioManager);
+
+		Buffer buffer1 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		Buffer buffer2 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			// we need two buffers because the view will use one of them and not release it
+			partition.add(buffer1);
+			partition.add(buffer2);
+			assertFalse("buffer1 should not be recycled (still in the queue)", buffer1.isRecycled());
+			assertFalse("buffer2 should not be recycled (still in the queue)", buffer2.isRecycled());
+			assertEquals(2, partition.getTotalNumberOfBuffers());
+			assertEquals(4096 * 2, partition.getTotalNumberOfBytes());
+
+			if (createView) {
+				// Create a read view
+				partition.finish();
+				partition.createReadView(numBuffers -> {});
+			}
+
+			// one instance of the buffers is placed in the view's nextBuffer and not released
+			// (if there is no view, there will be no additional EndOfPartitionEvent)
+			assertEquals(2, partition.releaseMemory());
+			assertFalse("buffer1 should not be recycled (advertised as nextBuffer)", buffer1.isRecycled());
+			assertFalse("buffer2 should not be recycled (not written yet)", buffer2.isRecycled());
+		} finally {
+			ioManager.shutdown();
+			if (!buffer1.isRecycled()) {
+				buffer1.recycle();
+			}
+			if (!buffer2.isRecycled()) {
+				buffer2.recycle();
+			}
+			// note: a view requires a finished partition which has an additional EndOfPartitionEvent
+			assertEquals(2 + (createView ? 1 : 0), partition.getTotalNumberOfBuffers());
+			assertEquals(4096 * 2 + (createView ? 4 : 0), partition.getTotalNumberOfBytes());
+		}
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled partition where adding the
+	 * write request fails with an exception.
+	 */
+	@Test
+	public void testAddOnSpilledPartitionWithFailingWriter() throws Exception {
+		IOManager ioManager = new IOManagerAsyncWithClosedBufferFileWriter();
+		SpillableSubpartition partition = createSubpartition(ioManager);
+		assertEquals(0, partition.releaseMemory());
+
+		exception.expect(IOException.class);
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			partition.add(buffer);
+		} finally {
+			ioManager.shutdown();
+
+			if (!buffer.isRecycled()) {
+				buffer.recycle();
+				Assert.fail("buffer not recycled");
+			}
+			assertEquals(0, partition.getTotalNumberOfBuffers());
+			assertEquals(0, partition.getTotalNumberOfBytes());
+		}
+	}
+
 	private static class AwaitableBufferAvailablityListener implements BufferAvailabilityListener {
 
 		private long numNotifiedBuffers;
@@ -333,4 +561,22 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 			}
 		}
 	}
+
+	/**
+	 * An {@link IOManagerAsync} that creates closed {@link BufferFileWriter} instances in its
+	 * {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+	 *
+	 * <p>These {@link BufferFileWriter} objects will thus throw an exception when trying to add
+	 * write requests, e.g. by calling {@link BufferFileWriter#writeBlock(Object)}.
+	 */
+	private static class IOManagerAsyncWithClosedBufferFileWriter extends IOManagerAsync {
+		@Override
+		public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+				throws IOException {
+			BufferFileWriter bufferFileWriter = super.createBufferFileWriter(channelID);
+			bufferFileWriter.close();
+			return bufferFileWriter;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
index fa62593..b748e1c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
@@ -44,6 +44,10 @@ import java.util.concurrent.TimeoutException;
 
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for {@link SpillableSubpartitionView}, in addition to indirect tests via {@link
+ * SpillableSubpartitionTest}.
+ */
 public class SpilledSubpartitionViewTest {
 
 	private static final IOManager IO_MANAGER = new IOManagerAsync();