You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/22 10:26:30 UTC

[flink] 05/10: [hotfix][network] Introduce ResultPartitionFactory

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e546009b7360c341d74b53c5d805e84f6276a897
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Fri May 10 17:33:00 2019 +0200

    [hotfix][network] Introduce ResultPartitionFactory
---
 .../runtime/io/network/NetworkEnvironment.java     |  25 ++--
 .../io/network/partition/ResultPartition.java      |  64 +-------
 .../network/partition/ResultPartitionFactory.java  | 162 +++++++++++++++++++++
 .../network/partition/ResultPartitionBuilder.java  |   9 +-
 4 files changed, 176 insertions(+), 84 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index ea482f1..7974e83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionFactory;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -88,7 +89,7 @@ public class NetworkEnvironment {
 
 	private final TaskEventPublisher taskEventPublisher;
 
-	private final IOManager ioManager;
+	private final ResultPartitionFactory resultPartitionFactory;
 
 	private boolean isShutdown;
 
@@ -98,14 +99,14 @@ public class NetworkEnvironment {
 			ConnectionManager connectionManager,
 			ResultPartitionManager resultPartitionManager,
 			TaskEventPublisher taskEventPublisher,
-			IOManager ioManager) {
+			ResultPartitionFactory resultPartitionFactory) {
 		this.config = config;
 		this.networkBufferPool = networkBufferPool;
 		this.connectionManager = connectionManager;
 		this.resultPartitionManager = resultPartitionManager;
 		this.taskEventPublisher = taskEventPublisher;
-		this.ioManager = ioManager;
 		this.isShutdown = false;
+		this.resultPartitionFactory = resultPartitionFactory;
 	}
 
 	public static NetworkEnvironment create(
@@ -128,6 +129,8 @@ public class NetworkEnvironment {
 		registerNetworkMetrics(metricGroup, networkBufferPool);
 
 		ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
+		ResultPartitionFactory resultPartitionFactory =
+			new ResultPartitionFactory(resultPartitionManager, ioManager);
 
 		return new NetworkEnvironment(
 			config,
@@ -135,7 +138,7 @@ public class NetworkEnvironment {
 			connectionManager,
 			resultPartitionManager,
 			taskEventPublisher,
-			ioManager);
+			resultPartitionFactory);
 	}
 
 	private static void registerNetworkMetrics(MetricGroup metricGroup, NetworkBufferPool networkBufferPool) {
@@ -283,18 +286,8 @@ public class NetworkEnvironment {
 			ResultPartition[] resultPartitions = new ResultPartition[resultPartitionDeploymentDescriptors.size()];
 			int counter = 0;
 			for (ResultPartitionDeploymentDescriptor rpdd : resultPartitionDeploymentDescriptors) {
-				resultPartitions[counter++] = new ResultPartition(
-					taskName,
-					taskActions,
-					jobId,
-					new ResultPartitionID(rpdd.getPartitionId(), executionId),
-					rpdd.getPartitionType(),
-					rpdd.getNumberOfSubpartitions(),
-					rpdd.getMaxParallelism(),
-					resultPartitionManager,
-					partitionConsumableNotifier,
-					ioManager,
-					rpdd.sendScheduleOrUpdateConsumersMessage());
+				resultPartitions[counter++] = resultPartitionFactory.create(
+					taskName, taskActions, jobId, executionId, rpdd, partitionConsumableNotifier);
 			}
 
 			registerOutputMetrics(outputGroup, buffersGroup, resultPartitions);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 1ff1ec5..30d0dd6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -32,8 +31,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskmanager.TaskActions;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -125,17 +122,16 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 
 	private volatile Throwable cause;
 
-	public ResultPartition(
+	ResultPartition(
 		String owningTaskName,
 		TaskActions taskActions, // actions on the owning task
 		JobID jobId,
 		ResultPartitionID partitionId,
 		ResultPartitionType partitionType,
-		int numberOfSubpartitions,
+		ResultSubpartition[] subpartitions,
 		int numTargetKeyGroups,
 		ResultPartitionManager partitionManager,
 		ResultPartitionConsumableNotifier partitionConsumableNotifier,
-		IOManager ioManager,
 		boolean sendScheduleOrUpdateConsumersMessage) {
 
 		this.owningTaskName = checkNotNull(owningTaskName);
@@ -143,34 +139,11 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 		this.jobId = checkNotNull(jobId);
 		this.partitionId = checkNotNull(partitionId);
 		this.partitionType = checkNotNull(partitionType);
-		this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
+		this.subpartitions = checkNotNull(subpartitions);
 		this.numTargetKeyGroups = numTargetKeyGroups;
 		this.partitionManager = checkNotNull(partitionManager);
 		this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier);
 		this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
-
-		// Create the subpartitions.
-		switch (partitionType) {
-			case BLOCKING:
-				initializeBoundedBlockingPartitions(subpartitions, this, ioManager);
-				break;
-
-			case PIPELINED:
-			case PIPELINED_BOUNDED:
-				for (int i = 0; i < subpartitions.length; i++) {
-					subpartitions[i] = new PipelinedSubpartition(i, this);
-				}
-
-				break;
-
-			default:
-				throw new IllegalArgumentException("Unsupported result partition type.");
-		}
-
-		// Initially, partitions should be consumed once before release.
-		pin();
-
-		LOG.debug("{}: Initialized {}", owningTaskName, this);
 	}
 
 	/**
@@ -465,35 +438,4 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 			hasNotifiedPipelinedConsumers = true;
 		}
 	}
-
-	private static void initializeBoundedBlockingPartitions(
-			ResultSubpartition[] subpartitions,
-			ResultPartition parent,
-			IOManager ioManager) {
-
-		int i = 0;
-		try {
-			for (; i < subpartitions.length; i++) {
-				subpartitions[i] = new BoundedBlockingSubpartition(
-						i, parent, ioManager.createChannel().getPathFile().toPath());
-			}
-		}
-		catch (IOException e) {
-			// undo all the work so that a failed constructor does not leave any resources
-			// in need of disposal
-			releasePartitionsQuietly(subpartitions, i);
-
-			// this is not good, we should not be forced to wrap this in a runtime exception.
-			// the fact that the ResultPartition and Task constructor (which calls this) do not tolerate any exceptions
-			// is incompatible with eager initialization of resources (RAII).
-			throw new FlinkRuntimeException(e);
-		}
-	}
-
-	private static void releasePartitionsQuietly(ResultSubpartition[] partitions, int until) {
-		for (int i = 0; i < until; i++) {
-			final ResultSubpartition subpartition = partitions[i];
-			ExceptionUtils.suppressExceptions(subpartition::release);
-		}
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
new file mode 100644
index 0000000..3b9a61a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/**
+ * Factory for {@link ResultPartition} to use in {@link org.apache.flink.runtime.io.network.NetworkEnvironment}.
+ */
+public class ResultPartitionFactory {
+	private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionFactory.class);
+
+	@Nonnull
+	private final ResultPartitionManager partitionManager;
+
+	@Nonnull
+	private final IOManager ioManager;
+
+	public ResultPartitionFactory(@Nonnull ResultPartitionManager partitionManager, @Nonnull IOManager ioManager) {
+		this.partitionManager = partitionManager;
+		this.ioManager = ioManager;
+	}
+
+	/** Creates a partition from its deployment descriptor by delegating to the explicit-parameter overload. */
+	public ResultPartition create(
+		@Nonnull String taskNameWithSubtaskAndId,
+		@Nonnull TaskActions taskActions,
+		@Nonnull JobID jobId,
+		@Nonnull ExecutionAttemptID executionAttemptID,
+		@Nonnull ResultPartitionDeploymentDescriptor desc,
+		@Nonnull ResultPartitionConsumableNotifier partitionConsumableNotifier) {
+
+		return create(
+			taskNameWithSubtaskAndId,
+			taskActions,
+			jobId,
+			new ResultPartitionID(desc.getPartitionId(), executionAttemptID),
+			desc.getPartitionType(),
+			desc.getNumberOfSubpartitions(),
+			desc.getMaxParallelism(),
+			partitionConsumableNotifier,
+			desc.sendScheduleOrUpdateConsumersMessage());
+	}
+
+	/** Creates the partition together with its subpartitions and pins it once before returning it. */
+	@VisibleForTesting
+	public ResultPartition create(
+		@Nonnull String taskNameWithSubtaskAndId,
+		@Nonnull TaskActions taskActions,
+		@Nonnull JobID jobId,
+		@Nonnull ResultPartitionID id,
+		@Nonnull ResultPartitionType type,
+		int numberOfSubpartitions,
+		int maxParallelism,
+		@Nonnull ResultPartitionConsumableNotifier partitionConsumableNotifier,
+		boolean sendScheduleOrUpdateConsumersMessage) {
+
+		ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];
+		ResultPartition partition = new ResultPartition(
+			taskNameWithSubtaskAndId,
+			taskActions,
+			jobId,
+			id,
+			type,
+			subpartitions,
+			maxParallelism,
+			partitionManager,
+			partitionConsumableNotifier,
+			sendScheduleOrUpdateConsumersMessage);
+
+		createSubpartitions(partition, type, subpartitions);
+
+		// Initially, partitions should be consumed once before release.
+		partition.pin();
+
+		LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, partition);
+		return partition;
+	}
+
+	private void createSubpartitions(
+		ResultPartition partition, ResultPartitionType type, ResultSubpartition[] subpartitions) {
+
+		// Create the subpartitions.
+		switch (type) {
+			case BLOCKING:
+				initializeBoundedBlockingPartitions(subpartitions, partition, ioManager);
+				break;
+
+			case PIPELINED:
+			case PIPELINED_BOUNDED:
+				for (int i = 0; i < subpartitions.length; i++) {
+					subpartitions[i] = new PipelinedSubpartition(i, partition);
+				}
+
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unsupported result partition type.");
+		}
+	}
+
+	private static void initializeBoundedBlockingPartitions(
+		ResultSubpartition[] subpartitions,
+		ResultPartition parent,
+		IOManager ioManager) {
+
+		int i = 0;
+		try {
+			for (; i < subpartitions.length; i++) {
+				subpartitions[i] = new BoundedBlockingSubpartition(
+					i, parent, ioManager.createChannel().getPathFile().toPath());
+			}
+		}
+		catch (IOException e) {
+			// undo all the work so that a failed constructor does not leave any resources
+			// in need of disposal
+			releasePartitionsQuietly(subpartitions, i);
+
+			// this is not good, we should not be forced to wrap this in a runtime exception.
+			// the fact that the ResultPartition and Task constructor (which calls this) do not tolerate any exceptions
+			// is incompatible with eager initialization of resources (RAII).
+			throw new FlinkRuntimeException(e);
+		}
+	}
+
+	private static void releasePartitionsQuietly(ResultSubpartition[] partitions, int until) {
+		for (int i = 0; i < until; i++) {
+			final ResultSubpartition subpartition = partitions[i];
+			ExceptionUtils.suppressExceptions(subpartition::release);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 3d7dab0..7370d6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -28,9 +28,6 @@ import org.apache.flink.runtime.taskmanager.TaskActions;
  * Utility class to encapsulate the logic of building a {@link ResultPartition} instance.
  */
 public class ResultPartitionBuilder {
-
-	private static final String taskName = "Result Partition";
-
 	private JobID jobId = new JobID();
 
 	private final TaskActions taskActions = new NoOpTaskActions();
@@ -97,17 +94,15 @@ public class ResultPartitionBuilder {
 	}
 
 	public ResultPartition build() {
-		return new ResultPartition(
-			taskName,
+		return new ResultPartitionFactory(partitionManager, ioManager).create(
+			"Result Partition task",
 			taskActions,
 			jobId,
 			partitionId,
 			partitionType,
 			numberOfSubpartitions,
 			numTargetKeyGroups,
-			partitionManager,
 			partitionConsumableNotifier,
-			ioManager,
 			sendScheduleOrUpdateConsumersMessage);
 	}
 }