You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/05/02 11:36:22 UTC

flink git commit: [FLINK-6337][network] Remove the buffer provider from PartitionRequestServerHandler

Repository: flink
Updated Branches:
  refs/heads/master 33695781f -> 464d6f553


[FLINK-6337][network] Remove the buffer provider from PartitionRequestServerHandler

The buffer provider is not needed and is most likely a leftover from prior
refactorings.

This closes #3785.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/464d6f55
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/464d6f55
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/464d6f55

Branch: refs/heads/master
Commit: 464d6f553be79409bb25007255e1306884d09186
Parents: 3369578
Author: Zhijiang <wa...@aliyun.com>
Authored: Wed Apr 26 16:18:54 2017 +0800
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue May 2 13:35:18 2017 +0200

----------------------------------------------------------------------
 .../runtime/io/network/ConnectionManager.java   |  4 +--
 .../io/network/LocalConnectionManager.java      |  5 +---
 .../runtime/io/network/NetworkEnvironment.java  |  2 +-
 .../network/netty/NettyConnectionManager.java   |  5 ++--
 .../network/netty/PartitionRequestProtocol.java |  7 ++---
 .../netty/PartitionRequestServerHandler.java    | 19 ++----------
 .../netty/SequenceNumberingViewReader.java      |  5 +---
 .../partition/PipelinedSubpartition.java        |  3 +-
 .../io/network/partition/ResultPartition.java   |  4 +--
 .../partition/ResultPartitionManager.java       |  4 +--
 .../partition/ResultPartitionProvider.java      |  3 --
 .../network/partition/ResultSubpartition.java   |  3 +-
 .../partition/SpillableSubpartition.java        |  7 ++---
 .../partition/consumer/LocalInputChannel.java   |  2 +-
 .../netty/CancelPartitionRequestTest.java       | 13 ++++----
 .../netty/ClientTransportErrorHandlingTest.java | 10 ++-----
 .../netty/NettyConnectionManagerTest.java       |  7 ++---
 .../netty/PartitionRequestQueueTest.java        |  5 +---
 .../netty/ServerTransportErrorHandlingTest.java |  9 ++----
 .../partition/InputChannelTestUtils.java        |  7 ++---
 .../partition/PipelinedSubpartitionTest.java    |  8 ++---
 .../partition/SpillableSubpartitionTest.java    | 31 ++++++++------------
 .../network/partition/SubpartitionTestBase.java |  5 +---
 .../consumer/LocalInputChannelTest.java         |  8 ++---
 .../partition/consumer/SingleInputGateTest.java |  8 ++---
 ...SuccessAfterNetworkBuffersFailureITCase.java |  2 +-
 26 files changed, 62 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index 02deb9d..1225230 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network;
 
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 
@@ -31,8 +30,7 @@ import java.io.IOException;
 public interface ConnectionManager {
 
 	void start(ResultPartitionProvider partitionProvider,
-				TaskEventDispatcher taskEventDispatcher,
-				NetworkBufferPool networkbufferPool) throws IOException;
+				TaskEventDispatcher taskEventDispatcher) throws IOException;
 
 	/**
 	 * Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}.

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 4f51a56..bece6a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network;
 
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 
@@ -29,9 +28,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 public class LocalConnectionManager implements ConnectionManager {
 
 	@Override
-	public void start(ResultPartitionProvider partitionProvider,
-						TaskEventDispatcher taskEventDispatcher,
-						NetworkBufferPool networkbufferPool) {
+	public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) {
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 4d4b305..cc4cb77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -287,7 +287,7 @@ public class NetworkEnvironment {
 
 			try {
 				LOG.debug("Starting network connection manager");
-				connectionManager.start(resultPartitionManager, taskEventDispatcher, networkBufferPool);
+				connectionManager.start(resultPartitionManager, taskEventDispatcher);
 			}
 			catch (IOException t) {
 				throw new IOException("Failed to instantiate network connection manager.", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index abee2a8..fcf618a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.netty;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 
 import java.io.IOException;
@@ -45,10 +44,10 @@ public class NettyConnectionManager implements ConnectionManager {
 	}
 
 	@Override
-	public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool)
+	public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher)
 			throws IOException {
 		PartitionRequestProtocol partitionRequestProtocol =
-				new PartitionRequestProtocol(partitionProvider, taskEventDispatcher, networkbufferPool);
+				new PartitionRequestProtocol(partitionProvider, taskEventDispatcher);
 
 		client.init(partitionRequestProtocol, bufferPool);
 		server.init(partitionRequestProtocol, bufferPool);

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
index a39f085..d06a018 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.netty;
 
 import io.netty.channel.ChannelHandler;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 
 import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder;
@@ -34,12 +33,10 @@ class PartitionRequestProtocol implements NettyProtocol {
 
 	private final ResultPartitionProvider partitionProvider;
 	private final TaskEventDispatcher taskEventDispatcher;
-	private final NetworkBufferPool networkbufferPool;
 
-	PartitionRequestProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) {
+	PartitionRequestProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) {
 		this.partitionProvider = partitionProvider;
 		this.taskEventDispatcher = taskEventDispatcher;
-		this.networkbufferPool = networkbufferPool;
 	}
 
 	// +-------------------------------------------------------------------+
@@ -77,7 +74,7 @@ class PartitionRequestProtocol implements NettyProtocol {
 	public ChannelHandler[] getServerChannelHandlers() {
 		PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
 		PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
-				partitionProvider, taskEventDispatcher, queueOfPartitionQueues, networkbufferPool);
+				partitionProvider, taskEventDispatcher, queueOfPartitionQueues);
 
 		return new ChannelHandler[] {
 				messageEncoder,

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index 6f56877..1bd05f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -21,8 +21,6 @@ package org.apache.flink.runtime.io.network.netty;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionRequest;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.CloseRequest;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
@@ -47,36 +45,24 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
 
 	private final PartitionRequestQueue outboundQueue;
 
-	private final NetworkBufferPool networkBufferPool;
-
-	private BufferPool bufferPool;
-
 	PartitionRequestServerHandler(
 		ResultPartitionProvider partitionProvider,
 		TaskEventDispatcher taskEventDispatcher,
-		PartitionRequestQueue outboundQueue,
-		NetworkBufferPool networkBufferPool) {
+		PartitionRequestQueue outboundQueue) {
 
 		this.partitionProvider = partitionProvider;
 		this.taskEventDispatcher = taskEventDispatcher;
 		this.outboundQueue = outboundQueue;
-		this.networkBufferPool = networkBufferPool;
 	}
 
 	@Override
 	public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
 		super.channelRegistered(ctx);
-
-		bufferPool = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
 	}
 
 	@Override
 	public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
 		super.channelUnregistered(ctx);
-
-		if (bufferPool != null) {
-			bufferPool.lazyDestroy();
-		}
 	}
 
 	@Override
@@ -100,8 +86,7 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
 					reader.requestSubpartitionView(
 						partitionProvider,
 						request.partitionId,
-						request.queueIndex,
-						bufferPool);
+						request.queueIndex);
 				} catch (PartitionNotFoundException notFound) {
 					respondWithError(ctx, notFound, request.receiverId);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
index 5036bb7..6d95ca5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
@@ -60,8 +59,7 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener {
 	void requestSubpartitionView(
 		ResultPartitionProvider partitionProvider,
 		ResultPartitionID resultPartitionId,
-		int subPartitionIndex,
-		BufferProvider bufferProvider) throws IOException {
+		int subPartitionIndex) throws IOException {
 
 		synchronized (requestLock) {
 			if (subpartitionView == null) {
@@ -72,7 +70,6 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener {
 				this.subpartitionView = partitionProvider.createSubpartitionView(
 					resultPartitionId,
 					subPartitionIndex,
-					bufferProvider,
 					this);
 			} else {
 				throw new IllegalStateException("Subpartition already requested");

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 4fd74e2..ed72b51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -163,7 +162,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 	}
 
 	@Override
-	public PipelinedSubpartitionView createReadView(BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException {
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
 		final int queueSize;
 
 		synchronized (buffers) {

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index d207f60..9b02e4d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -362,7 +362,7 @@ public class ResultPartition implements BufferPoolOwner {
 	/**
 	 * Returns the requested subpartition.
 	 */
