You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/22 10:26:30 UTC
[flink] 05/10: [hotfix][network] Introduce ResultPartitionFactory
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e546009b7360c341d74b53c5d805e84f6276a897
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Fri May 10 17:33:00 2019 +0200
[hotfix][network] Introduce ResultPartitionFactory
---
.../runtime/io/network/NetworkEnvironment.java | 25 ++--
.../io/network/partition/ResultPartition.java | 64 +-------
.../network/partition/ResultPartitionFactory.java | 162 +++++++++++++++++++++
.../network/partition/ResultPartitionBuilder.java | 9 +-
4 files changed, 176 insertions(+), 84 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index ea482f1..7974e83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -88,7 +89,7 @@ public class NetworkEnvironment {
private final TaskEventPublisher taskEventPublisher;
- private final IOManager ioManager;
+ private final ResultPartitionFactory resultPartitionFactory;
private boolean isShutdown;
@@ -98,14 +99,14 @@ public class NetworkEnvironment {
ConnectionManager connectionManager,
ResultPartitionManager resultPartitionManager,
TaskEventPublisher taskEventPublisher,
- IOManager ioManager) {
+ ResultPartitionFactory resultPartitionFactory) {
this.config = config;
this.networkBufferPool = networkBufferPool;
this.connectionManager = connectionManager;
this.resultPartitionManager = resultPartitionManager;
this.taskEventPublisher = taskEventPublisher;
- this.ioManager = ioManager;
this.isShutdown = false;
+ this.resultPartitionFactory = resultPartitionFactory;
}
public static NetworkEnvironment create(
@@ -128,6 +129,8 @@ public class NetworkEnvironment {
registerNetworkMetrics(metricGroup, networkBufferPool);
ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
+ ResultPartitionFactory resultPartitionFactory =
+ new ResultPartitionFactory(resultPartitionManager, ioManager);
return new NetworkEnvironment(
config,
@@ -135,7 +138,7 @@ public class NetworkEnvironment {
connectionManager,
resultPartitionManager,
taskEventPublisher,
- ioManager);
+ resultPartitionFactory);
}
private static void registerNetworkMetrics(MetricGroup metricGroup, NetworkBufferPool networkBufferPool) {
@@ -283,18 +286,8 @@ public class NetworkEnvironment {
ResultPartition[] resultPartitions = new ResultPartition[resultPartitionDeploymentDescriptors.size()];
int counter = 0;
for (ResultPartitionDeploymentDescriptor rpdd : resultPartitionDeploymentDescriptors) {
- resultPartitions[counter++] = new ResultPartition(
- taskName,
- taskActions,
- jobId,
- new ResultPartitionID(rpdd.getPartitionId(), executionId),
- rpdd.getPartitionType(),
- rpdd.getNumberOfSubpartitions(),
- rpdd.getMaxParallelism(),
- resultPartitionManager,
- partitionConsumableNotifier,
- ioManager,
- rpdd.sendScheduleOrUpdateConsumersMessage());
+ resultPartitions[counter++] = resultPartitionFactory.create(
+ taskName, taskActions, jobId, executionId, rpdd, partitionConsumableNotifier);
}
registerOutputMetrics(outputGroup, buffersGroup, resultPartitions);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 1ff1ec5..30d0dd6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -32,8 +31,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskmanager.TaskActions;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -125,17 +122,16 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
private volatile Throwable cause;
- public ResultPartition(
+ ResultPartition(
String owningTaskName,
TaskActions taskActions, // actions on the owning task
JobID jobId,
ResultPartitionID partitionId,
ResultPartitionType partitionType,
- int numberOfSubpartitions,
+ ResultSubpartition[] subpartitions,
int numTargetKeyGroups,
ResultPartitionManager partitionManager,
ResultPartitionConsumableNotifier partitionConsumableNotifier,
- IOManager ioManager,
boolean sendScheduleOrUpdateConsumersMessage) {
this.owningTaskName = checkNotNull(owningTaskName);
@@ -143,34 +139,11 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
this.jobId = checkNotNull(jobId);
this.partitionId = checkNotNull(partitionId);
this.partitionType = checkNotNull(partitionType);
- this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
+ this.subpartitions = checkNotNull(subpartitions);
this.numTargetKeyGroups = numTargetKeyGroups;
this.partitionManager = checkNotNull(partitionManager);
this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier);
this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
-
- // Create the subpartitions.
- switch (partitionType) {
- case BLOCKING:
- initializeBoundedBlockingPartitions(subpartitions, this, ioManager);
- break;
-
- case PIPELINED:
- case PIPELINED_BOUNDED:
- for (int i = 0; i < subpartitions.length; i++) {
- subpartitions[i] = new PipelinedSubpartition(i, this);
- }
-
- break;
-
- default:
- throw new IllegalArgumentException("Unsupported result partition type.");
- }
-
- // Initially, partitions should be consumed once before release.
- pin();
-
- LOG.debug("{}: Initialized {}", owningTaskName, this);
}
/**
@@ -465,35 +438,4 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
hasNotifiedPipelinedConsumers = true;
}
}
-
- private static void initializeBoundedBlockingPartitions(
- ResultSubpartition[] subpartitions,
- ResultPartition parent,
- IOManager ioManager) {
-
- int i = 0;
- try {
- for (; i < subpartitions.length; i++) {
- subpartitions[i] = new BoundedBlockingSubpartition(
- i, parent, ioManager.createChannel().getPathFile().toPath());
- }
- }
- catch (IOException e) {
- // undo all the work so that a failed constructor does not leave any resources
- // in need of disposal
- releasePartitionsQuietly(subpartitions, i);
-
- // this is not good, we should not be forced to wrap this in a runtime exception.
- // the fact that the ResultPartition and Task constructor (which calls this) do not tolerate any exceptions
- // is incompatible with eager initialization of resources (RAII).
- throw new FlinkRuntimeException(e);
- }
- }
-
- private static void releasePartitionsQuietly(ResultSubpartition[] partitions, int until) {
- for (int i = 0; i < until; i++) {
- final ResultSubpartition subpartition = partitions[i];
- ExceptionUtils.suppressExceptions(subpartition::release);
- }
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
new file mode 100644
index 0000000..3b9a61a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/**
+ * Factory for {@link ResultPartition} to use in {@link org.apache.flink.runtime.io.network.NetworkEnvironment}.
+ */
+public class ResultPartitionFactory {
+	private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionFactory.class);
+
+	@Nonnull
+	private final ResultPartitionManager partitionManager;
+
+	@Nonnull
+	private final IOManager ioManager;
+
+	public ResultPartitionFactory(@Nonnull ResultPartitionManager partitionManager, @Nonnull IOManager ioManager) {
+		this.partitionManager = partitionManager;
+		this.ioManager = ioManager;
+	}
+
+	/** Creates the partition described by {@code desc} for the given execution attempt. */
+	public ResultPartition create(
+			@Nonnull String taskNameWithSubtaskAndId,
+			@Nonnull TaskActions taskActions,
+			@Nonnull JobID jobId,
+			@Nonnull ExecutionAttemptID executionAttemptID,
+			@Nonnull ResultPartitionDeploymentDescriptor desc,
+			@Nonnull ResultPartitionConsumableNotifier partitionConsumableNotifier) {
+		return create(
+			taskNameWithSubtaskAndId,
+			taskActions,
+			jobId,
+			new ResultPartitionID(desc.getPartitionId(), executionAttemptID),
+			desc.getPartitionType(),
+			desc.getNumberOfSubpartitions(),
+			desc.getMaxParallelism(),
+			partitionConsumableNotifier,
+			desc.sendScheduleOrUpdateConsumersMessage());
+	}
+
+	/**
+	 * Creates a partition, fills its subpartitions according to {@code type} and pins it once.
+	 *
+	 * @throws IllegalArgumentException if {@code type} is not BLOCKING, PIPELINED or PIPELINED_BOUNDED
+	 * @throws FlinkRuntimeException if creating a BLOCKING subpartition fails with an IOException
+	 */
+	@VisibleForTesting
+	public ResultPartition create(
+			@Nonnull String taskNameWithSubtaskAndId,
+			@Nonnull TaskActions taskActions,
+			@Nonnull JobID jobId,
+			@Nonnull ResultPartitionID id,
+			@Nonnull ResultPartitionType type,
+			int numberOfSubpartitions,
+			int maxParallelism,
+			@Nonnull ResultPartitionConsumableNotifier partitionConsumableNotifier,
+			boolean sendScheduleOrUpdateConsumersMessage) {
+		ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];
+		ResultPartition partition = new ResultPartition(
+			taskNameWithSubtaskAndId,
+			taskActions,
+			jobId,
+			id,
+			type,
+			subpartitions,
+			maxParallelism,
+			partitionManager,
+			partitionConsumableNotifier,
+			sendScheduleOrUpdateConsumersMessage);
+		createSubpartitions(partition, type, subpartitions);
+
+		// Initially, partitions should be consumed once before release.
+		partition.pin();
+
+		// Log the partition that was just built, not this factory.
+		LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, partition);
+		return partition;
+	}
+
+	/** Populates {@code subpartitions} with the implementation matching {@code type}. */
+	private void createSubpartitions(
+			ResultPartition partition, ResultPartitionType type, ResultSubpartition[] subpartitions) {
+		switch (type) {
+			case BLOCKING:
+				initializeBoundedBlockingPartitions(subpartitions, partition, ioManager);
+				break;
+			case PIPELINED:
+			case PIPELINED_BOUNDED:
+				for (int i = 0; i < subpartitions.length; i++) {
+					subpartitions[i] = new PipelinedSubpartition(i, partition);
+				}
+				break;
+			default:
+				throw new IllegalArgumentException("Unsupported result partition type.");
+		}
+	}
+
+	/** Creates one bounded blocking subpartition per array slot; releases those already created on IOException. */
+	private static void initializeBoundedBlockingPartitions(
+			ResultSubpartition[] subpartitions,
+			ResultPartition parent,
+			IOManager ioManager) {
+		int i = 0;
+		try {
+			for (; i < subpartitions.length; i++) {
+				subpartitions[i] = new BoundedBlockingSubpartition(
+					i, parent, ioManager.createChannel().getPathFile().toPath());
+			}
+		}
+		catch (IOException e) {
+			// undo all the work so that a failed constructor does not leave any resources
+			// in need of disposal
+			releasePartitionsQuietly(subpartitions, i);
+
+			// this is not good, we should not be forced to wrap this in a runtime exception.
+			// the fact that the ResultPartition and Task constructor (which calls this) do not tolerate any exceptions
+			// is incompatible with eager initialization of resources (RAII).
+			throw new FlinkRuntimeException(e);
+		}
+	}
+
+	/** Releases the first {@code until} entries of {@code partitions}, suppressing any exceptions. */
+	private static void releasePartitionsQuietly(ResultSubpartition[] partitions, int until) {
+		for (int i = 0; i < until; i++) {
+			final ResultSubpartition subpartition = partitions[i];
+			ExceptionUtils.suppressExceptions(subpartition::release);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 3d7dab0..7370d6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -28,9 +28,6 @@ import org.apache.flink.runtime.taskmanager.TaskActions;
* Utility class to encapsulate the logic of building a {@link ResultPartition} instance.
*/
public class ResultPartitionBuilder {
-
- private static final String taskName = "Result Partition";
-
private JobID jobId = new JobID();
private final TaskActions taskActions = new NoOpTaskActions();
@@ -97,17 +94,15 @@ public class ResultPartitionBuilder {
}
public ResultPartition build() {
- return new ResultPartition(
- taskName,
+ return new ResultPartitionFactory(partitionManager, ioManager).create(
+ "Result Partition task",
taskActions,
jobId,
partitionId,
partitionType,
numberOfSubpartitions,
numTargetKeyGroups,
- partitionManager,
partitionConsumableNotifier,
- ioManager,
sendScheduleOrUpdateConsumersMessage);
}
}