You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/18 17:48:53 UTC

[04/13] flink git commit: [FLINK-1350] [runtime] Add blocking result partition variant

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockConsumer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockConsumer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockConsumer.java
deleted file mode 100644
index 62375a6..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockConsumer.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.util;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueueIterator;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class MockConsumer implements Callable<Boolean> {
-
-	private static final int SLEEP_TIME_MS = 20;
-
-	private final IntermediateResultPartitionQueueIterator iterator;
-
-	private final boolean slowConsumer;
-
-	private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-
-	public MockConsumer(IntermediateResultPartitionQueueIterator iterator, boolean slowConsumer) {
-		this.iterator = iterator;
-		this.slowConsumer = slowConsumer;
-	}
-
-	@Override
-	public Boolean call() throws Exception {
-		MockNotificationListener listener = new MockNotificationListener();
-
-		int currentNumber = 0;
-
-		try {
-			while (true) {
-				Buffer buffer = iterator.getNextBuffer();
-
-				if (slowConsumer) {
-					Thread.sleep(SLEEP_TIME_MS);
-				}
-
-				if (buffer == null) {
-					if (iterator.subscribe(listener)) {
-						listener.waitForNotification();
-					}
-					else if (iterator.isConsumed()) {
-						break;
-					}
-				}
-				else {
-					try {
-						if (buffer.isBuffer()) {
-							currentNumber = verifyBufferFilledWithAscendingNumbers(buffer, currentNumber);
-						}
-					}
-					finally {
-						buffer.recycle();
-					}
-				}
-			}
-		}
-		catch (Throwable t) {
-			error.compareAndSet(null, t);
-			return false;
-		}
-
-		return true;
-	}
-
-	public Throwable getError() {
-		return error.get();
-	}
-
-	private int verifyBufferFilledWithAscendingNumbers(Buffer buffer, int currentNumber) {
-		MemorySegment segment = buffer.getMemorySegment();
-
-		for (int i = 4; i < segment.size(); i += 4) {
-			if (segment.getInt(i) != currentNumber++) {
-				throw new IllegalStateException("Read unexpected number from buffer.");
-			}
-		}
-
-		return currentNumber;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockInputChannel.java
deleted file mode 100644
index 301169a..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockInputChannel.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.util;
-
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.mockito.stubbing.OngoingStubbing;
-
-import java.io.IOException;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * A mocked input channel.
- */
-public class MockInputChannel {
-
-	private final InputChannel mock = Mockito.mock(InputChannel.class);
-
-	private final SingleInputGate inputGate;
-
-	// Abusing Mockito here... ;)
-	protected OngoingStubbing<Buffer> stubbing;
-
-	public MockInputChannel(SingleInputGate inputGate, int channelIndex) {
-		checkArgument(channelIndex >= 0);
-		this.inputGate = checkNotNull(inputGate);
-
-		when(mock.getChannelIndex()).thenReturn(channelIndex);
-	}
-
-	public MockInputChannel read(Buffer buffer) throws IOException {
-		if (stubbing == null) {
-			stubbing = when(mock.getNextBuffer()).thenReturn(buffer);
-		}
-		else {
-			stubbing = stubbing.thenReturn(buffer);
-		}
-
-		inputGate.onAvailableBuffer(mock);
-
-		return this;
-	}
-
-	public MockInputChannel readBuffer() throws IOException {
-		final Buffer buffer = mock(Buffer.class);
-		when(buffer.isBuffer()).thenReturn(true);
-
-		return read(buffer);
-	}
-
-	public MockInputChannel readEvent() throws IOException {
-		return read(EventSerializer.toBuffer(new TestTaskEvent()));
-	}
-
-	public MockInputChannel readEndOfSuperstepEvent() throws IOException {
-		return read(EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE));
-	}
-
-	public MockInputChannel readEndOfPartitionEvent() throws IOException {
-		final Answer<Buffer> answer = new Answer<Buffer>() {
-			@Override
-			public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
-				// Return true after finishing
-				when(mock.isReleased()).thenReturn(true);
-
-				return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
-			}
-		};
-
-		if (stubbing == null) {
-			stubbing = when(mock.getNextBuffer()).thenAnswer(answer);
-		}
-		else {
-			stubbing = stubbing.thenAnswer(answer);
-		}
-
-		inputGate.onAvailableBuffer(mock);
-
-		return this;
-	}
-
-	public InputChannel getInputChannel() {
-		return mock;
-	}
-
-	// ------------------------------------------------------------------------
-
-	public static MockInputChannel[] createInputChannels(SingleInputGate inputGate, int numberOfInputChannels) {
-		checkNotNull(inputGate);
-		checkArgument(numberOfInputChannels > 0);
-
-		MockInputChannel[] mocks = new MockInputChannel[numberOfInputChannels];
-
-		for (int i = 0; i < numberOfInputChannels; i++) {
-			mocks[i] = new MockInputChannel(inputGate, i);
-
-			inputGate.setInputChannel(new IntermediateResultPartitionID(), mocks[i].getInputChannel());
-		}
-
-		return mocks;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockNotificationListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockNotificationListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockNotificationListener.java
deleted file mode 100644
index 56e0025..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockNotificationListener.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.util;
-
-import org.apache.flink.runtime.util.event.NotificationListener;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class MockNotificationListener implements NotificationListener {
-
-	final AtomicInteger numNotifications = new AtomicInteger();
-
-	@Override
-	public void onNotification() {
-		synchronized (numNotifications) {
-			numNotifications.incrementAndGet();
-
-			numNotifications.notifyAll();
-		}
-	}
-
-	public void waitForNotification() throws InterruptedException {
-
-		int current = numNotifications.get();
-
-		synchronized (numNotifications) {
-			while (current == numNotifications.get()) {
-				numNotifications.wait();
-			}
-		}
-	}
-
-	public int getNumberOfNotifications() {
-		return numNotifications.get();
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockProducer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockProducer.java
deleted file mode 100644
index 44d8ffe..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockProducer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.util;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueue;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class MockProducer implements Callable<Boolean> {
-
-	private static final int SLEEP_TIME_MS = 20;
-
-	private final IntermediateResultPartitionQueue queue;
-
-	private final BufferPool bufferPool;
-
-	private final int numBuffersToProduce;
-
-	private final boolean slowProducer;
-
-	private final AtomicInteger discardAfter = new AtomicInteger(Integer.MAX_VALUE);
-
-	private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-
-	public MockProducer(IntermediateResultPartitionQueue queue, BufferPool bufferPool, int numBuffersToProduce, boolean slowProducer) {
-		this.queue = queue;
-		this.bufferPool = bufferPool;
-		this.numBuffersToProduce = numBuffersToProduce;
-		this.slowProducer = slowProducer;
-	}
-
-	@Override
-	public Boolean call() throws Exception {
-		try {
-			int currentNumber = 0;
-
-			for (int i = 0; i < numBuffersToProduce; i++) {
-				if (i >= discardAfter.get()) {
-					queue.discard();
-					return true;
-				}
-
-				Buffer buffer = bufferPool.requestBufferBlocking();
-
-				currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber);
-
-				queue.add(buffer);
-
-				if (slowProducer) {
-					Thread.sleep(SLEEP_TIME_MS);
-				}
-			}
-
-			queue.finish();
-		}
-		catch (Throwable t) {
-			error.compareAndSet(null, t);
-			return false;
-		}
-
-		return true;
-	}
-
-	void discard() {
-		discardAfter.set(0);
-	}
-
-	public void discardAfter(int numBuffers) {
-		discardAfter.set(numBuffers);
-	}
-
-	public Throwable getError() {
-		return error.get();
-	}
-
-	public static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber) {
-		MemorySegment segment = buffer.getMemorySegment();
-
-		for (int i = 4; i < segment.size(); i += 4) {
-			segment.putInt(i, currentNumber++);
-		}
-
-		return currentNumber;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockSingleInputGate.java
deleted file mode 100644
index 3c708ac..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockSingleInputGate.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.util;
-
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkElementIndex;
-import static org.mockito.Mockito.spy;
-
-public class MockSingleInputGate {
-
-	protected final SingleInputGate inputGate;
-
-	protected final MockInputChannel[] inputChannels;
-
-	public MockSingleInputGate(int numberOfInputChannels) {
-		this(numberOfInputChannels, true);
-	}
-
-	public MockSingleInputGate(int numberOfInputChannels, boolean initialize) {
-		checkArgument(numberOfInputChannels >= 1);
-
-		this.inputGate = spy(new SingleInputGate(new IntermediateDataSetID(), 0, numberOfInputChannels));
-
-		this.inputChannels = new MockInputChannel[numberOfInputChannels];
-
-		if (initialize) {
-			for (int i = 0; i < numberOfInputChannels; i++) {
-				inputChannels[i] = new MockInputChannel(inputGate, i);
-				inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i].getInputChannel());
-			}
-		}
-	}
-
-	public MockSingleInputGate read(Buffer buffer, int channelIndex) throws IOException {
-		checkElementIndex(channelIndex, inputGate.getNumberOfInputChannels());
-
-		inputChannels[channelIndex].read(buffer);
-
-		return this;
-	}
-
-	public MockSingleInputGate readBuffer() throws IOException {
-		return readBuffer(0);
-	}
-
-	public MockSingleInputGate readBuffer(int channelIndex) throws IOException {
-		inputChannels[channelIndex].readBuffer();
-
-		return this;
-	}
-
-	public MockSingleInputGate readEvent() throws IOException {
-		return readEvent(0);
-	}
-
-	public MockSingleInputGate readEvent(int channelIndex) throws IOException {
-		inputChannels[channelIndex].readEvent();
-
-		return this;
-	}
-
-	public MockSingleInputGate readEndOfSuperstepEvent() throws IOException {
-		for (MockInputChannel inputChannel : inputChannels) {
-			inputChannel.readEndOfSuperstepEvent();
-		}
-
-		return this;
-	}
-
-	public MockSingleInputGate readEndOfSuperstepEvent(int channelIndex) throws IOException {
-		inputChannels[channelIndex].readEndOfSuperstepEvent();
-
-		return this;
-	}
-
-	public MockSingleInputGate readEndOfPartitionEvent() throws IOException {
-		for (MockInputChannel inputChannel : inputChannels) {
-			inputChannel.readEndOfPartitionEvent();
-		}
-
-		return this;
-	}
-
-	public MockSingleInputGate readEndOfPartitionEvent(int channelIndex) throws IOException {
-		inputChannels[channelIndex].readEndOfPartitionEvent();
-
-		return this;
-	}
-
-	public SingleInputGate getInputGate() {
-		return inputGate;
-	}
-
-	// ------------------------------------------------------------------------
-
-	public List<Integer> readAllChannels() throws IOException {
-		final List<Integer> readOrder = new ArrayList<Integer>(inputChannels.length);
-
-		for (int i = 0; i < inputChannels.length; i++) {
-			readOrder.add(i);
-		}
-
-		Collections.shuffle(readOrder);
-
-		for (int channelIndex : readOrder) {
-			inputChannels[channelIndex].readBuffer();
-		}
-
-		return readOrder;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
new file mode 100644
index 0000000..d10bf0c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.testutils.DiscardingRecycler;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class TestBufferFactory {
+
+	private static final int defaultSize = 32 * 1024;
+
+	private static final BufferRecycler discardingRecycler = new DiscardingRecycler();
+
+	private final int bufferSize;
+
+	private final BufferRecycler bufferRecycler;
+
+	private AtomicInteger numberOfCreatedBuffers = new AtomicInteger();
+
+	public TestBufferFactory() {
+		this(defaultSize, discardingRecycler);
+	}
+
+	public TestBufferFactory(int bufferSize) {
+		this(bufferSize, discardingRecycler);
+	}
+
+	public TestBufferFactory(int bufferSize, BufferRecycler bufferRecycler) {
+		checkArgument(bufferSize > 0);
+		this.bufferSize = bufferSize;
+		this.bufferRecycler = checkNotNull(bufferRecycler);
+	}
+
+	public Buffer create() {
+		numberOfCreatedBuffers.incrementAndGet();
+
+		return new Buffer(new MemorySegment(new byte[bufferSize]), bufferRecycler);
+	}
+
+	public Buffer createFrom(MemorySegment segment) {
+		return new Buffer(segment, bufferRecycler);
+	}
+
+	public int getNumberOfCreatedBuffers() {
+		return numberOfCreatedBuffers.get();
+	}
+
+	public int getBufferSize() {
+		return bufferSize;
+	}
+
+	// ------------------------------------------------------------------------
+	// Static test helpers
+	// ------------------------------------------------------------------------
+
+	public static Buffer createBuffer() {
+		return createBuffer(defaultSize);
+	}
+
+	public static Buffer createBuffer(int bufferSize) {
+		checkArgument(bufferSize > 0);
+
+		return new Buffer(new MemorySegment(new byte[bufferSize]), discardingRecycler);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
new file mode 100644
index 0000000..52083c4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+
+public interface TestConsumerCallback {
+
+	void onBuffer(Buffer buffer);
+
+	void onEvent(AbstractEvent event);
+
+	public static class CountingCallback implements TestConsumerCallback {
+
+		private final AtomicInteger numberOfReadBuffers = new AtomicInteger();
+
+		private final AtomicInteger numberOfReadEvents = new AtomicInteger();
+
+		@Override
+		public void onBuffer(Buffer buffer) {
+			numberOfReadBuffers.incrementAndGet();
+		}
+
+		@Override
+		public void onEvent(AbstractEvent event) {
+			numberOfReadEvents.incrementAndGet();
+		}
+
+		/**
+		 * Returns the number of read buffers.
+		 */
+		public int getNumberOfReadBuffers() {
+			return numberOfReadBuffers.get();
+		}
+
+		/**
+		 * Returns the number of read events;
+		 */
+		public int getNumberOfReadEvents() {
+			return numberOfReadEvents.get();
+		}
+	}
+
+	public static class RecyclingCallback extends CountingCallback {
+
+		@Override
+		public void onBuffer(Buffer buffer) {
+			super.onBuffer(buffer);
+
+			buffer.recycle();
+		}
+
+		@Override
+		public void onEvent(AbstractEvent event) {
+			super.onEvent(event);
+		}
+	}
+
+	public class VerifyAscendingCallback extends RecyclingCallback {
+
+		@Override
+		public void onBuffer(Buffer buffer) {
+			final MemorySegment segment = buffer.getMemorySegment();
+
+			int expected = getNumberOfReadBuffers() * (segment.size() / 4);
+
+			for (int i = 0; i < segment.size(); i += 4) {
+				assertEquals(expected, segment.getInt(i));
+
+				expected++;
+			}
+
+			super.onBuffer(buffer);
+		}
+
+		@Override
+		public void onEvent(AbstractEvent event) {
+			super.onEvent(event);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
new file mode 100644
index 0000000..976e63d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.util.event.EventListener;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class TestInfiniteBufferProvider implements BufferProvider {
+
+	private final ConcurrentLinkedQueue<Buffer> buffers = new ConcurrentLinkedQueue<Buffer>();
+
+	private final TestBufferFactory bufferFactory = new TestBufferFactory(
+			32 * 1024, new InfiniteBufferProviderRecycler(buffers));
+
+	@Override
+	public Buffer requestBuffer() throws IOException {
+		Buffer buffer = buffers.poll();
+
+		if (buffer != null) {
+			return buffer;
+		}
+
+		return bufferFactory.create();
+	}
+
+	@Override
+	public Buffer requestBufferBlocking() throws IOException, InterruptedException {
+		return requestBuffer();
+	}
+
+	@Override
+	public boolean addListener(EventListener<Buffer> listener) {
+		return false;
+	}
+
+	@Override
+	public boolean isDestroyed() {
+		return false;
+	}
+
+	@Override
+	public int getMemorySegmentSize() {
+		return bufferFactory.getBufferSize();
+	}
+
+	private static class InfiniteBufferProviderRecycler implements BufferRecycler {
+
+		private final ConcurrentLinkedQueue<Buffer> buffers;
+
+		public InfiniteBufferProviderRecycler(ConcurrentLinkedQueue<Buffer> buffers) {
+			this.buffers = buffers;
+		}
+
+		@Override
+		public void recycle(MemorySegment segment) {
+			buffers.add(new Buffer(segment, this));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
new file mode 100644
index 0000000..306de4c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
+
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * A mocked input channel.
+ */
+public class TestInputChannel {
+
+	private final InputChannel mock = Mockito.mock(InputChannel.class);
+
+	private final SingleInputGate inputGate;
+
+	// Abusing Mockito here... ;)
+	protected OngoingStubbing<Buffer> stubbing;
+
+	public TestInputChannel(SingleInputGate inputGate, int channelIndex) {
+		checkArgument(channelIndex >= 0);
+		this.inputGate = checkNotNull(inputGate);
+
+		when(mock.getChannelIndex()).thenReturn(channelIndex);
+	}
+
+	public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException {
+		if (stubbing == null) {
+			stubbing = when(mock.getNextBuffer()).thenReturn(buffer);
+		}
+		else {
+			stubbing = stubbing.thenReturn(buffer);
+		}
+
+		inputGate.onAvailableBuffer(mock);
+
+		return this;
+	}
+
+	public TestInputChannel readBuffer() throws IOException, InterruptedException {
+		final Buffer buffer = mock(Buffer.class);
+		when(buffer.isBuffer()).thenReturn(true);
+
+		return read(buffer);
+	}
+
+	public TestInputChannel readEvent() throws IOException, InterruptedException {
+		return read(EventSerializer.toBuffer(new TestTaskEvent()));
+	}
+
+	public TestInputChannel readEndOfSuperstepEvent() throws IOException, InterruptedException {
+		return read(EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE));
+	}
+
+	public TestInputChannel readEndOfPartitionEvent() throws IOException, InterruptedException {
+		final Answer<Buffer> answer = new Answer<Buffer>() {
+			@Override
+			public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
+				// Return true after finishing
+				when(mock.isReleased()).thenReturn(true);
+
+				return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+			}
+		};
+
+		if (stubbing == null) {
+			stubbing = when(mock.getNextBuffer()).thenAnswer(answer);
+		}
+		else {
+			stubbing = stubbing.thenAnswer(answer);
+		}
+
+		inputGate.onAvailableBuffer(mock);
+
+		return this;
+	}
+
+	public InputChannel getInputChannel() {
+		return mock;
+	}
+
+	// ------------------------------------------------------------------------
+
+	public static TestInputChannel[] createInputChannels(SingleInputGate inputGate, int numberOfInputChannels) {
+		checkNotNull(inputGate);
+		checkArgument(numberOfInputChannels > 0);
+
+		TestInputChannel[] mocks = new TestInputChannel[numberOfInputChannels];
+
+		for (int i = 0; i < numberOfInputChannels; i++) {
+			mocks[i] = new TestInputChannel(inputGate, i);
+
+			inputGate.setInputChannel(new IntermediateResultPartitionID(), mocks[i].getInputChannel());
+		}
+
+		return mocks;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestNotificationListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestNotificationListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestNotificationListener.java
new file mode 100644
index 0000000..1e943a6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestNotificationListener.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.runtime.util.event.NotificationListener;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A mock notification listener.
+ */
+public class TestNotificationListener implements NotificationListener {
+
+	final AtomicInteger numberOfNotifications = new AtomicInteger();
+
+	@Override
+	public void onNotification() {
+		synchronized (numberOfNotifications) {
+			numberOfNotifications.incrementAndGet();
+
+			numberOfNotifications.notifyAll();
+		}
+	}
+
+	/**
+	 * Waits on a notification.
+	 *
+	 * <p> <strong>Important</strong>: It's necessary to get the current number of notifications
+	 * <em>before</em> registering the listener. Otherwise the wait call may block indefinitely.
+	 *
+	 * <pre>
+	 * MockNotificationListener listener = new MockNotificationListener();
+	 *
+	 * int current = listener.getNumberOfNotifications();
+	 *
+	 * // Register the listener
+	 * register(listener);
+	 *
+	 * listener.waitForNotification(current);
+	 * </pre>
+	 */
+	public void waitForNotification(int current) throws InterruptedException {
+		synchronized (numberOfNotifications) {
+			while (current == numberOfNotifications.get()) {
+				numberOfNotifications.wait();
+			}
+		}
+	}
+
+	public int getNumberOfNotifications() {
+		return numberOfNotifications.get();
+	}
+
+	public void reset() {
+		numberOfNotifications.set(0);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
new file mode 100644
index 0000000..31fd4a4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A test partition producer.
+ *
+ * <p> The behaviour of the producer is customizable by specifying a source.
+ *
+ * @see TestProducerSource
+ */
+public class TestPartitionProducer implements Callable<Boolean> {
+
+	public static final int MAX_SLEEP_TIME_MS = 20;
+
+	/** The partition to add data to. */
+	private final ResultPartition partition;
+
+	/**
+	 * Flag indicating whether the consumer is slow. If true, the consumer will sleep a random
+	 * number of milliseconds between adding data.
+	 */
+	private final boolean isSlowProducer;
+
+	/** The source data. */
+	private final TestProducerSource source;
+
+	/** Random source for sleeps. */
+	private final Random random;
+
+	public TestPartitionProducer(
+			ResultPartition partition,
+			boolean isSlowProducer,
+			TestProducerSource source) {
+
+		this.partition = checkNotNull(partition);
+		this.isSlowProducer = isSlowProducer;
+		this.random = isSlowProducer ? new Random() : null;
+		this.source = checkNotNull(source);
+	}
+
+	@Override
+	public Boolean call() throws Exception {
+
+		try {
+			BufferOrEvent bufferOrEvent;
+
+			while ((bufferOrEvent = source.getNextBufferOrEvent()) != null) {
+				int targetChannelIndex = bufferOrEvent.getChannelIndex();
+
+				if (bufferOrEvent.isBuffer()) {
+					partition.add(bufferOrEvent.getBuffer(), targetChannelIndex);
+				}
+				else if (bufferOrEvent.isEvent()) {
+					final Buffer buffer = EventSerializer.toBuffer(bufferOrEvent.getEvent());
+
+					partition.add(buffer, targetChannelIndex);
+				}
+				else {
+					throw new IllegalStateException("BufferOrEvent instance w/o buffer nor event.");
+				}
+
+				// Check for interrupted flag after adding data to prevent resource leaks
+				if (Thread.interrupted()) {
+					throw new InterruptedException();
+				}
+
+				if (isSlowProducer) {
+					Thread.sleep(random.nextInt(MAX_SLEEP_TIME_MS + 1));
+				}
+			}
+
+			partition.finish();
+
+			return true;
+		}
+		finally {
+			partition.release();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
new file mode 100644
index 0000000..4893360
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import com.google.common.collect.Queues;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.util.event.EventListener;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class TestPooledBufferProvider implements BufferProvider {
+
+	private final Object bufferCreationLock = new Object();
+
+	private final ArrayBlockingQueue<Buffer> buffers;
+
+	private final TestBufferFactory bufferFactory;
+
+	private final PooledBufferProviderRecycler bufferRecycler;
+
+	private final int poolSize;
+
+	public TestPooledBufferProvider(int poolSize) {
+		checkArgument(poolSize > 0);
+		this.poolSize = poolSize;
+
+		this.buffers = new ArrayBlockingQueue<Buffer>(poolSize);
+		this.bufferRecycler = new PooledBufferProviderRecycler(buffers);
+		this.bufferFactory = new TestBufferFactory(32 * 1024, bufferRecycler);
+	}
+
+	@Override
+	public Buffer requestBuffer() throws IOException {
+		final Buffer buffer = buffers.poll();
+
+		if (buffer != null) {
+			return buffer;
+		}
+		else {
+			synchronized (bufferCreationLock) {
+				if (bufferFactory.getNumberOfCreatedBuffers() < poolSize) {
+					return bufferFactory.create();
+				}
+			}
+
+			return null;
+		}
+	}
+
+	@Override
+	public Buffer requestBufferBlocking() throws IOException, InterruptedException {
+		final Buffer buffer = buffers.poll();
+
+		if (buffer != null) {
+			return buffer;
+		}
+		else {
+			synchronized (bufferCreationLock) {
+				if (bufferFactory.getNumberOfCreatedBuffers() < poolSize) {
+					return bufferFactory.create();
+				}
+			}
+
+			return buffers.take();
+		}
+	}
+
+	@Override
+	public boolean addListener(EventListener<Buffer> listener) {
+		return bufferRecycler.registerListener(listener);
+	}
+
+	@Override
+	public boolean isDestroyed() {
+		return false;
+	}
+
+	@Override
+	public int getMemorySegmentSize() {
+		return bufferFactory.getBufferSize();
+	}
+
+	public int getNumberOfAvailableBuffers() {
+		return buffers.size();
+	}
+
+	private static class PooledBufferProviderRecycler implements BufferRecycler {
+
+		private final Object listenerRegistrationLock = new Object();
+
+		private final Queue<Buffer> buffers;
+
+		private final ConcurrentLinkedQueue<EventListener<Buffer>> registeredListeners =
+				Queues.newConcurrentLinkedQueue();
+
+		public PooledBufferProviderRecycler(Queue<Buffer> buffers) {
+			this.buffers = buffers;
+		}
+
+		@Override
+		public void recycle(MemorySegment segment) {
+			synchronized (listenerRegistrationLock) {
+				final Buffer buffer = new Buffer(segment, this);
+
+				EventListener<Buffer> listener = registeredListeners.poll();
+
+				if (listener == null) {
+					buffers.add(buffer);
+				}
+				else {
+					listener.onEvent(buffer);
+				}
+			}
+		}
+
+		boolean registerListener(EventListener<Buffer> listener) {
+			synchronized (listenerRegistrationLock) {
+				if (buffers.isEmpty()) {
+					registeredListeners.add(listener);
+
+					return true;
+				}
+
+				return false;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java
new file mode 100644
index 0000000..dea9df2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+public interface TestProducerSource {
+
+	/**
+	 * Returns the next buffer or event instance.
+	 *
+	 * <p> The channel index specifies the subpartition add the data to.
+	 */
+	BufferOrEvent getNextBufferOrEvent() throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java
new file mode 100644
index 0000000..d10e1a0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkElementIndex;
+import static org.mockito.Mockito.spy;
+
+/**
+ * A test input gate to mock reading data.
+ */
+public class TestSingleInputGate {
+
+	protected final SingleInputGate inputGate;
+
+	protected final TestInputChannel[] inputChannels;
+
+	public TestSingleInputGate(int numberOfInputChannels) {
+		this(numberOfInputChannels, true);
+	}
+
+	public TestSingleInputGate(int numberOfInputChannels, boolean initialize) {
+		checkArgument(numberOfInputChannels >= 1);
+
+		this.inputGate = spy(new SingleInputGate(new IntermediateDataSetID(), 0, numberOfInputChannels));
+
+		this.inputChannels = new TestInputChannel[numberOfInputChannels];
+
+		if (initialize) {
+			for (int i = 0; i < numberOfInputChannels; i++) {
+				inputChannels[i] = new TestInputChannel(inputGate, i);
+				inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i].getInputChannel());
+			}
+		}
+	}
+
+	public TestSingleInputGate read(Buffer buffer, int channelIndex) throws IOException, InterruptedException {
+		checkElementIndex(channelIndex, inputGate.getNumberOfInputChannels());
+
+		inputChannels[channelIndex].read(buffer);
+
+		return this;
+	}
+
+	public TestSingleInputGate readBuffer() throws IOException, InterruptedException {
+		return readBuffer(0);
+	}
+
+	public TestSingleInputGate readBuffer(int channelIndex) throws IOException, InterruptedException {
+		inputChannels[channelIndex].readBuffer();
+
+		return this;
+	}
+
+	public TestSingleInputGate readEvent() throws IOException, InterruptedException {
+		return readEvent(0);
+	}
+
+	public TestSingleInputGate readEvent(int channelIndex) throws IOException, InterruptedException {
+		inputChannels[channelIndex].readEvent();
+
+		return this;
+	}
+
+	public TestSingleInputGate readEndOfSuperstepEvent() throws IOException, InterruptedException {
+		for (TestInputChannel inputChannel : inputChannels) {
+			inputChannel.readEndOfSuperstepEvent();
+		}
+
+		return this;
+	}
+
+	public TestSingleInputGate readEndOfSuperstepEvent(int channelIndex) throws IOException, InterruptedException {
+		inputChannels[channelIndex].readEndOfSuperstepEvent();
+
+		return this;
+	}
+
+	public TestSingleInputGate readEndOfPartitionEvent() throws IOException, InterruptedException {
+		for (TestInputChannel inputChannel : inputChannels) {
+			inputChannel.readEndOfPartitionEvent();
+		}
+
+		return this;
+	}
+
+	public TestSingleInputGate readEndOfPartitionEvent(int channelIndex) throws IOException, InterruptedException {
+		inputChannels[channelIndex].readEndOfPartitionEvent();
+
+		return this;
+	}
+
+	public SingleInputGate getInputGate() {
+		return inputGate;
+	}
+
+	// ------------------------------------------------------------------------
+
+	public List<Integer> readAllChannels() throws IOException, InterruptedException {
+		final List<Integer> readOrder = new ArrayList<Integer>(inputChannels.length);
+
+		for (int i = 0; i < inputChannels.length; i++) {
+			readOrder.add(i);
+		}
+
+		Collections.shuffle(readOrder);
+
+		for (int channelIndex : readOrder) {
+			inputChannels[channelIndex].readBuffer();
+		}
+
+		return readOrder;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
new file mode 100644
index 0000000..2766e53
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A test subpartition view consumer.
+ *
+ * <p> The behaviour of the consumer is customizable by specifying a callback.
+ *
+ * @see TestConsumerCallback
+ */
+public class TestSubpartitionConsumer implements Callable<Boolean> {
+
+	private static final int MAX_SLEEP_TIME_MS = 20;
+
+	/** The subpartition view to consume. */
+	private final ResultSubpartitionView subpartitionView;
+
+	/**
+	 * Flag indicating whether the consumer is slow. If true, the consumer will sleep a random
+	 * number of milliseconds between returned buffers.
+	 */
+	private final boolean isSlowConsumer;
+
+	/** The callback to handle a read buffer. */
+	private final TestConsumerCallback callback;
+
+	/** Random source for sleeps. */
+	private final Random random;
+
+	public TestSubpartitionConsumer(
+			ResultSubpartitionView subpartitionView,
+			boolean isSlowConsumer,
+			TestConsumerCallback callback) {
+
+		this.subpartitionView = checkNotNull(subpartitionView);
+		this.isSlowConsumer = isSlowConsumer;
+		this.random = isSlowConsumer ? new Random() : null;
+		this.callback = checkNotNull(callback);
+	}
+
+	@Override
+	public Boolean call() throws Exception {
+		final TestNotificationListener listener = new TestNotificationListener();
+
+		try {
+			while (true) {
+				if (Thread.interrupted()) {
+					throw new InterruptedException();
+				}
+
+				final Buffer buffer = subpartitionView.getNextBuffer();
+
+				if (isSlowConsumer) {
+					Thread.sleep(random.nextInt(MAX_SLEEP_TIME_MS + 1));
+				}
+
+				if (buffer != null) {
+					if (buffer.isBuffer()) {
+						callback.onBuffer(buffer);
+					}
+					else {
+						final AbstractEvent event = EventSerializer.fromBuffer(buffer,
+								getClass().getClassLoader());
+
+						callback.onEvent(event);
+
+						buffer.recycle();
+
+						if (event.getClass() == EndOfPartitionEvent.class) {
+							subpartitionView.notifySubpartitionConsumed();
+
+							return true;
+						}
+					}
+				}
+				else {
+					int current = listener.getNumberOfNotifications();
+
+					if (subpartitionView.registerListener(listener)) {
+						listener.waitForNotification(current);
+					}
+					else if (subpartitionView.isReleased()) {
+						return true;
+					}
+				}
+			}
+		}
+		finally {
+			subpartitionView.releaseAllResources();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
new file mode 100644
index 0000000..52c156e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A test subpartition producer.
+ *
+ * <p> The behaviour of the producer is customizable by specifying a source.
+ *
+ * @see TestProducerSource
+ */
+public class TestSubpartitionProducer implements Callable<Boolean> {
+
+	public static final int MAX_SLEEP_TIME_MS = 20;
+
+	/** The subpartition to add data to. */
+	private final ResultSubpartition subpartition;
+
+	/**
+	 * Flag indicating whether the consumer is slow. If true, the consumer will sleep a random
+	 * number of milliseconds between adding data.
+	 */
+	private final boolean isSlowProducer;
+
+	/** The source data. */
+	private final TestProducerSource source;
+
+	/** Random source for sleeps. */
+	private final Random random;
+
+	public TestSubpartitionProducer(
+			ResultSubpartition subpartition,
+			boolean isSlowProducer,
+			TestProducerSource source) {
+
+		this.subpartition = checkNotNull(subpartition);
+		this.isSlowProducer = isSlowProducer;
+		this.random = isSlowProducer ? new Random() : null;
+		this.source = checkNotNull(source);
+	}
+
+	@Override
+	public Boolean call() throws Exception {
+
+		try {
+			BufferOrEvent bufferOrEvent;
+
+			while ((bufferOrEvent = source.getNextBufferOrEvent()) != null) {
+				if (bufferOrEvent.isBuffer()) {
+					subpartition.add(bufferOrEvent.getBuffer());
+				}
+				else if (bufferOrEvent.isEvent()) {
+					final Buffer buffer = EventSerializer.toBuffer(bufferOrEvent.getEvent());
+
+					subpartition.add(buffer);
+				}
+				else {
+					throw new IllegalStateException("BufferOrEvent instance w/o buffer nor event.");
+				}
+
+				// Check for interrupted flag after adding data to prevent resource leaks
+				if (Thread.interrupted()) {
+					throw new InterruptedException();
+				}
+
+				if (isSlowProducer) {
+					Thread.sleep(random.nextInt(MAX_SLEEP_TIME_MS + 1));
+				}
+			}
+
+			subpartition.finish();
+
+			return true;
+		}
+		finally {
+			subpartition.release();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
index 4f547aa..0b29032 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.event.task.TaskEvent;
 
 import java.io.IOException;
 
+/**
+ * A task event used in various tests.
+ */
 public class TestTaskEvent extends TaskEvent {
 
 	private double val0;

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index 84fc851..63ce5e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.operators;
 import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
 import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingMockSingleInputGate;
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingTestSingleInputGate;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.TaskTestBase;
@@ -51,7 +51,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({Task.class, BufferWriter.class})
+@PrepareForTest({Task.class, ResultPartitionWriter.class})
 public class DataSinkTaskTest extends TaskTestBase
 {
 	private static final Logger LOG = LoggerFactory.getLogger(DataSinkTaskTest.class);
@@ -138,7 +138,7 @@ public class DataSinkTaskTest extends TaskTestBase
 
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 
-		IteratorWrappingMockSingleInputGate<?>[] readers = new IteratorWrappingMockSingleInputGate[4];
+		IteratorWrappingTestSingleInputGate<?>[] readers = new IteratorWrappingTestSingleInputGate[4];
 		readers[0] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, 0, 0, false), 0, false);
 		readers[1] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt, 0, false), 0, false);
 		readers[2] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 2, 0, false), 0, false);
@@ -151,7 +151,7 @@ public class DataSinkTaskTest extends TaskTestBase
 		try {
 			// For the union reader to work, we need to start notifications *after* the union reader
 			// has been initialized.
-			for (IteratorWrappingMockSingleInputGate<?> reader : readers) {
+			for (IteratorWrappingTestSingleInputGate<?> reader : readers) {
 				reader.read();
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index 90bb944..4548410 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -28,7 +28,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.junit.Assert;
 
@@ -47,7 +47,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({Task.class, BufferWriter.class})
+@PrepareForTest({Task.class, ResultPartitionWriter.class})
 public class DataSourceTaskTest extends TaskTestBase {
 
 	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024;

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index e22789c..88a71c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.operators.CollectorMapDriver;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.RegularPactTask;
@@ -48,7 +48,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({Task.class, BufferWriter.class})
+@PrepareForTest({Task.class, ResultPartitionWriter.class})
 @SuppressWarnings("deprecation")
 public class ChainTaskTest extends TaskTestBase {
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 7fb13e3..6625bbc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -27,10 +27,10 @@ import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingMockSingleInputGate;
+import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingTestSingleInputGate;
 import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
@@ -71,7 +71,7 @@ public class MockEnvironment implements Environment {
 
 	private final List<InputGate> inputs;
 
-	private final List<BufferWriter> outputs;
+	private final List<ResultPartitionWriter> outputs;
 
 	private final JobID jobID = new JobID();
 
@@ -83,7 +83,7 @@ public class MockEnvironment implements Environment {
 		this.jobConfiguration = new Configuration();
 		this.taskConfiguration = new Configuration();
 		this.inputs = new LinkedList<InputGate>();
-		this.outputs = new LinkedList<BufferWriter>();
+		this.outputs = new LinkedList<ResultPartitionWriter>();
 
 		this.memManager = new DefaultMemoryManager(memorySize, 1);
 		this.ioManager = new IOManagerAsync();
@@ -91,9 +91,9 @@ public class MockEnvironment implements Environment {
 		this.bufferSize = bufferSize;
 	}
 
-	public IteratorWrappingMockSingleInputGate<Record> addInput(MutableObjectIterator<Record> inputIterator) {
+	public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> inputIterator) {
 		try {
-			final IteratorWrappingMockSingleInputGate<Record> reader = new IteratorWrappingMockSingleInputGate<Record>(bufferSize, Record.class, inputIterator);
+			final IteratorWrappingTestSingleInputGate<Record> reader = new IteratorWrappingTestSingleInputGate<Record>(bufferSize, Record.class, inputIterator);
 
 			inputs.add(reader.getInputGate());
 
@@ -118,7 +118,7 @@ public class MockEnvironment implements Environment {
 				}
 			});
 
-			BufferWriter mockWriter = mock(BufferWriter.class);
+			ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class);
 			when(mockWriter.getNumberOfOutputChannels()).thenReturn(1);
 			when(mockWriter.getBufferProvider()).thenReturn(mockBufferProvider);
 
@@ -225,13 +225,13 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
-	public BufferWriter getWriter(int index) {
+	public ResultPartitionWriter getWriter(int index) {
 		return outputs.get(index);
 	}
 
 	@Override
-	public BufferWriter[] getAllWriters() {
-		return outputs.toArray(new BufferWriter[outputs.size()]);
+	public ResultPartitionWriter[] getAllWriters() {
+		return outputs.toArray(new ResultPartitionWriter[outputs.size()]);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index e0776d9..b93d37e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.record.io.FileOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingMockSingleInputGate;
+import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingTestSingleInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.operators.PactDriver;
@@ -55,14 +55,14 @@ public abstract class TaskTestBase {
 		this.mockEnv = new MockEnvironment(this.memorySize, this.inputSplitProvider, bufferSize);
 	}
 
-	public IteratorWrappingMockSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId) {
-		final IteratorWrappingMockSingleInputGate<Record> reader = addInput(input, groupId, true);
+	public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId) {
+		final IteratorWrappingTestSingleInputGate<Record> reader = addInput(input, groupId, true);
 
 		return reader;
 	}
 
-	public IteratorWrappingMockSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId, boolean read) {
-		final IteratorWrappingMockSingleInputGate<Record> reader = this.mockEnv.addInput(input);
+	public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId, boolean read) {
+		final IteratorWrappingTestSingleInputGate<Record> reader = this.mockEnv.addInput(input);
 		TaskConfig conf = new TaskConfig(this.mockEnv.getTaskConfiguration());
 		conf.addInputToGroup(groupId);
 		conf.setInputSerializer(RecordSerializerFactory.get(), groupId);

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index a7173b4..f4ee52f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -196,7 +196,7 @@ public class TaskManagerProcessReapingTest {
 				cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 				cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);
 
 				TaskManager.runTaskManager("localhost", taskManagerPort, cfg);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index dbc6e9d..c41e6b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -29,16 +29,18 @@ import akka.util.Timeout;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.PartitionInfo;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -101,8 +103,8 @@ public class TaskManagerTest {
 
 				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7,
 						new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(),
-						Collections.<PartitionDeploymentDescriptor>emptyList(),
-						Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
+						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+						Collections.<InputGateDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 
 				new Within(duration("1 seconds")){
@@ -140,14 +142,14 @@ public class TaskManagerTest {
 
 				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, "TestTask1", 1, 5,
 						new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
-						Collections.<PartitionDeploymentDescriptor>emptyList(),
-						Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
+						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+						Collections.<InputGateDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7,
 						new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
-						Collections.<PartitionDeploymentDescriptor>emptyList(),
-						Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
+						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+						Collections.<InputGateDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 
 				final FiniteDuration d = duration("1 second");
@@ -243,14 +245,14 @@ public class TaskManagerTest {
 
 				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
-						Collections.<PartitionDeploymentDescriptor>emptyList(),
-						Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
+						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+						Collections.<InputGateDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
 						new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
-						Collections.<PartitionDeploymentDescriptor>emptyList(),
-						Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
+						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+						Collections.<InputGateDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 
 				new Within(duration("1 second")){
@@ -310,24 +312,24 @@ public class TaskManagerTest {
 
 			IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
 
-			List<PartitionDeploymentDescriptor> irpdd = new ArrayList<PartitionDeploymentDescriptor>();
-			irpdd.add(new PartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, IntermediateResultPartitionType.PIPELINED, 1));
+			List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
+			irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
 
-			PartitionConsumerDeploymentDescriptor ircdd =
-					new PartitionConsumerDeploymentDescriptor(
+			InputGateDeploymentDescriptor ircdd =
+					new InputGateDeploymentDescriptor(
 							new IntermediateDataSetID(),
-							new PartitionInfo[]{
-									new PartitionInfo(partitionId, eid1, PartitionInfo.PartitionLocation.LOCAL, null)
-							},
-							0);
+							0, new InputChannelDeploymentDescriptor[]{
+									new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())
+							}
+					);
 
 			final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
 					new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
-					irpdd, Collections.<PartitionConsumerDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), 0);
+					irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), 0);
 
 			final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
 					new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
-					Collections.<PartitionDeploymentDescriptor>emptyList(),
+					Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 					Collections.singletonList(ircdd),
 					new ArrayList<BlobKey>(), 0);
 
@@ -402,25 +404,25 @@ public class TaskManagerTest {
 
 			IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
 
-			List<PartitionDeploymentDescriptor> irpdd = new ArrayList<PartitionDeploymentDescriptor>();
-			irpdd.add(new PartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, IntermediateResultPartitionType.PIPELINED, 1));
+			List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
+			irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
 
-			PartitionConsumerDeploymentDescriptor ircdd =
-					new PartitionConsumerDeploymentDescriptor(
+			InputGateDeploymentDescriptor ircdd =
+					new InputGateDeploymentDescriptor(
 							new IntermediateDataSetID(),
-							new PartitionInfo[]{
-									new PartitionInfo(partitionId, eid1, PartitionInfo.PartitionLocation.LOCAL, null)
-							},
-							0);
+							0, new InputChannelDeploymentDescriptor[]{
+									new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())
+							}
+					);
 
 			final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
 					new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
-					irpdd, Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
+					irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 
 			final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
 					new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(),
-					Collections.<PartitionDeploymentDescriptor>emptyList(),
+					Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 					Collections.singletonList(ircdd),
 					new ArrayList<BlobKey>(), 0);