-	public ResultSubpartitionView createSubpartitionView(int index, BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException {
+	public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
 		int refCnt = pendingReferences.get();
 
 		checkState(refCnt != -1, "Partition released.");
@@ -370,7 +370,7 @@ public class ResultPartition implements BufferPoolOwner {
 
 		checkElementIndex(index, subpartitions.length, "Subpartition not found.");
 
-		ResultSubpartitionView readView = subpartitions[index].createReadView(bufferProvider, availabilityListener);
+		ResultSubpartitionView readView = subpartitions[index].createReadView(availabilityListener);
 
 		LOG.debug("Created {}", readView);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
index 8ad3e34..f681548 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
@@ -22,7 +22,6 @@ import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Table;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,7 +65,6 @@ public class ResultPartitionManager implements ResultPartitionProvider {
 	public ResultSubpartitionView createSubpartitionView(
 			ResultPartitionID partitionId,
 			int subpartitionIndex,
-			BufferProvider bufferProvider,
 			BufferAvailabilityListener availabilityListener) throws IOException {
 
 		synchronized (registeredPartitions) {
@@ -79,7 +77,7 @@ public class ResultPartitionManager implements ResultPartitionProvider {
 
 			LOG.debug("Requesting subpartition {} of {}.", subpartitionIndex, partition);
 
-			return partition.createSubpartitionView(subpartitionIndex, bufferProvider, availabilityListener);
+			return partition.createSubpartitionView(subpartitionIndex, availabilityListener);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
index 3fbfd49..db72d63 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-
 import java.io.IOException;
 
 public interface ResultPartitionProvider {
@@ -30,7 +28,6 @@ public interface ResultPartitionProvider {
 	ResultSubpartitionView createSubpartitionView(
 			ResultPartitionID partitionId,
 			int index,
-			BufferProvider bufferProvider,
 			BufferAvailabilityListener availabilityListener) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index d3cd887..3b4e3c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 
 import java.io.IOException;
 
@@ -77,7 +76,7 @@ public abstract class ResultSubpartition {
 
 	abstract public void release() throws IOException;
 
-	abstract public ResultSubpartitionView createReadView(BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException;
+	abstract public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException;
 
 	abstract int releaseMemory() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index a578188..11c6d16 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -164,7 +163,7 @@ class SpillableSubpartition extends ResultSubpartition {
 	}
 
 	@Override
-	public ResultSubpartitionView createReadView(BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException {
+	public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
 		synchronized (buffers) {
 			if (!isFinished) {
 				throw new IllegalStateException("Subpartition has not been finished yet, " +
@@ -180,7 +179,7 @@ class SpillableSubpartition extends ResultSubpartition {
 			if (spillWriter != null) {
 				readView = new SpilledSubpartitionView(
 					this,
-					bufferProvider.getMemorySegmentSize(),
+					parent.getBufferProvider().getMemorySegmentSize(),
 					spillWriter,
 					getTotalNumberOfBuffers(),
 					availabilityListener);
@@ -189,7 +188,7 @@ class SpillableSubpartition extends ResultSubpartition {
 					this,
 					buffers,
 					ioManager,
-					bufferProvider.getMemorySegmentSize(),
+					parent.getBufferProvider().getMemorySegmentSize(),
 					availabilityListener);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 4e14e93..3ade2f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -113,7 +113,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 
 				try {
 					ResultSubpartitionView subpartitionView = partitionManager.createSubpartitionView(
-						partitionId, subpartitionIndex, inputGate.getBufferProvider(), this);
+						partitionId, subpartitionIndex, this);
 
 					if (subpartitionView == null) {
 						throw new IOException("Error requesting subpartition.");

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index a2f866a..8b4fc0e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -22,7 +22,6 @@ import io.netty.channel.Channel;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -78,18 +77,18 @@ public class CancelPartitionRequestTest {
 			final ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync));
 
 			// Return infinite subpartition
-			when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+			when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferAvailabilityListener.class)))
 				.thenAnswer(new Answer<ResultSubpartitionView>() {
 					@Override
 					public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
-						BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3];
+						BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[2];
 						listener.notifyBuffersAvailable(Long.MAX_VALUE);
 						return view;
 					}
 				});
 
 			PartitionRequestProtocol protocol = new PartitionRequestProtocol(
-					partitions, mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class));
+					partitions, mock(TaskEventDispatcher.class));
 
 			serverAndClient = initServerAndClient(protocol);
 
@@ -129,18 +128,18 @@ public class CancelPartitionRequestTest {
 			final ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync));
 
 			// Return infinite subpartition
-			when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+			when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferAvailabilityListener.class)))
 					.thenAnswer(new Answer<ResultSubpartitionView>() {
 						@Override
 						public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
-							BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3];
+							BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[2];
 							listener.notifyBuffersAvailable(Long.MAX_VALUE);
 							return view;
 						}
 					});
 
 			PartitionRequestProtocol protocol = new PartitionRequestProtocol(
-					partitions, mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class));
+					partitions, mock(TaskEventDispatcher.class));
 
 			serverAndClient = initServerAndClient(protocol);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
index 22e7754..b4fc46f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
@@ -29,7 +29,6 @@ import io.netty.channel.ChannelPromise;
 import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
 import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
@@ -85,8 +84,7 @@ public class ClientTransportErrorHandlingTest {
 			public ChannelHandler[] getClientChannelHandlers() {
 				return new PartitionRequestProtocol(
 						mock(ResultPartitionProvider.class),
-						mock(TaskEventDispatcher.class),
-						mock(NetworkBufferPool.class)).getClientChannelHandlers();
+						mock(TaskEventDispatcher.class)).getClientChannelHandlers();
 			}
 		};
 
@@ -235,8 +233,7 @@ public class ClientTransportErrorHandlingTest {
 			public ChannelHandler[] getClientChannelHandlers() {
 				return new PartitionRequestProtocol(
 						mock(ResultPartitionProvider.class),
-						mock(TaskEventDispatcher.class),
-						mock(NetworkBufferPool.class)).getClientChannelHandlers();
+						mock(TaskEventDispatcher.class)).getClientChannelHandlers();
 			}
 		};
 
@@ -383,8 +380,7 @@ public class ClientTransportErrorHandlingTest {
 	private EmbeddedChannel createEmbeddedChannel() {
 		PartitionRequestProtocol protocol = new PartitionRequestProtocol(
 				mock(ResultPartitionProvider.class),
-				mock(TaskEventDispatcher.class),
-				mock(NetworkBufferPool.class));
+				mock(TaskEventDispatcher.class));
 
 		return new EmbeddedChannel(protocol.getClientChannelHandlers());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
index 8ab572e..77de6bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -23,7 +23,6 @@ import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.EventLoopGroup;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 import org.apache.flink.util.NetUtils;
 import org.junit.Test;
@@ -60,8 +59,7 @@ public class NettyConnectionManagerTest {
 
 		connectionManager.start(
 				mock(ResultPartitionProvider.class),
-				mock(TaskEventDispatcher.class),
-				mock(NetworkBufferPool.class));
+				mock(TaskEventDispatcher.class));
 
 		assertEquals(numberOfSlots, connectionManager.getBufferPool().getNumberOfArenas());
 
@@ -129,8 +127,7 @@ public class NettyConnectionManagerTest {
 
 		connectionManager.start(
 				mock(ResultPartitionProvider.class),
-				mock(TaskEventDispatcher.class),
-				mock(NetworkBufferPool.class));
+				mock(TaskEventDispatcher.class));
 
 		assertEquals(numberOfArenas, connectionManager.getBufferPool().getNumberOfArenas());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index 7224e96..b969b1c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.netty;
 
 import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
@@ -43,7 +42,6 @@ public class PartitionRequestQueueTest {
 
 		ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
 		ResultPartitionID rpid = new ResultPartitionID();
-		BufferProvider bufferProvider = mock(BufferProvider.class);
 
 		ResultSubpartitionView view = mock(ResultSubpartitionView.class);
 		when(view.isReleased()).thenReturn(true);
@@ -52,13 +50,12 @@ public class PartitionRequestQueueTest {
 		when(partitionProvider.createSubpartitionView(
 			eq(rpid),
 			eq(0),
-			eq(bufferProvider),
 			any(BufferAvailabilityListener.class))).thenReturn(view);
 
 		EmbeddedChannel ch = new EmbeddedChannel(queue);
 
 		SequenceNumberingViewReader seqView = new SequenceNumberingViewReader(new InputChannelID(), queue);
-		seqView.requestSubpartitionView(partitionProvider, rpid, 0, bufferProvider);
+		seqView.requestSubpartitionView(partitionProvider, rpid, 0);
 
 		// Enqueue the erroneous view
 		queue.notifyReaderNonEmpty(seqView);

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
index 1c3557e..3c4ebb3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -23,8 +23,6 @@ import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -63,11 +61,11 @@ public class ServerTransportErrorHandlingTest {
 		final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 
 		when(partitionManager
-			.createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+			.createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class)))
 			.thenAnswer(new Answer<ResultSubpartitionView>() {
 				@Override
 				public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
-					BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3];
+					BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[2];
 					listener.notifyBuffersAvailable(Long.MAX_VALUE);
 					return new CancelPartitionRequestTest.InfiniteSubpartitionView(outboundBuffers, sync);
 				}
@@ -78,8 +76,7 @@ public class ServerTransportErrorHandlingTest {
 			public ChannelHandler[] getServerChannelHandlers() {
 				return new PartitionRequestProtocol(
 					partitionManager,
-					mock(TaskEventDispatcher.class),
-					mock(NetworkBufferPool.class)).getServerChannelHandlers();
+					mock(TaskEventDispatcher.class)).getServerChannelHandlers();
 			}
 
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index e292576..a327838 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -60,14 +59,14 @@ class InputChannelTestUtils {
 
 			@Override
 			public ResultSubpartitionView answer(InvocationOnMock invocation) throws Throwable {
-				BufferAvailabilityListener channel = (BufferAvailabilityListener) invocation.getArguments()[3];
-				return sources[num++].createReadView(null, channel);
+				BufferAvailabilityListener channel = (BufferAvailabilityListener) invocation.getArguments()[2];
+				return sources[num++].createReadView(channel);
 			}
 		};
 
 		ResultPartitionManager manager = mock(ResultPartitionManager.class);
 		when(manager.createSubpartitionView(
-				any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+				any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class)))
 				.thenAnswer(viewCreator);
 
 		return manager;

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index e3200d1..de1e8a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -70,14 +70,14 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		final PipelinedSubpartition subpartition = createSubpartition();
 
 		// Successful request
-		assertNotNull(subpartition.createReadView(null, new BufferAvailabilityListener() {
+		assertNotNull(subpartition.createReadView(new BufferAvailabilityListener() {
 			@Override
 			public void notifyBuffersAvailable(long numBuffers) {
 			}
 		}));
 
 		try {
-			subpartition.createReadView(null, new BufferAvailabilityListener() {
+			subpartition.createReadView(new BufferAvailabilityListener() {
 				@Override
 				public void notifyBuffersAvailable(long numBuffers) {
 				}
@@ -94,7 +94,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
 		BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
 
-		ResultSubpartitionView view = subpartition.createReadView(null, listener);
+		ResultSubpartitionView view = subpartition.createReadView(listener);
 
 		// Empty => should return null
 		assertNull(view.getNextBuffer());
@@ -221,7 +221,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		final PipelinedSubpartition subpartition = createSubpartition();
 
 		TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(isSlowConsumer, consumerCallback);
-		final PipelinedSubpartitionView view = subpartition.createReadView(null, consumer);
+		final PipelinedSubpartitionView view = subpartition.createReadView(consumer);
 		consumer.setSubpartitionView(view);
 
 		Future<Boolean> producerResult = executorService.submit(

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index b53ef68..2b356a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -26,8 +26,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.junit.AfterClass;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -67,8 +67,12 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 	}
 
 	@Override
-	ResultSubpartition createSubpartition() {
-		return new SpillableSubpartition(0, mock(ResultPartition.class), ioManager);
+	SpillableSubpartition createSubpartition() {
+		ResultPartition parent = mock(ResultPartition.class);
+		BufferProvider bufferProvider = mock(BufferProvider.class);
+		when(parent.getBufferProvider()).thenReturn(bufferProvider);
+		when(bufferProvider.getMemorySegmentSize()).thenReturn(32 * 1024);
+		return new SpillableSubpartition(0, parent, ioManager);
 	}
 
 	/**
@@ -138,14 +142,13 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 	@Test
 	public void testReleasePartitionAndGetNext() throws Exception {
 		// Create partition and add some buffers
-		SpillableSubpartition partition = new SpillableSubpartition(
-			0, mock(ResultPartition.class), ioManager);
+		SpillableSubpartition partition = createSubpartition();
 
 		partition.finish();
 
 		// Create the read view
 		ResultSubpartitionView readView = spy(partition
-			.createReadView(new TestInfiniteBufferProvider(), new BufferAvailabilityListener() {
+			.createReadView(new BufferAvailabilityListener() {
 				@Override
 				public void notifyBuffersAvailable(long numBuffers) {
 
@@ -168,11 +171,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 	 */
 	@Test
 	public void testConsumeSpilledPartition() throws Exception {
-		ResultPartition parent = mock(ResultPartition.class);
-		SpillableSubpartition partition = new SpillableSubpartition(
-			0,
-			parent,
-			ioManager);
+		SpillableSubpartition partition = createSubpartition();
 
 		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
 		buffer.retain();
@@ -187,7 +186,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		partition.finish();
 
 		BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
-		SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(new TestInfiniteBufferProvider(), listener);
+		SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
 
 		verify(listener, times(1)).notifyBuffersAvailable(eq(4L));
 
@@ -216,11 +215,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 	 */
 	@Test
 	public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception {
-		ResultPartition parent = mock(ResultPartition.class);
-		SpillableSubpartition partition = new SpillableSubpartition(
-			0,
-			parent,
-			ioManager);
+		SpillableSubpartition partition = createSubpartition();
 
 		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
 		buffer.retain();
@@ -232,7 +227,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		partition.finish();
 
 		AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
-		SpillableSubpartitionView reader = (SpillableSubpartitionView) partition.createReadView(new TestInfiniteBufferProvider(), listener);
+		SpillableSubpartitionView reader = (SpillableSubpartitionView) partition.createReadView(listener);
 
 		// Initial notification
 		assertEquals(1, listener.getNumNotifiedBuffers());

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 14942bc..800542e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
-import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -91,11 +90,9 @@ public abstract class SubpartitionTestBase extends TestLogger {
 		partition.add(buffer);
 		partition.finish();
 
-		TestInfiniteBufferProvider buffers = new TestInfiniteBufferProvider();
-
 		// Create the view
 		BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
-		ResultSubpartitionView view = partition.createReadView(buffers, listener);
+		ResultSubpartitionView view = partition.createReadView(listener);
 
 		// The added buffer and end-of-partition event
 		assertNotNull(view.getNextBuffer());

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 18c3038..fe819a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -198,7 +198,7 @@ public class LocalInputChannelTest {
 		LocalInputChannel ch = createLocalInputChannel(inputGate, partitionManager, backoff);
 
 		when(partitionManager
-				.createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider), any(BufferAvailabilityListener.class)))
+				.createSubpartitionView(eq(ch.partitionId), eq(0), any(BufferAvailabilityListener.class)))
 				.thenThrow(new PartitionNotFoundException(ch.partitionId));
 
 		Timer timer = mock(Timer.class);
@@ -214,7 +214,7 @@ public class LocalInputChannelTest {
 		// Initial request
 		ch.requestSubpartition(0);
 		verify(partitionManager)
-				.createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider), any(BufferAvailabilityListener.class));
+				.createSubpartitionView(eq(ch.partitionId), eq(0), any(BufferAvailabilityListener.class));
 
 		// Request subpartition and verify that the actual requests are delayed.
 		for (long expected : expectedDelays) {
@@ -241,7 +241,7 @@ public class LocalInputChannelTest {
 
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		when(partitionManager
-				.createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+				.createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class)))
 				.thenReturn(view);
 
 		SingleInputGate inputGate = mock(SingleInputGate.class);
@@ -296,7 +296,6 @@ public class LocalInputChannelTest {
 			.createSubpartitionView(
 				any(ResultPartitionID.class),
 				anyInt(),
-				any(BufferProvider.class),
 				any(BufferAvailabilityListener.class)))
 			.thenAnswer(new Answer<ResultSubpartitionView>() {
 				@Override
@@ -360,7 +359,6 @@ public class LocalInputChannelTest {
 		when(partitionManager.createSubpartitionView(
 				any(ResultPartitionID.class),
 				anyInt(),
-				any(BufferProvider.class),
 				any(BufferAvailabilityListener.class))).thenReturn(reader);
 
 		LocalInputChannel channel = new LocalInputChannel(

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 2d1b4b2..737f17b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -128,7 +127,6 @@ public class SingleInputGateTest {
 		when(partitionManager.createSubpartitionView(
 			any(ResultPartitionID.class),
 			anyInt(),
-			any(BufferProvider.class),
 			any(BufferAvailabilityListener.class))).thenReturn(iterator);
 
 		// Setup reader with one local and one unknown input channel
@@ -163,7 +161,7 @@ public class SingleInputGateTest {
 		inputGate.requestPartitions();
 
 		// Only the local channel can request
-		verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class));
+		verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class));
 
 		// Send event backwards and initialize unknown channel afterwards
 		final TaskEvent event = new TestTaskEvent();
@@ -175,7 +173,7 @@ public class SingleInputGateTest {
 		// After the update, the pending event should be send to local channel
 		inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(new ResultPartitionID(unknownPartitionId.getPartitionId(), unknownPartitionId.getProducerId()), ResultPartitionLocation.createLocal()));
 
-		verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class));
+		verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class));
 		verify(taskEventDispatcher, times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
 	}
 
@@ -215,7 +213,7 @@ public class SingleInputGateTest {
 			ResultPartitionLocation.createLocal()));
 
 		verify(partitionManager, never()).createSubpartitionView(
-			any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class));
+			any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index 9729b09..710e7df 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -52,7 +52,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 840);
+			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 640);
 			
 			cluster = new LocalFlinkMiniCluster(config, false);