You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/22 10:26:34 UTC

[flink] 09/10: [FLINK-12331][network] Refactor NetworkEnvironment#setupPartition() to ResultPartitionWriter#setup()

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e99eee4f0bf9b2d851fa12ea52007e8f083c366f
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Fri May 10 20:56:25 2019 +0200

    [FLINK-12331][network] Refactor NetworkEnvironment#setupPartition() to ResultPartitionWriter#setup()
    
    Move partition setup from NetworkEnvironment to ResultPartition.
    This eliminates the tie between Task and NetworkEnvironment.
    Task does not need to depend on NetworkEnvironment
    and can trigger setup through the ResultPartitionWriter interface.
---
 .../runtime/io/network/NetworkEnvironment.java     | 47 +++++-----------
 .../network/api/writer/ResultPartitionWriter.java  |  5 ++
 .../io/network/partition/ResultPartition.java      | 22 +++++---
 .../network/partition/ResultPartitionFactory.java  | 49 +++++++++++++++--
 .../runtime/io/network/NetworkEnvironmentTest.java | 39 ++++----------
 .../AbstractCollectingResultPartitionWriter.java   |  4 ++
 .../io/network/api/writer/RecordWriterTest.java    |  8 +++
 .../io/network/partition/PartitionTestUtils.java   |  4 ++
 .../network/partition/ResultPartitionBuilder.java  | 63 ++++++++++++++++++++--
 .../io/network/partition/ResultPartitionTest.java  |  5 +-
 .../partition/consumer/LocalInputChannelTest.java  |  9 ++--
 .../StreamNetworkBenchmarkEnvironment.java         |  3 +-
 12 files changed, 168 insertions(+), 90 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 459669c..de669bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -57,7 +57,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -135,11 +134,19 @@ public class NetworkEnvironment {
 		registerNetworkMetrics(metricGroup, networkBufferPool);
 
 		ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
-		ResultPartitionFactory resultPartitionFactory =
-			new ResultPartitionFactory(resultPartitionManager, ioManager);
+		ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(
+			resultPartitionManager,
+			ioManager,
+			networkBufferPool,
+			config.networkBuffersPerChannel(),
+			config.floatingNetworkBuffersPerGate());
 
 		SingleInputGateFactory singleInputGateFactory = new SingleInputGateFactory(
-			config, connectionManager, resultPartitionManager, taskEventPublisher, networkBufferPool);
+			config,
+			connectionManager,
+			resultPartitionManager,
+			taskEventPublisher,
+			networkBufferPool);
 
 		return new NetworkEnvironment(
 			config,
@@ -196,7 +203,7 @@ public class NetworkEnvironment {
 			}
 
 			for (final ResultPartition partition : producedPartitions) {
-				setupPartition(partition);
+				partition.setup();
 			}
 
 			// Setup the buffer pool for each buffer reader
@@ -208,36 +215,6 @@ public class NetworkEnvironment {
 	}
 
 	@VisibleForTesting
-	public void setupPartition(ResultPartition partition) throws IOException {
-		BufferPool bufferPool = null;
-
-		try {
-			int maxNumberOfMemorySegments = partition.getPartitionType().isBounded() ?
-				partition.getNumberOfSubpartitions() * config.networkBuffersPerChannel() +
-					config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE;
-			// If the partition type is back pressure-free, we register with the buffer pool for
-			// callbacks to release memory.
-			bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(),
-				maxNumberOfMemorySegments,
-				partition.getPartitionType().hasBackPressure() ? Optional.empty() : Optional.of(partition));
-
-			partition.registerBufferPool(bufferPool);
-
-			resultPartitionManager.registerResultPartition(partition);
-		} catch (Throwable t) {
-			if (bufferPool != null) {
-				bufferPool.lazyDestroy();
-			}
-
-			if (t instanceof IOException) {
-				throw (IOException) t;
-			} else {
-				throw new IOException(t.getMessage(), t);
-			}
-		}
-	}
-
-	@VisibleForTesting
 	public void setupInputGate(SingleInputGate gate) throws IOException {
 		BufferPool bufferPool = null;
 		int maxNumberOfMemorySegments;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 661c319..49f74af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -29,6 +29,11 @@ import java.io.IOException;
  */
 public interface ResultPartitionWriter extends AutoCloseable {
 
+	/**
+	 * Sets up the partition; this is a potentially heavy-weight, blocking operation compared to mere creation.
+	 */
+	void setup() throws IOException;
+
 	BufferProvider getBufferProvider();
 
 	ResultPartitionID getPartitionId();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 30d0dd6..ca3855d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.function.FunctionWithException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -122,7 +123,9 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 
 	private volatile Throwable cause;
 
-	ResultPartition(
+	private final FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory;
+
+	public ResultPartition(
 		String owningTaskName,
 		TaskActions taskActions, // actions on the owning task
 		JobID jobId,
@@ -132,7 +135,8 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 		int numTargetKeyGroups,
 		ResultPartitionManager partitionManager,
 		ResultPartitionConsumableNotifier partitionConsumableNotifier,
-		boolean sendScheduleOrUpdateConsumersMessage) {
+		boolean sendScheduleOrUpdateConsumersMessage,
+		FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
 
 		this.owningTaskName = checkNotNull(owningTaskName);
 		this.taskActions = checkNotNull(taskActions);
@@ -144,6 +148,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 		this.partitionManager = checkNotNull(partitionManager);
 		this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier);
 		this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
+		this.bufferPoolFactory = bufferPoolFactory;
 	}
 
 	/**
@@ -154,13 +159,16 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 	 * <p>The pool is registered with the partition *after* it as been constructed in order to conform
 	 * to the life-cycle of task registrations in the {@link TaskExecutor}.
 	 */
-	public void registerBufferPool(BufferPool bufferPool) {
-		checkArgument(bufferPool.getNumberOfRequiredMemorySegments() >= getNumberOfSubpartitions(),
-				"Bug in result partition setup logic: Buffer pool has not enough guaranteed buffers for this result partition.");
-
+	@Override
+	public void setup() throws IOException {
 		checkState(this.bufferPool == null, "Bug in result partition setup logic: Already registered buffer pool.");
 
-		this.bufferPool = checkNotNull(bufferPool);
+		BufferPool bufferPool = checkNotNull(bufferPoolFactory.apply(this));
+		checkArgument(bufferPool.getNumberOfRequiredMemorySegments() >= getNumberOfSubpartitions(),
+			"Bug in result partition setup logic: Buffer pool has not enough guaranteed buffers for this result partition.");
+
+		this.bufferPool = bufferPool;
+		partitionManager.registerResultPartition(this);
 	}
 
 	public JobID getJobId() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 3b9a61a..247f2e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -23,9 +23,13 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.FunctionWithException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
+import java.util.Optional;
 
 /**
  * Factory for {@link ResultPartition} to use in {@link org.apache.flink.runtime.io.network.NetworkEnvironment}.
@@ -46,12 +51,27 @@ public class ResultPartitionFactory {
 	@Nonnull
 	private final IOManager ioManager;
 
-	public ResultPartitionFactory(@Nonnull ResultPartitionManager partitionManager,  @Nonnull IOManager ioManager) {
+	@Nonnull
+	private final BufferPoolFactory bufferPoolFactory;
+
+	private final int networkBuffersPerChannel;
+
+	private final int floatingNetworkBuffersPerGate;
+
+	public ResultPartitionFactory(
+		@Nonnull ResultPartitionManager partitionManager,
+		@Nonnull IOManager ioManager,
+		@Nonnull BufferPoolFactory bufferPoolFactory,
+		int networkBuffersPerChannel,
+		int floatingNetworkBuffersPerGate) {
+
 		this.partitionManager = partitionManager;
 		this.ioManager = ioManager;
+		this.networkBuffersPerChannel = networkBuffersPerChannel;
+		this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
+		this.bufferPoolFactory = bufferPoolFactory;
 	}
 
-	@VisibleForTesting
 	public ResultPartition create(
 		@Nonnull String taskNameWithSubtaskAndId,
 		@Nonnull TaskActions taskActions,
@@ -69,9 +89,11 @@ public class ResultPartitionFactory {
 			desc.getNumberOfSubpartitions(),
 			desc.getMaxParallelism(),
 			partitionConsumableNotifier,
-			desc.sendScheduleOrUpdateConsumersMessage());
+			desc.sendScheduleOrUpdateConsumersMessage(),
+			createBufferPoolFactory(desc.getNumberOfSubpartitions(), desc.getPartitionType()));
 	}
 
+	@VisibleForTesting
 	public ResultPartition create(
 		@Nonnull String taskNameWithSubtaskAndId,
 		@Nonnull TaskActions taskActions,
@@ -81,7 +103,8 @@ public class ResultPartitionFactory {
 		int numberOfSubpartitions,
 		int maxParallelism,
 		@Nonnull ResultPartitionConsumableNotifier partitionConsumableNotifier,
-		boolean sendScheduleOrUpdateConsumersMessage) {
+		boolean sendScheduleOrUpdateConsumersMessage,
+		FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
 
 		ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];
 
@@ -95,7 +118,8 @@ public class ResultPartitionFactory {
 			maxParallelism,
 			partitionManager,
 			partitionConsumableNotifier,
-			sendScheduleOrUpdateConsumersMessage);
+			sendScheduleOrUpdateConsumersMessage,
+			bufferPoolFactory);
 
 		createSubpartitions(partition, type, subpartitions);
 
@@ -159,4 +183,19 @@ public class ResultPartitionFactory {
 			ExceptionUtils.suppressExceptions(subpartition::release);
 		}
 	}
+
+	@VisibleForTesting
+	FunctionWithException<BufferPoolOwner, BufferPool, IOException> createBufferPoolFactory(
+		int numberOfSubpartitions, ResultPartitionType type) {
+
+		return p -> {
+			int maxNumberOfMemorySegments = type.isBounded() ?
+				numberOfSubpartitions * networkBuffersPerChannel + floatingNetworkBuffersPerGate : Integer.MAX_VALUE;
+			// If the partition type is back pressure-free, we register with the buffer pool for
+			// callbacks to release memory.
+			return bufferPoolFactory.createBufferPool(numberOfSubpartitions,
+				maxNumberOfMemorySegments,
+				type.hasBackPressure() ? Optional.empty() : Optional.of(p));
+		};
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 42c5a96..0fc1af7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemorySegmentProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -39,6 +38,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
+import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -74,10 +74,10 @@ public class NetworkEnvironmentTest {
 			.build();
 
 		// result partitions
-		ResultPartition rp1 = createResultPartition(ResultPartitionType.PIPELINED, 2);
-		ResultPartition rp2 = createResultPartition(ResultPartitionType.BLOCKING, 2);
-		ResultPartition rp3 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 2);
-		ResultPartition rp4 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 8);
+		ResultPartition rp1 = createPartition(network, ResultPartitionType.PIPELINED, 2);
+		ResultPartition rp2 = createPartition(network, ResultPartitionType.BLOCKING, 2);
+		ResultPartition rp3 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
+		ResultPartition rp4 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 8);
 		final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
 
 		// input gates
@@ -182,10 +182,10 @@ public class NetworkEnvironmentTest {
 		final ConnectionManager connManager = createDummyConnectionManager();
 
 		// result partitions
-		ResultPartition rp1 = createResultPartition(ResultPartitionType.PIPELINED, 2);
-		ResultPartition rp2 = createResultPartition(ResultPartitionType.BLOCKING, 2);
-		ResultPartition rp3 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 2);
-		ResultPartition rp4 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 4);
+		ResultPartition rp1 = createPartition(network, ResultPartitionType.PIPELINED, 2);
+		ResultPartition rp2 = createPartition(network, ResultPartitionType.BLOCKING, 2);
+		ResultPartition rp3 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
+		ResultPartition rp4 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 4);
 		final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
 
 		// input gates
@@ -259,27 +259,6 @@ public class NetworkEnvironmentTest {
 	}
 
 	/**
-	 * Helper to create simple {@link ResultPartition} instance for use by a {@link Task} inside
-	 * {@link NetworkEnvironment#registerTask(Task)}.
-	 *
-	 * @param partitionType
-	 * 		the produced partition type
-	 * @param channels
-	 * 		the number of output channels
-	 *
-	 * @return instance with minimal data set and some mocks so that it is useful for {@link
-	 * NetworkEnvironment#registerTask(Task)}
-	 */
-	private static ResultPartition createResultPartition(
-			final ResultPartitionType partitionType, final int channels) {
-		return new ResultPartitionBuilder()
-			.setResultPartitionType(partitionType)
-			.setNumberOfSubpartitions(channels)
-			.setNumTargetKeyGroups(channels)
-			.build();
-	}
-
-	/**
 	 * Helper to create spy of a {@link SingleInputGate} for use by a {@link Task} inside
 	 * {@link NetworkEnvironment#registerTask(Task)}.
 	 *
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index 5de0d70..fd38ee8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -44,6 +44,10 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP
 	}
 
 	@Override
+	public void setup() {
+	}
+
+	@Override
 	public BufferProvider getBufferProvider() {
 		return bufferProvider;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 9b496ba..956c4e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -474,6 +474,10 @@ public class RecordWriterTest {
 		}
 
 		@Override
+		public void setup() {
+		}
+
+		@Override
 		public BufferProvider getBufferProvider() {
 			return bufferProvider;
 		}
@@ -535,6 +539,10 @@ public class RecordWriterTest {
 		}
 
 		@Override
+		public void setup() {
+		}
+
+		@Override
 		public BufferProvider getBufferProvider() {
 			return bufferProvider;
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index b52bffe..1b9b895 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+
 /**
  * This class should consolidate all mocking logic for ResultPartitions.
  * While using Mockito internally (for now), the use of Mockito should not
@@ -45,9 +47,11 @@ public class PartitionTestUtils {
 	}
 
 	public static ResultPartition createPartition(
+			NetworkEnvironment environment,
 			ResultPartitionType partitionType,
 			int numChannels) {
 		return new ResultPartitionBuilder()
+			.setupBufferPoolFactoryFromNetworkEnvironment(environment)
 			.setResultPartitionType(partitionType)
 			.setNumberOfSubpartitions(numChannels)
 			.build();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 7370d6a..0e25368 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -21,8 +21,16 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.IOException;
+import java.util.Optional;
 
 /**
  * Utility class to encapsulate the logic of building a {@link ResultPartition} instance.
@@ -48,6 +56,15 @@ public class ResultPartitionBuilder {
 
 	private boolean sendScheduleOrUpdateConsumersMessage = false;
 
+	private NetworkBufferPool networkBufferPool = new NetworkBufferPool(1, 1,1);
+
+	private int networkBuffersPerChannel = 1;
+
+	private int floatingNetworkBuffersPerGate = 1;
+
+	@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+	private Optional<FunctionWithException<BufferPoolOwner, BufferPool, IOException>> bufferPoolFactory = Optional.empty();
+
 	public ResultPartitionBuilder setJobId(JobID jobId) {
 		this.jobId = jobId;
 		return this;
@@ -88,13 +105,52 @@ public class ResultPartitionBuilder {
 		return this;
 	}
 
-	public ResultPartitionBuilder setSendScheduleOrUpdateConsumersMessage(boolean sendScheduleOrUpdateConsumersMessage) {
+	public ResultPartitionBuilder setSendScheduleOrUpdateConsumersMessage(
+		boolean sendScheduleOrUpdateConsumersMessage) {
+
 		this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
 		return this;
 	}
 
+	public ResultPartitionBuilder setupBufferPoolFactoryFromNetworkEnvironment(NetworkEnvironment environment) {
+		return setNetworkBuffersPerChannel(environment.getConfiguration().networkBuffersPerChannel())
+			.setFloatingNetworkBuffersPerGate(environment.getConfiguration().floatingNetworkBuffersPerGate())
+			.setNetworkBufferPool(environment.getNetworkBufferPool());
+	}
+
+	public ResultPartitionBuilder setNetworkBufferPool(NetworkBufferPool networkBufferPool) {
+		this.networkBufferPool = networkBufferPool;
+		return this;
+	}
+
+	private ResultPartitionBuilder setNetworkBuffersPerChannel(int networkBuffersPerChannel) {
+		this.networkBuffersPerChannel = networkBuffersPerChannel;
+		return this;
+	}
+
+	private ResultPartitionBuilder setFloatingNetworkBuffersPerGate(int floatingNetworkBuffersPerGate) {
+		this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
+		return this;
+	}
+
+	public ResultPartitionBuilder setBufferPoolFactory(
+			FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
+		this.bufferPoolFactory = Optional.of(bufferPoolFactory);
+		return this;
+	}
+
 	public ResultPartition build() {
-		return new ResultPartitionFactory(partitionManager, ioManager).create(
+		ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(
+			partitionManager,
+			ioManager,
+			networkBufferPool,
+			networkBuffersPerChannel,
+			floatingNetworkBuffersPerGate);
+
+		FunctionWithException<BufferPoolOwner, BufferPool, IOException> factory = bufferPoolFactory.orElseGet(() ->
+			resultPartitionFactory.createBufferPoolFactory(numberOfSubpartitions, partitionType));
+
+		return resultPartitionFactory.create(
 			"Result Partition task",
 			taskActions,
 			jobId,
@@ -103,6 +159,7 @@ public class ResultPartitionBuilder {
 			numberOfSubpartitions,
 			numTargetKeyGroups,
 			partitionConsumableNotifier,
-			sendScheduleOrUpdateConsumersMessage);
+			sendScheduleOrUpdateConsumersMessage,
+			factory);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 5846d6f..3340fea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -215,10 +215,9 @@ public class ResultPartitionTest {
 		final int numAllBuffers = 10;
 		final NetworkEnvironment network = new NetworkEnvironmentBuilder()
 			.setNumNetworkBuffers(numAllBuffers).build();
-		final ResultPartitionConsumableNotifier notifier = new NoOpResultPartitionConsumableNotifier();
-		final ResultPartition resultPartition = createPartition(notifier, resultPartitionType, false);
+		final ResultPartition resultPartition = createPartition(network, resultPartitionType, 1);
 		try {
-			network.setupPartition(resultPartition);
+			resultPartition.setup();
 
 			// take all buffers (more than the minimum required)
 			for (int i = 0; i < numAllBuffers; ++i) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index a6ff087..505f792 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -115,11 +115,12 @@ public class LocalInputChannelTest {
 				.setNumTargetKeyGroups(parallelism)
 				.setResultPartitionManager(partitionManager)
 				.setSendScheduleOrUpdateConsumersMessage(true)
+				.setBufferPoolFactory(p ->
+					networkBuffers.createBufferPool(producerBufferPoolSize, producerBufferPoolSize))
 				.build();
 
 			// Create a buffer pool for this partition
-			partition.registerBufferPool(
-				networkBuffers.createBufferPool(producerBufferPoolSize, producerBufferPoolSize));
+			partition.setup();
 
 			// Create the producer
 			partitionProducers[i] = new TestPartitionProducer(
@@ -130,10 +131,6 @@ public class LocalInputChannelTest {
 					partition.getBufferProvider(),
 					numberOfBuffersPerChannel)
 			);
-
-			// Register with the partition manager in order to allow the local input channels to
-			// request their respective partitions.
-			partitionManager.registerResultPartition(partition);
 		}
 
 		// Test
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 2bed27e..4b542e0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -219,9 +219,10 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 			.setNumberOfSubpartitions(channels)
 			.setResultPartitionManager(environment.getResultPartitionManager())
 			.setIOManager(ioManager)
+			.setupBufferPoolFactoryFromNetworkEnvironment(environment)
 			.build();
 
-		environment.setupPartition(resultPartition);
+		resultPartition.setup();
 
 		return resultPartition;
 	}