You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/26 11:51:55 UTC

[2/3] flink git commit: [runtime] Fix possible resource leak in RemoteInputChannel

[runtime] Fix possible resource leak in RemoteInputChannel

It was possible that a concurrent release and enqueue operation from the
network I/O thread leave a buffer queued at a released channel, resulting in a
resource leak.

This commit fixes this and adds a test, which successfully provoked the race
condition before the fix.

The visibility of input channel operations is reduced to package private.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d72a3f7f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d72a3f7f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d72a3f7f

Branch: refs/heads/master
Commit: d72a3f7f99a7e242b191e993ced123d98a083d14
Parents: 95bdaad
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Mar 24 10:25:52 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Mar 26 11:51:26 2015 +0100

----------------------------------------------------------------------
 .../network/netty/PartitionRequestClient.java   |   2 +-
 .../partition/consumer/InputChannel.java        |  14 +-
 .../partition/consumer/LocalInputChannel.java   |  14 +-
 .../partition/consumer/RemoteInputChannel.java  | 123 +++++++++------
 .../io/network/api/reader/BufferReaderTest.java |   2 +-
 .../IteratorWrappingTestSingleInputGate.java    | 103 -------------
 .../IteratorWrappingTestSingleInputGate.java    | 101 ++++++++++++
 .../consumer/RemoteInputChannelTest.java        | 152 +++++++++++++++++++
 .../partition/consumer/SingleInputGateTest.java |   1 -
 .../partition/consumer/TestInputChannel.java    | 134 ++++++++++++++++
 .../partition/consumer/TestSingleInputGate.java | 139 +++++++++++++++++
 .../partition/consumer/UnionInputGateTest.java  |   1 -
 .../io/network/util/TestBufferFactory.java      |   6 +
 .../io/network/util/TestInputChannel.java       | 135 ----------------
 .../io/network/util/TestSingleInputGate.java    | 140 -----------------
 .../runtime/operators/DataSinkTaskTest.java     |   2 +-
 .../operators/testutils/MockEnvironment.java    |   2 +-
 .../operators/testutils/TaskTestBase.java       |   2 +-
 18 files changed, 624 insertions(+), 449 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index 32e3951..b5f89e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -79,7 +79,7 @@ public class PartitionRequestClient {
 	 * The request goes to the remote producer, for which this partition
 	 * request client instance has been created.
 	 */
-	public void requestIntermediateResultPartition(final ResultPartitionID partitionId, int requestedQueueIndex, final RemoteInputChannel inputChannel) throws IOException {
+	public void requestSubpartition(final ResultPartitionID partitionId, int requestedQueueIndex, final RemoteInputChannel inputChannel) throws IOException {
 		partitionRequestHandler.addInputChannel(inputChannel);
 
 		tcpChannel.writeAndFlush(new PartitionRequest(partitionId, requestedQueueIndex, inputChannel.getInputChannelId()))

http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 855509c..0805066 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -53,7 +53,7 @@ public abstract class InputChannel {
 	// Properties
 	// ------------------------------------------------------------------------
 
-	public int getChannelIndex() {
+	int getChannelIndex() {
 		return channelIndex;
 	}
 
@@ -75,12 +75,12 @@ public abstract class InputChannel {
 	 * The queue index to request depends on which sub task the channel belongs
 	 * to and is specified by the consumer of this channel.
 	 */
-	public abstract void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException;
+	abstract void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException;
 
 	/**
 	 * Returns the next buffer from the consumed subpartition.
 	 */
-	public abstract Buffer getNextBuffer() throws IOException, InterruptedException;
+	abstract Buffer getNextBuffer() throws IOException, InterruptedException;
 
 	// ------------------------------------------------------------------------
 	// Task events
@@ -94,19 +94,19 @@ public abstract class InputChannel {
 	 * the producer will wait for all backwards events. Otherwise, this will lead to an Exception
 	 * at runtime.
 	 */
-	public abstract void sendTaskEvent(TaskEvent event) throws IOException;
+	abstract void sendTaskEvent(TaskEvent event) throws IOException;
 
 	// ------------------------------------------------------------------------
 	// Life cycle
 	// ------------------------------------------------------------------------
 
-	public abstract boolean isReleased();
+	abstract boolean isReleased();
 
-	public abstract void notifySubpartitionConsumed() throws IOException;
+	abstract void notifySubpartitionConsumed() throws IOException;
 
 	/**
 	 * Releases all resources of the channel.
 	 */
-	public abstract void releaseAllResources() throws IOException;
+	abstract void releaseAllResources() throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 65f2627..8ea6407 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -53,7 +53,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 
 	private volatile Buffer lookAhead;
 
-	public LocalInputChannel(
+	LocalInputChannel(
 			SingleInputGate gate,
 			int channelIndex,
 			ResultPartitionID partitionId,
@@ -71,7 +71,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
+	void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
 		if (queueIterator == null) {
 			LOG.debug("Requesting LOCAL queue {} of partition {}.", subpartitionIndex, partitionId);
 
@@ -87,7 +87,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 	}
 
 	@Override
-	public Buffer getNextBuffer() throws IOException, InterruptedException {
+	Buffer getNextBuffer() throws IOException, InterruptedException {
 		checkState(queueIterator != null, "Queried for a buffer before requesting a queue.");
 
 		// After subscribe notification
@@ -115,7 +115,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void sendTaskEvent(TaskEvent event) throws IOException {
+	void sendTaskEvent(TaskEvent event) throws IOException {
 		checkState(queueIterator != null, "Tried to send task event to producer before requesting a queue.");
 
 		if (!taskEventDispatcher.publish(partitionId, event)) {
@@ -128,12 +128,12 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 	// ------------------------------------------------------------------------
 
 	@Override
-	public boolean isReleased() {
+	boolean isReleased() {
 		return isReleased;
 	}
 
 	@Override
-	public void notifySubpartitionConsumed() throws IOException {
+	void notifySubpartitionConsumed() throws IOException {
 		if (queueIterator != null) {
 			queueIterator.notifySubpartitionConsumed();
 		}
@@ -144,7 +144,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 	 * iterator.
 	 */
 	@Override
-	public void releaseAllResources() throws IOException {
+	void releaseAllResources() throws IOException {
 		if (!isReleased) {
 			if (lookAhead != null) {
 				lookAhead.recycle();

http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index cae7837..df653a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -32,7 +32,6 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
@@ -44,37 +43,51 @@ public class RemoteInputChannel extends InputChannel {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RemoteInputChannel.class);
 
-	private final InputChannelID id;
+	/** ID to distinguish this channel from other channels sharing the same TCP connection. */
+	private final InputChannelID id = new InputChannelID();
 
-	private final ConnectionID producerAddress;
+	/** The connection to use to request the remote partition. */
+	private final ConnectionID connectionId;
 
-	private final Queue<Buffer> receivedBuffers = new ArrayDeque<Buffer>();
+	/** The connection manager to use connect to the remote partition provider. */
+	private final ConnectionManager connectionManager;
 
-	private final AtomicReference<IOException> ioError = new AtomicReference<IOException>();
+	/**
+	 * The received buffers. Received buffers are enqueued by the network I/O thread and the queue
+	 * is consumed by the receiving task thread.
+	 */
+	private final Queue<Buffer> receivedBuffers = new ArrayDeque<Buffer>();
 
+	/**
+	 * Flag indicating whether this channel has been released. Either called by the receiving task
+	 * thread or the task manager actor.
+	 */
 	private final AtomicBoolean isReleased = new AtomicBoolean();
 
+	/** Client to establish a (possibly shared) TCP connection and request the partition. */
 	private PartitionRequestClient partitionRequestClient;
 
+	/**
+	 * The next expected sequence number for the next buffer. This is modified by the network
+	 * I/O thread only.
+	 */
 	private int expectedSequenceNumber = 0;
 
-	private ConnectionManager connectionManager;
+	/**
+	 * An error possibly set by the network I/O thread.
+	 */
+	private volatile Throwable error;
 
-	public RemoteInputChannel(
-			SingleInputGate gate,
+	RemoteInputChannel(
+			SingleInputGate inputGate,
 			int channelIndex,
 			ResultPartitionID partitionId,
-			ConnectionID producerAddress,
+			ConnectionID connectionId,
 			ConnectionManager connectionManager) {
 
-		super(gate, channelIndex, partitionId);
+		super(inputGate, channelIndex, partitionId);
 
-		/**
-		 * This ID is used by the {@link PartitionRequestClient} to distinguish
-		 * between receivers, which share the same TCP connection.
-		 */
-		this.id = new InputChannelID();
-		this.producerAddress = checkNotNull(producerAddress);
+		this.connectionId = checkNotNull(connectionId);
 		this.connectionManager = checkNotNull(connectionManager);
 	}
 
@@ -83,22 +96,25 @@ public class RemoteInputChannel extends InputChannel {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
+	void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
 		if (partitionRequestClient == null) {
-			LOG.debug("Requesting REMOTE queue {} from of partition {}.", subpartitionIndex, partitionId);
+			LOG.debug("{}: Requesting REMOTE subpartition {} of partition {}.",
+					this, subpartitionIndex, partitionId);
 
-			partitionRequestClient = connectionManager.createPartitionRequestClient(producerAddress);
+			// Create a client and request the partition
+			partitionRequestClient = connectionManager
+					.createPartitionRequestClient(connectionId);
 
-			partitionRequestClient.requestIntermediateResultPartition(partitionId, subpartitionIndex, this);
+			partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this);
 		}
 	}
 
 	@Override
-	public Buffer getNextBuffer() throws IOException {
+	Buffer getNextBuffer() throws IOException {
 		checkState(!isReleased.get(), "Queried for a buffer after channel has been closed.");
 		checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue.");
 
-		checkIoError();
+		checkError();
 
 		synchronized (receivedBuffers) {
 			Buffer buffer = receivedBuffers.poll();
@@ -117,11 +133,11 @@ public class RemoteInputChannel extends InputChannel {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void sendTaskEvent(TaskEvent event) throws IOException {
+	void sendTaskEvent(TaskEvent event) throws IOException {
 		checkState(!isReleased.get(), "Tried to send task event to producer after channel has been released.");
 		checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
 
-		checkIoError();
+		checkError();
 
 		partitionRequestClient.sendTaskEvent(partitionId, event, this);
 	}
@@ -131,12 +147,12 @@ public class RemoteInputChannel extends InputChannel {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public boolean isReleased() {
+	boolean isReleased() {
 		return isReleased.get();
 	}
 
 	@Override
-	public void notifySubpartitionConsumed() {
+	void notifySubpartitionConsumed() {
 		// Nothing to do
 	}
 
@@ -144,7 +160,7 @@ public class RemoteInputChannel extends InputChannel {
 	 * Releases all received buffers and closes the partition request client.
 	 */
 	@Override
-	public void releaseAllResources() throws IOException {
+	void releaseAllResources() throws IOException {
 		if (isReleased.compareAndSet(false, true)) {
 			synchronized (receivedBuffers) {
 				Buffer buffer;
@@ -155,21 +171,28 @@ public class RemoteInputChannel extends InputChannel {
 
 			if (partitionRequestClient != null) {
 				partitionRequestClient.close(this);
-			} else {
-				connectionManager.closeOpenChannelConnections(producerAddress);
+			}
+			else {
+				connectionManager.closeOpenChannelConnections(connectionId);
 			}
 		}
 	}
 
 	@Override
 	public String toString() {
-		return "RemoteInputChannel [" + partitionId + " at " + producerAddress + "]";
+		return "RemoteInputChannel [" + partitionId + " at " + connectionId + "]";
 	}
 
 	// ------------------------------------------------------------------------
 	// Network I/O notifications (called by network I/O thread)
 	// ------------------------------------------------------------------------
 
+	public int getNumberOfQueuedBuffers() {
+		synchronized (receivedBuffers) {
+			return receivedBuffers.size();
+		}
+	}
+
 	public InputChannelID getInputChannelId() {
 		return id;
 	}
@@ -186,8 +209,8 @@ public class RemoteInputChannel extends InputChannel {
 		boolean success = false;
 
 		try {
-			if (!isReleased.get()) {
-				synchronized (receivedBuffers) {
+			synchronized (receivedBuffers) {
+				if (!isReleased.get()) {
 					if (expectedSequenceNumber == sequenceNumber) {
 						receivedBuffers.add(buffer);
 						expectedSequenceNumber++;
@@ -195,13 +218,11 @@ public class RemoteInputChannel extends InputChannel {
 						notifyAvailableBuffer();
 
 						success = true;
-
-						return;
+					}
+					else {
+						onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
 					}
 				}
-
-				IOException error = new BufferReorderingException(expectedSequenceNumber, sequenceNumber);
-				ioError.compareAndSet(null, error);
 			}
 		}
 		finally {
@@ -212,33 +233,35 @@ public class RemoteInputChannel extends InputChannel {
 	}
 
 	public void onEmptyBuffer(int sequenceNumber) {
-		if (!isReleased.get()) {
-			synchronized (receivedBuffers) {
+		synchronized (receivedBuffers) {
+			if (!isReleased.get()) {
 				if (expectedSequenceNumber == sequenceNumber) {
 					expectedSequenceNumber++;
 				}
 				else {
-					IOException error = new BufferReorderingException(expectedSequenceNumber, sequenceNumber);
-					ioError.compareAndSet(null, error);
+					onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
 				}
 			}
 		}
 	}
 
-	public void onError(Throwable error) {
-		if (ioError.compareAndSet(null, error instanceof IOException ? (IOException) error : new IOException(error))) {
+	public void onError(Throwable cause) {
+		if (error == null) {
+			error = cause;
+
+			// Notify the input gate to trigger querying of this channel
 			notifyAvailableBuffer();
 		}
 	}
 
-	// ------------------------------------------------------------------------
-
-	private void checkIoError() throws IOException {
-		IOException error = ioError.get();
+	/**
+	 * Checks whether this channel got notified by the network I/O thread about an error.
+	 */
+	private void checkError() throws IOException {
+		final Throwable t = error;
 
-		if (error != null) {
-			throw new IOException(String.format("%s at remote input channel: %s].",
-					error.getClass().getName(), error.getMessage()));
+		if (t != null) {
+			throw new IOException(t);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
index 7d3dbbe..e1f8fd8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.api.reader;
 
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.util.TestSingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.TestSingleInputGate;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.util.event.EventListener;

http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/IteratorWrappingTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/IteratorWrappingTestSingleInputGate.java
deleted file mode 100644
index 3968eda..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/IteratorWrappingTestSingleInputGate.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.reader;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.io.network.util.TestInputChannel;
-import org.apache.flink.runtime.io.network.util.TestSingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.MutableObjectIterator;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> extends TestSingleInputGate {
-
-	private final TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
-
-	private final int bufferSize;
-
-	private MutableObjectIterator<T> inputIterator;
-
-	private RecordSerializer<T> serializer;
-
-	private final T reuse;
-
-	public IteratorWrappingTestSingleInputGate(int bufferSize, Class<T> recordType, MutableObjectIterator<T> iterator) throws IOException, InterruptedException {
-		super(1, false);
-
-		this.bufferSize = bufferSize;
-		this.reuse = InstantiationUtil.instantiate(recordType);
-
-		wrapIterator(iterator);
-	}
-
-	private IteratorWrappingTestSingleInputGate<T> wrapIterator(MutableObjectIterator<T> iterator) throws IOException, InterruptedException {
-		inputIterator = iterator;
-		serializer = new SpanningRecordSerializer<T>();
-
-		// The input iterator can produce an infinite stream. That's why we have to serialize each
-		// record on demand and cannot do it upfront.
-		final Answer<Buffer> answer = new Answer<Buffer>() {
-			@Override
-			public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
-				if (inputIterator.next(reuse) != null) {
-					final Buffer buffer = new Buffer(new MemorySegment(new byte[bufferSize]), mock(BufferRecycler.class));
-					serializer.setNextBuffer(buffer);
-					serializer.addRecord(reuse);
-
-					inputGate.onAvailableBuffer(inputChannel.getInputChannel());
-
-					// Call getCurrentBuffer to ensure size is set
-					return serializer.getCurrentBuffer();
-				}
-				else {
-
-					when(inputChannel.getInputChannel().isReleased()).thenReturn(true);
-
-					return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
-				}
-			}
-		};
-
-		when(inputChannel.getInputChannel().getNextBuffer()).thenAnswer(answer);
-
-		inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel.getInputChannel());
-
-		return this;
-	}
-
-	public IteratorWrappingTestSingleInputGate<T> read() {
-		inputGate.onAvailableBuffer(inputChannel.getInputChannel());
-
-		return this;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
new file mode 100644
index 0000000..65780b4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
+import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.MutableObjectIterator;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> extends TestSingleInputGate {
+
+	private final TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
+
+	private final int bufferSize;
+
+	private MutableObjectIterator<T> inputIterator;
+
+	private RecordSerializer<T> serializer;
+
+	private final T reuse;
+
+	public IteratorWrappingTestSingleInputGate(int bufferSize, Class<T> recordType, MutableObjectIterator<T> iterator) throws IOException, InterruptedException {
+		super(1, false);
+
+		this.bufferSize = bufferSize;
+		this.reuse = InstantiationUtil.instantiate(recordType);
+
+		wrapIterator(iterator);
+	}
+
+	private IteratorWrappingTestSingleInputGate<T> wrapIterator(MutableObjectIterator<T> iterator) throws IOException, InterruptedException {
+		inputIterator = iterator;
+		serializer = new SpanningRecordSerializer<T>();
+
+		// The input iterator can produce an infinite stream. That's why we have to serialize each
+		// record on demand and cannot do it upfront.
+		final Answer<Buffer> answer = new Answer<Buffer>() {
+			@Override
+			public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
+				if (inputIterator.next(reuse) != null) {
+					final Buffer buffer = new Buffer(new MemorySegment(new byte[bufferSize]), mock(BufferRecycler.class));
+					serializer.setNextBuffer(buffer);
+					serializer.addRecord(reuse);
+
+					inputGate.onAvailableBuffer(inputChannel.getInputChannel());
+
+					// Call getCurrentBuffer to ensure size is set
+					return serializer.getCurrentBuffer();
+				}
+				else {
+
+					when(inputChannel.getInputChannel().isReleased()).thenReturn(true);
+
+					return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+				}
+			}
+		};
+
+		when(inputChannel.getInputChannel().getNextBuffer()).thenAnswer(answer);
+
+		inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel.getInputChannel());
+
+		return this;
+	}
+
+	public IteratorWrappingTestSingleInputGate<T> read() {
+		inputGate.onAvailableBuffer(inputChannel.getInputChannel());
+
+		return this;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
new file mode 100644
index 0000000..7eb728c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RemoteInputChannelTest {
+
+	@Test
+	public void testExceptionOnReordering() throws Exception {
+		// Setup
+		final SingleInputGate inputGate = mock(SingleInputGate.class);
+		final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
+
+		// The test
+		inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), 0);
+
+		// This does not yet throw the exception, but sets the error at the channel.
+		inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), 29);
+
+		try {
+			inputChannel.getNextBuffer();
+
+			fail("Did not throw expected exception after enqueuing an out-of-order buffer.");
+		}
+		catch (Exception expected) {
+		}
+
+		// Need to notify the input gate for the out-of-order buffer as well. Otherwise the
+		// receiving task will not notice the error.
+		verify(inputGate, times(2)).onAvailableBuffer(eq(inputChannel));
+	}
+
+	@Test(timeout = 120 * 1000)
+	public void testConcurrentOnBufferAndRelease() throws Exception {
+		// Config
+		// Repeatedly spawn two tasks: one to queue buffers and the other to release the channel
+		// concurrently. We do this repeatedly to provoke races.
+		final int numberOfRepetitions = 8192;
+
+		// Setup
+		final ExecutorService executor = Executors.newFixedThreadPool(2);
+
+		try {
+			// Test
+			final SingleInputGate inputGate = mock(SingleInputGate.class);
+
+			for (int i = 0; i < numberOfRepetitions; i++) {
+				final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
+
+				final Callable<Void> enqueueTask = new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						while (true) {
+							for (int j = 0; j < 128; j++) {
+								inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), j);
+							}
+
+							if (inputChannel.isReleased()) {
+								return null;
+							}
+						}
+					}
+				};
+
+				final Callable<Void> releaseTask = new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						inputChannel.releaseAllResources();
+
+						return null;
+					}
+				};
+
+				// Submit tasks and wait to finish
+				List<Future<Void>> results = Lists.newArrayListWithCapacity(2);
+
+				results.add(executor.submit(enqueueTask));
+				results.add(executor.submit(releaseTask));
+
+				for (Future<Void> result : results) {
+					result.get();
+				}
+
+				assertEquals("Resource leak during concurrent release and enqueue.",
+						0, inputChannel.getNumberOfQueuedBuffers());
+			}
+		}
+		finally {
+			executor.shutdown();
+		}
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate)
+			throws IOException, InterruptedException {
+
+		final ConnectionManager connectionManager = mock(ConnectionManager.class);
+		when(connectionManager.createPartitionRequestClient(any(ConnectionID.class)))
+				.thenReturn(mock(PartitionRequestClient.class));
+
+		return new RemoteInputChannel(
+				inputGate,
+				0,
+				new ResultPartitionID(),
+				mock(ConnectionID.class),
+				connectionManager);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index e1e3cff..5871514 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
-import org.apache.flink.runtime.io.network.util.TestInputChannel;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;

http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
new file mode 100644
index 0000000..3dafffd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.util.TestTaskEvent;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
+
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * A mocked input channel.
+ */
+public class TestInputChannel {
+
+	private final InputChannel mock = Mockito.mock(InputChannel.class);
+
+	private final SingleInputGate inputGate;
+
+	// Abusing Mockito here... ;)
+	protected OngoingStubbing<Buffer> stubbing;
+
+	public TestInputChannel(SingleInputGate inputGate, int channelIndex) {
+		checkArgument(channelIndex >= 0);
+		this.inputGate = checkNotNull(inputGate);
+
+		when(mock.getChannelIndex()).thenReturn(channelIndex);
+	}
+
+	public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException {
+		if (stubbing == null) {
+			stubbing = when(mock.getNextBuffer()).thenReturn(buffer);
+		}
+		else {
+			stubbing = stubbing.thenReturn(buffer);
+		}
+
+		inputGate.onAvailableBuffer(mock);
+
+		return this;
+	}
+
+	public TestInputChannel readBuffer() throws IOException, InterruptedException {
+		final Buffer buffer = mock(Buffer.class);
+		when(buffer.isBuffer()).thenReturn(true);
+
+		return read(buffer);
+	}
+
+	public TestInputChannel readEvent() throws IOException, InterruptedException {
+		return read(EventSerializer.toBuffer(new TestTaskEvent()));
+	}
+
+	public TestInputChannel readEndOfSuperstepEvent() throws IOException, InterruptedException {
+		return read(EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE));
+	}
+
+	public TestInputChannel readEndOfPartitionEvent() throws IOException, InterruptedException {
+		final Answer<Buffer> answer = new Answer<Buffer>() {
+			@Override
+			public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
+				// Return true after finishing
+				when(mock.isReleased()).thenReturn(true);
+
+				return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+			}
+		};
+
+		if (stubbing == null) {
+			stubbing = when(mock.getNextBuffer()).thenAnswer(answer);
+		}
+		else {
+			stubbing = stubbing.thenAnswer(answer);
+		}
+
+		inputGate.onAvailableBuffer(mock);
+
+		return this;
+	}
+
+	public InputChannel getInputChannel() {
+		return mock;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates test input channels and attaches them to the specified input gate.
+	 *
+	 * @return The created test input channels.
+	 */
+	public static TestInputChannel[] createInputChannels(SingleInputGate inputGate, int numberOfInputChannels) {
+		checkNotNull(inputGate);
+		checkArgument(numberOfInputChannels > 0);
+
+		TestInputChannel[] mocks = new TestInputChannel[numberOfInputChannels];
+
+		for (int i = 0; i < numberOfInputChannels; i++) {
+			mocks[i] = new TestInputChannel(inputGate, i);
+
+			inputGate.setInputChannel(new IntermediateResultPartitionID(), mocks[i].getInputChannel());
+		}
+
+		return mocks;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
new file mode 100644
index 0000000..5033b3d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkElementIndex;
+import static org.mockito.Mockito.spy;
+
+/**
+ * A test input gate to mock reading data.
+ */
+public class TestSingleInputGate {
+
+	protected final SingleInputGate inputGate;
+
+	protected final TestInputChannel[] inputChannels;
+
+	public TestSingleInputGate(int numberOfInputChannels) {
+		this(numberOfInputChannels, true);
+	}
+
+	public TestSingleInputGate(int numberOfInputChannels, boolean initialize) {
+		checkArgument(numberOfInputChannels >= 1);
+
+		this.inputGate = spy(new SingleInputGate(new IntermediateDataSetID(), 0, numberOfInputChannels));
+
+		this.inputChannels = new TestInputChannel[numberOfInputChannels];
+
+		if (initialize) {
+			for (int i = 0; i < numberOfInputChannels; i++) {
+				inputChannels[i] = new TestInputChannel(inputGate, i);
+				inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i].getInputChannel());
+			}
+		}
+	}
+
+	public TestSingleInputGate read(Buffer buffer, int channelIndex) throws IOException, InterruptedException {
+		checkElementIndex(channelIndex, inputGate.getNumberOfInputChannels());
+
+		inputChannels[channelIndex].read(buffer);
+
+		return this;
+	}
+
+	public TestSingleInputGate readBuffer() throws IOException, InterruptedException {
+		return readBuffer(0);
+	}
+
+	public TestSingleInputGate readBuffer(int channelIndex) throws IOException, InterruptedException {
+		inputChannels[channelIndex].readBuffer();
+
+		return this;
+	}
+
+	public TestSingleInputGate readEvent() throws IOException, InterruptedException {
+		return readEvent(0);
+	}
+
+	public TestSingleInputGate readEvent(int channelIndex) throws IOException, InterruptedException {
+		inputChannels[channelIndex].readEvent();
+
+		return this;
+	}
+
+	public TestSingleInputGate readEndOfSuperstepEvent() throws IOException, InterruptedException {
+		for (TestInputChannel inputChannel : inputChannels) {
+			inputChannel.readEndOfSuperstepEvent();
+		}
+
+		return this;
+	}
+
+	public TestSingleInputGate readEndOfSuperstepEvent(int channelIndex) throws IOException, InterruptedException {
+		inputChannels[channelIndex].readEndOfSuperstepEvent();
+
+		return this;
+	}
+
+	public TestSingleInputGate readEndOfPartitionEvent() throws IOException, InterruptedException {
+		for (TestInputChannel inputChannel : inputChannels) {
+			inputChannel.readEndOfPartitionEvent();
+		}
+
+		return this;
+	}
+
+	public TestSingleInputGate readEndOfPartitionEvent(int channelIndex) throws IOException, InterruptedException {
+		inputChannels[channelIndex].readEndOfPartitionEvent();
+
+		return this;
+	}
+
+	public SingleInputGate getInputGate() {
+		return inputGate;
+	}
+
+	// ------------------------------------------------------------------------
+
+	public List<Integer> readAllChannels() throws IOException, InterruptedException {
+		final List<Integer> readOrder = new ArrayList<Integer>(inputChannels.length);
+
+		for (int i = 0; i < inputChannels.length; i++) {
+			readOrder.add(i);
+		}
+
+		Collections.shuffle(readOrder);
+
+		for (int channelIndex : readOrder) {
+			inputChannels[channelIndex].readBuffer();
+		}
+
+		return readOrder;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index c7cb413..18e56ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.io.network.util.TestInputChannel;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
index d10bf0c..0ff42b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
@@ -34,6 +34,8 @@ public class TestBufferFactory {
 
 	private static final BufferRecycler discardingRecycler = new DiscardingRecycler();
 
+	private static final Buffer mockBuffer = createBuffer();
+
 	private final int bufferSize;
 
 	private final BufferRecycler bufferRecycler;
@@ -85,4 +87,8 @@ public class TestBufferFactory {
 
 		return new Buffer(new MemorySegment(new byte[bufferSize]), discardingRecycler);
 	}
+
+	public static Buffer getMockBuffer() {
+		return mockBuffer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
deleted file mode 100644
index 0e9e8e7..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.util;
-
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.mockito.stubbing.OngoingStubbing;
-
-import java.io.IOException;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * A mocked input channel.
- */
-public class TestInputChannel {
-
-	private final InputChannel mock = Mockito.mock(InputChannel.class);
-
-	private final SingleInputGate inputGate;
-
-	// Abusing Mockito here... ;)
-	protected OngoingStubbing<Buffer> stubbing;
-
-	public TestInputChannel(SingleInputGate inputGate, int channelIndex) {
-		checkArgument(channelIndex >= 0);
-		this.inputGate = checkNotNull(inputGate);
-
-		when(mock.getChannelIndex()).thenReturn(channelIndex);
-	}
-
-	public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException {
-		if (stubbing == null) {
-			stubbing = when(mock.getNextBuffer()).thenReturn(buffer);
-		}
-		else {
-			stubbing = stubbing.thenReturn(buffer);
-		}
-
-		inputGate.onAvailableBuffer(mock);
-
-		return this;
-	}
-
-	public TestInputChannel readBuffer() throws IOException, InterruptedException {
-		final Buffer buffer = mock(Buffer.class);
-		when(buffer.isBuffer()).thenReturn(true);
-
-		return read(buffer);
-	}
-
-	public TestInputChannel readEvent() throws IOException, InterruptedException {
-		return read(EventSerializer.toBuffer(new TestTaskEvent()));
-	}
-
-	public TestInputChannel readEndOfSuperstepEvent() throws IOException, InterruptedException {
-		return read(EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE));
-	}
-
-	public TestInputChannel readEndOfPartitionEvent() throws IOException, InterruptedException {
-		final Answer<Buffer> answer = new Answer<Buffer>() {
-			@Override
-			public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
-				// Return true after finishing
-				when(mock.isReleased()).thenReturn(true);
-
-				return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
-			}
-		};
-
-		if (stubbing == null) {
-			stubbing = when(mock.getNextBuffer()).thenAnswer(answer);
-		}
-		else {
-			stubbing = stubbing.thenAnswer(answer);
-		}
-
-		inputGate.onAvailableBuffer(mock);
-
-		return this;
-	}
-
-	public InputChannel getInputChannel() {
-		return mock;
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates test input channels and attaches them to the specified input gate.
-	 *
-	 * @return The created test input channels.
-	 */
-	public static TestInputChannel[] createInputChannels(SingleInputGate inputGate, int numberOfInputChannels) {
-		checkNotNull(inputGate);
-		checkArgument(numberOfInputChannels > 0);
-
-		TestInputChannel[] mocks = new TestInputChannel[numberOfInputChannels];
-
-		for (int i = 0; i < numberOfInputChannels; i++) {
-			mocks[i] = new TestInputChannel(inputGate, i);
-
-			inputGate.setInputChannel(new IntermediateResultPartitionID(), mocks[i].getInputChannel());
-		}
-
-		return mocks;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java
deleted file mode 100644
index d10e1a0..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.util;
-
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkElementIndex;
-import static org.mockito.Mockito.spy;
-
-/**
- * A test input gate to mock reading data.
- */
-public class TestSingleInputGate {
-
-	protected final SingleInputGate inputGate;
-
-	protected final TestInputChannel[] inputChannels;
-
-	public TestSingleInputGate(int numberOfInputChannels) {
-		this(numberOfInputChannels, true);
-	}
-
-	public TestSingleInputGate(int numberOfInputChannels, boolean initialize) {
-		checkArgument(numberOfInputChannels >= 1);
-
-		this.inputGate = spy(new SingleInputGate(new IntermediateDataSetID(), 0, numberOfInputChannels));
-
-		this.inputChannels = new TestInputChannel[numberOfInputChannels];
-
-		if (initialize) {
-			for (int i = 0; i < numberOfInputChannels; i++) {
-				inputChannels[i] = new TestInputChannel(inputGate, i);
-				inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i].getInputChannel());
-			}
-		}
-	}
-
-	public TestSingleInputGate read(Buffer buffer, int channelIndex) throws IOException, InterruptedException {
-		checkElementIndex(channelIndex, inputGate.getNumberOfInputChannels());
-
-		inputChannels[channelIndex].read(buffer);
-
-		return this;
-	}
-
-	public TestSingleInputGate readBuffer() throws IOException, InterruptedException {
-		return readBuffer(0);
-	}
-
-	public TestSingleInputGate readBuffer(int channelIndex) throws IOException, InterruptedException {
-		inputChannels[channelIndex].readBuffer();
-
-		return this;
-	}
-
-	public TestSingleInputGate readEvent() throws IOException, InterruptedException {
-		return readEvent(0);
-	}
-
-	public TestSingleInputGate readEvent(int channelIndex) throws IOException, InterruptedException {
-		inputChannels[channelIndex].readEvent();
-
-		return this;
-	}
-
-	public TestSingleInputGate readEndOfSuperstepEvent() throws IOException, InterruptedException {
-		for (TestInputChannel inputChannel : inputChannels) {
-			inputChannel.readEndOfSuperstepEvent();
-		}
-
-		return this;
-	}
-
-	public TestSingleInputGate readEndOfSuperstepEvent(int channelIndex) throws IOException, InterruptedException {
-		inputChannels[channelIndex].readEndOfSuperstepEvent();
-
-		return this;
-	}
-
-	public TestSingleInputGate readEndOfPartitionEvent() throws IOException, InterruptedException {
-		for (TestInputChannel inputChannel : inputChannels) {
-			inputChannel.readEndOfPartitionEvent();
-		}
-
-		return this;
-	}
-
-	public TestSingleInputGate readEndOfPartitionEvent(int channelIndex) throws IOException, InterruptedException {
-		inputChannels[channelIndex].readEndOfPartitionEvent();
-
-		return this;
-	}
-
-	public SingleInputGate getInputGate() {
-		return inputGate;
-	}
-
-	// ------------------------------------------------------------------------
-
-	public List<Integer> readAllChannels() throws IOException, InterruptedException {
-		final List<Integer> readOrder = new ArrayList<Integer>(inputChannels.length);
-
-		for (int i = 0; i < inputChannels.length; i++) {
-			readOrder.add(i);
-		}
-
-		Collections.shuffle(readOrder);
-
-		for (int channelIndex : readOrder) {
-			inputChannels[channelIndex].readBuffer();
-		}
-
-		return readOrder;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index 63ce5e2..e91d338 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
 import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingTestSingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;

http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 6625bbc..d5c9fbb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingTestSingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
 import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;

http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index b93d37e..6ffc97b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.record.io.FileOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingTestSingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.operators.PactDriver;