You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/22 10:26:34 UTC
[flink] 09/10: [FLINK-12331][network] Refactor
NetworkEnvironment#setupPartition() to ResultPartitionWriter#setup()
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e99eee4f0bf9b2d851fa12ea52007e8f083c366f
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Fri May 10 20:56:25 2019 +0200
[FLINK-12331][network] Refactor NetworkEnvironment#setupPartition() to ResultPartitionWriter#setup()
Move partition setup from NetworkEnvironment to ResultPartition.
This eliminates the tie between Task and NetworkEnvironment.
Task does not need to depend on NetworkEnvironment
and can trigger setup from ResultPartitionWriter interface.
---
.../runtime/io/network/NetworkEnvironment.java | 47 +++++-----------
.../network/api/writer/ResultPartitionWriter.java | 5 ++
.../io/network/partition/ResultPartition.java | 22 +++++---
.../network/partition/ResultPartitionFactory.java | 49 +++++++++++++++--
.../runtime/io/network/NetworkEnvironmentTest.java | 39 ++++----------
.../AbstractCollectingResultPartitionWriter.java | 4 ++
.../io/network/api/writer/RecordWriterTest.java | 8 +++
.../io/network/partition/PartitionTestUtils.java | 4 ++
.../network/partition/ResultPartitionBuilder.java | 63 ++++++++++++++++++++--
.../io/network/partition/ResultPartitionTest.java | 5 +-
.../partition/consumer/LocalInputChannelTest.java | 9 ++--
.../StreamNetworkBenchmarkEnvironment.java | 3 +-
12 files changed, 168 insertions(+), 90 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 459669c..de669bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -57,7 +57,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
-import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -135,11 +134,19 @@ public class NetworkEnvironment {
registerNetworkMetrics(metricGroup, networkBufferPool);
ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
- ResultPartitionFactory resultPartitionFactory =
- new ResultPartitionFactory(resultPartitionManager, ioManager);
+ ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(
+ resultPartitionManager,
+ ioManager,
+ networkBufferPool,
+ config.networkBuffersPerChannel(),
+ config.floatingNetworkBuffersPerGate());
SingleInputGateFactory singleInputGateFactory = new SingleInputGateFactory(
- config, connectionManager, resultPartitionManager, taskEventPublisher, networkBufferPool);
+ config,
+ connectionManager,
+ resultPartitionManager,
+ taskEventPublisher,
+ networkBufferPool);
return new NetworkEnvironment(
config,
@@ -196,7 +203,7 @@ public class NetworkEnvironment {
}
for (final ResultPartition partition : producedPartitions) {
- setupPartition(partition);
+ partition.setup();
}
// Setup the buffer pool for each buffer reader
@@ -208,36 +215,6 @@ public class NetworkEnvironment {
}
@VisibleForTesting
- public void setupPartition(ResultPartition partition) throws IOException {
- BufferPool bufferPool = null;
-
- try {
- int maxNumberOfMemorySegments = partition.getPartitionType().isBounded() ?
- partition.getNumberOfSubpartitions() * config.networkBuffersPerChannel() +
- config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE;
- // If the partition type is back pressure-free, we register with the buffer pool for
- // callbacks to release memory.
- bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(),
- maxNumberOfMemorySegments,
- partition.getPartitionType().hasBackPressure() ? Optional.empty() : Optional.of(partition));
-
- partition.registerBufferPool(bufferPool);
-
- resultPartitionManager.registerResultPartition(partition);
- } catch (Throwable t) {
- if (bufferPool != null) {
- bufferPool.lazyDestroy();
- }
-
- if (t instanceof IOException) {
- throw (IOException) t;
- } else {
- throw new IOException(t.getMessage(), t);
- }
- }
- }
-
- @VisibleForTesting
public void setupInputGate(SingleInputGate gate) throws IOException {
BufferPool bufferPool = null;
int maxNumberOfMemorySegments;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 661c319..49f74af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -29,6 +29,11 @@ import java.io.IOException;
*/
public interface ResultPartitionWriter extends AutoCloseable {
+ /**
+ * Setup partition, potentially heavy-weight, blocking operation comparing to just creation.
+ */
+ void setup() throws IOException;
+
BufferProvider getBufferProvider();
ResultPartitionID getPartitionId();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 30d0dd6..ca3855d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.function.FunctionWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,7 +123,9 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
private volatile Throwable cause;
- ResultPartition(
+ private final FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory;
+
+ public ResultPartition(
String owningTaskName,
TaskActions taskActions, // actions on the owning task
JobID jobId,
@@ -132,7 +135,8 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
int numTargetKeyGroups,
ResultPartitionManager partitionManager,
ResultPartitionConsumableNotifier partitionConsumableNotifier,
- boolean sendScheduleOrUpdateConsumersMessage) {
+ boolean sendScheduleOrUpdateConsumersMessage,
+ FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
this.owningTaskName = checkNotNull(owningTaskName);
this.taskActions = checkNotNull(taskActions);
@@ -144,6 +148,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
this.partitionManager = checkNotNull(partitionManager);
this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier);
this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
+ this.bufferPoolFactory = bufferPoolFactory;
}
/**
@@ -154,13 +159,16 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
* <p>The pool is registered with the partition *after* it as been constructed in order to conform
* to the life-cycle of task registrations in the {@link TaskExecutor}.
*/
- public void registerBufferPool(BufferPool bufferPool) {
- checkArgument(bufferPool.getNumberOfRequiredMemorySegments() >= getNumberOfSubpartitions(),
- "Bug in result partition setup logic: Buffer pool has not enough guaranteed buffers for this result partition.");
-
+ @Override
+ public void setup() throws IOException {
checkState(this.bufferPool == null, "Bug in result partition setup logic: Already registered buffer pool.");
- this.bufferPool = checkNotNull(bufferPool);
+ BufferPool bufferPool = checkNotNull(bufferPoolFactory.apply(this));
+ checkArgument(bufferPool.getNumberOfRequiredMemorySegments() >= getNumberOfSubpartitions(),
+ "Bug in result partition setup logic: Buffer pool has not enough guaranteed buffers for this result partition.");
+
+ this.bufferPool = bufferPool;
+ partitionManager.registerResultPartition(this);
}
public JobID getJobId() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 3b9a61a..247f2e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -23,9 +23,13 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.FunctionWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +37,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.io.IOException;
+import java.util.Optional;
/**
* Factory for {@link ResultPartition} to use in {@link org.apache.flink.runtime.io.network.NetworkEnvironment}.
@@ -46,12 +51,27 @@ public class ResultPartitionFactory {
@Nonnull
private final IOManager ioManager;
- public ResultPartitionFactory(@Nonnull ResultPartitionManager partitionManager, @Nonnull IOManager ioManager) {
+ @Nonnull
+ private final BufferPoolFactory bufferPoolFactory;
+
+ private final int networkBuffersPerChannel;
+
+ private final int floatingNetworkBuffersPerGate;
+
+ public ResultPartitionFactory(
+ @Nonnull ResultPartitionManager partitionManager,
+ @Nonnull IOManager ioManager,
+ @Nonnull BufferPoolFactory bufferPoolFactory,
+ int networkBuffersPerChannel,
+ int floatingNetworkBuffersPerGate) {
+
this.partitionManager = partitionManager;
this.ioManager = ioManager;
+ this.networkBuffersPerChannel = networkBuffersPerChannel;
+ this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
+ this.bufferPoolFactory = bufferPoolFactory;
}
- @VisibleForTesting
public ResultPartition create(
@Nonnull String taskNameWithSubtaskAndId,
@Nonnull TaskActions taskActions,
@@ -69,9 +89,11 @@ public class ResultPartitionFactory {
desc.getNumberOfSubpartitions(),
desc.getMaxParallelism(),
partitionConsumableNotifier,
- desc.sendScheduleOrUpdateConsumersMessage());
+ desc.sendScheduleOrUpdateConsumersMessage(),
+ createBufferPoolFactory(desc.getNumberOfSubpartitions(), desc.getPartitionType()));
}
+ @VisibleForTesting
public ResultPartition create(
@Nonnull String taskNameWithSubtaskAndId,
@Nonnull TaskActions taskActions,
@@ -81,7 +103,8 @@ public class ResultPartitionFactory {
int numberOfSubpartitions,
int maxParallelism,
@Nonnull ResultPartitionConsumableNotifier partitionConsumableNotifier,
- boolean sendScheduleOrUpdateConsumersMessage) {
+ boolean sendScheduleOrUpdateConsumersMessage,
+ FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];
@@ -95,7 +118,8 @@ public class ResultPartitionFactory {
maxParallelism,
partitionManager,
partitionConsumableNotifier,
- sendScheduleOrUpdateConsumersMessage);
+ sendScheduleOrUpdateConsumersMessage,
+ bufferPoolFactory);
createSubpartitions(partition, type, subpartitions);
@@ -159,4 +183,19 @@ public class ResultPartitionFactory {
ExceptionUtils.suppressExceptions(subpartition::release);
}
}
+
+ @VisibleForTesting
+ FunctionWithException<BufferPoolOwner, BufferPool, IOException> createBufferPoolFactory(
+ int numberOfSubpartitions, ResultPartitionType type) {
+
+ return p -> {
+ int maxNumberOfMemorySegments = type.isBounded() ?
+ numberOfSubpartitions * networkBuffersPerChannel + floatingNetworkBuffersPerGate : Integer.MAX_VALUE;
+ // If the partition type is back pressure-free, we register with the buffer pool for
+ // callbacks to release memory.
+ return bufferPoolFactory.createBufferPool(numberOfSubpartitions,
+ maxNumberOfMemorySegments,
+ type.hasBackPressure() ? Optional.empty() : Optional.of(p));
+ };
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 42c5a96..0fc1af7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -39,6 +38,7 @@ import java.util.Arrays;
import java.util.List;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
+import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -74,10 +74,10 @@ public class NetworkEnvironmentTest {
.build();
// result partitions
- ResultPartition rp1 = createResultPartition(ResultPartitionType.PIPELINED, 2);
- ResultPartition rp2 = createResultPartition(ResultPartitionType.BLOCKING, 2);
- ResultPartition rp3 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 2);
- ResultPartition rp4 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 8);
+ ResultPartition rp1 = createPartition(network, ResultPartitionType.PIPELINED, 2);
+ ResultPartition rp2 = createPartition(network, ResultPartitionType.BLOCKING, 2);
+ ResultPartition rp3 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
+ ResultPartition rp4 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 8);
final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
// input gates
@@ -182,10 +182,10 @@ public class NetworkEnvironmentTest {
final ConnectionManager connManager = createDummyConnectionManager();
// result partitions
- ResultPartition rp1 = createResultPartition(ResultPartitionType.PIPELINED, 2);
- ResultPartition rp2 = createResultPartition(ResultPartitionType.BLOCKING, 2);
- ResultPartition rp3 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 2);
- ResultPartition rp4 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 4);
+ ResultPartition rp1 = createPartition(network, ResultPartitionType.PIPELINED, 2);
+ ResultPartition rp2 = createPartition(network, ResultPartitionType.BLOCKING, 2);
+ ResultPartition rp3 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
+ ResultPartition rp4 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 4);
final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
// input gates
@@ -259,27 +259,6 @@ public class NetworkEnvironmentTest {
}
/**
- * Helper to create simple {@link ResultPartition} instance for use by a {@link Task} inside
- * {@link NetworkEnvironment#registerTask(Task)}.
- *
- * @param partitionType
- * the produced partition type
- * @param channels
- * the number of output channels
- *
- * @return instance with minimal data set and some mocks so that it is useful for {@link
- * NetworkEnvironment#registerTask(Task)}
- */
- private static ResultPartition createResultPartition(
- final ResultPartitionType partitionType, final int channels) {
- return new ResultPartitionBuilder()
- .setResultPartitionType(partitionType)
- .setNumberOfSubpartitions(channels)
- .setNumTargetKeyGroups(channels)
- .build();
- }
-
- /**
* Helper to create spy of a {@link SingleInputGate} for use by a {@link Task} inside
* {@link NetworkEnvironment#registerTask(Task)}.
*
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index 5de0d70..fd38ee8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -44,6 +44,10 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP
}
@Override
+ public void setup() {
+ }
+
+ @Override
public BufferProvider getBufferProvider() {
return bufferProvider;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 9b496ba..956c4e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -474,6 +474,10 @@ public class RecordWriterTest {
}
@Override
+ public void setup() {
+ }
+
+ @Override
public BufferProvider getBufferProvider() {
return bufferProvider;
}
@@ -535,6 +539,10 @@ public class RecordWriterTest {
}
@Override
+ public void setup() {
+ }
+
+ @Override
public BufferProvider getBufferProvider() {
return bufferProvider;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index b52bffe..1b9b895 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.io.network.partition;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+
/**
* This class should consolidate all mocking logic for ResultPartitions.
* While using Mockito internally (for now), the use of Mockito should not
@@ -45,9 +47,11 @@ public class PartitionTestUtils {
}
public static ResultPartition createPartition(
+ NetworkEnvironment environment,
ResultPartitionType partitionType,
int numChannels) {
return new ResultPartitionBuilder()
+ .setupBufferPoolFactoryFromNetworkEnvironment(environment)
.setResultPartitionType(partitionType)
.setNumberOfSubpartitions(numChannels)
.build();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 7370d6a..0e25368 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -21,8 +21,16 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.IOException;
+import java.util.Optional;
/**
* Utility class to encapsulate the logic of building a {@link ResultPartition} instance.
@@ -48,6 +56,15 @@ public class ResultPartitionBuilder {
private boolean sendScheduleOrUpdateConsumersMessage = false;
+ private NetworkBufferPool networkBufferPool = new NetworkBufferPool(1, 1,1);
+
+ private int networkBuffersPerChannel = 1;
+
+ private int floatingNetworkBuffersPerGate = 1;
+
+ @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+ private Optional<FunctionWithException<BufferPoolOwner, BufferPool, IOException>> bufferPoolFactory = Optional.empty();
+
public ResultPartitionBuilder setJobId(JobID jobId) {
this.jobId = jobId;
return this;
@@ -88,13 +105,52 @@ public class ResultPartitionBuilder {
return this;
}
- public ResultPartitionBuilder setSendScheduleOrUpdateConsumersMessage(boolean sendScheduleOrUpdateConsumersMessage) {
+ public ResultPartitionBuilder setSendScheduleOrUpdateConsumersMessage(
+ boolean sendScheduleOrUpdateConsumersMessage) {
+
this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
return this;
}
+ public ResultPartitionBuilder setupBufferPoolFactoryFromNetworkEnvironment(NetworkEnvironment environment) {
+ return setNetworkBuffersPerChannel(environment.getConfiguration().networkBuffersPerChannel())
+ .setFloatingNetworkBuffersPerGate(environment.getConfiguration().floatingNetworkBuffersPerGate())
+ .setNetworkBufferPool(environment.getNetworkBufferPool());
+ }
+
+ public ResultPartitionBuilder setNetworkBufferPool(NetworkBufferPool networkBufferPool) {
+ this.networkBufferPool = networkBufferPool;
+ return this;
+ }
+
+ private ResultPartitionBuilder setNetworkBuffersPerChannel(int networkBuffersPerChannel) {
+ this.networkBuffersPerChannel = networkBuffersPerChannel;
+ return this;
+ }
+
+ private ResultPartitionBuilder setFloatingNetworkBuffersPerGate(int floatingNetworkBuffersPerGate) {
+ this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
+ return this;
+ }
+
+ public ResultPartitionBuilder setBufferPoolFactory(
+ FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
+ this.bufferPoolFactory = Optional.of(bufferPoolFactory);
+ return this;
+ }
+
public ResultPartition build() {
- return new ResultPartitionFactory(partitionManager, ioManager).create(
+ ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(
+ partitionManager,
+ ioManager,
+ networkBufferPool,
+ networkBuffersPerChannel,
+ floatingNetworkBuffersPerGate);
+
+ FunctionWithException<BufferPoolOwner, BufferPool, IOException> factory = bufferPoolFactory.orElseGet(() ->
+ resultPartitionFactory.createBufferPoolFactory(numberOfSubpartitions, partitionType));
+
+ return resultPartitionFactory.create(
"Result Partition task",
taskActions,
jobId,
@@ -103,6 +159,7 @@ public class ResultPartitionBuilder {
numberOfSubpartitions,
numTargetKeyGroups,
partitionConsumableNotifier,
- sendScheduleOrUpdateConsumersMessage);
+ sendScheduleOrUpdateConsumersMessage,
+ factory);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 5846d6f..3340fea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -215,10 +215,9 @@ public class ResultPartitionTest {
final int numAllBuffers = 10;
final NetworkEnvironment network = new NetworkEnvironmentBuilder()
.setNumNetworkBuffers(numAllBuffers).build();
- final ResultPartitionConsumableNotifier notifier = new NoOpResultPartitionConsumableNotifier();
- final ResultPartition resultPartition = createPartition(notifier, resultPartitionType, false);
+ final ResultPartition resultPartition = createPartition(network, resultPartitionType, 1);
try {
- network.setupPartition(resultPartition);
+ resultPartition.setup();
// take all buffers (more than the minimum required)
for (int i = 0; i < numAllBuffers; ++i) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index a6ff087..505f792 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -115,11 +115,12 @@ public class LocalInputChannelTest {
.setNumTargetKeyGroups(parallelism)
.setResultPartitionManager(partitionManager)
.setSendScheduleOrUpdateConsumersMessage(true)
+ .setBufferPoolFactory(p ->
+ networkBuffers.createBufferPool(producerBufferPoolSize, producerBufferPoolSize))
.build();
// Create a buffer pool for this partition
- partition.registerBufferPool(
- networkBuffers.createBufferPool(producerBufferPoolSize, producerBufferPoolSize));
+ partition.setup();
// Create the producer
partitionProducers[i] = new TestPartitionProducer(
@@ -130,10 +131,6 @@ public class LocalInputChannelTest {
partition.getBufferProvider(),
numberOfBuffersPerChannel)
);
-
- // Register with the partition manager in order to allow the local input channels to
- // request their respective partitions.
- partitionManager.registerResultPartition(partition);
}
// Test
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 2bed27e..4b542e0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -219,9 +219,10 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
.setNumberOfSubpartitions(channels)
.setResultPartitionManager(environment.getResultPartitionManager())
.setIOManager(ioManager)
+ .setupBufferPoolFactoryFromNetworkEnvironment(environment)
.build();
- environment.setupPartition(resultPartition);
+ resultPartition.setup();
return resultPartition;
}