You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/07/01 13:31:41 UTC
[flink] 02/04: [FLINK-28134][runtime] Rework TaskDeploymentDescriptorFactory to accept an execution to deploy
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e73225e4071fb7ffe7c4d5cbfd89983129fe3312
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jun 23 20:41:01 2022 +0800
[FLINK-28134][runtime] Rework TaskDeploymentDescriptorFactory to accept an execution to deploy
This decouples task deployment from ExecutionVertex#getCurrentExecutionAttempt(): the factory now receives the Execution to deploy as an explicit argument.
---
.../runtime/deployment/TaskDeploymentDescriptorFactory.java | 10 +++++-----
.../org/apache/flink/runtime/executiongraph/Execution.java | 4 ++--
.../runtime/scheduler/adaptivebatch/BlockingResultInfo.java | 3 +--
.../deployment/TaskDeploymentDescriptorFactoryTest.java | 2 +-
4 files changed, 9 insertions(+), 10 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
index 6da8f4fabca..21f5ef6f069 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
@@ -253,10 +253,10 @@ public class TaskDeploymentDescriptorFactory {
}
}
- public static TaskDeploymentDescriptorFactory fromExecutionVertex(
- ExecutionVertex executionVertex)
+ public static TaskDeploymentDescriptorFactory fromExecution(Execution execution)
throws IOException, CachedIntermediateDataSetCorruptedException {
- InternalExecutionGraphAccessor internalExecutionGraphAccessor =
+ final ExecutionVertex executionVertex = execution.getVertex();
+ final InternalExecutionGraphAccessor internalExecutionGraphAccessor =
executionVertex.getExecutionGraphAccessor();
Map<IntermediateDataSetID, ShuffleDescriptor[]> clusterPartitionShuffleDescriptors;
try {
@@ -272,7 +272,7 @@ public class TaskDeploymentDescriptorFactory {
}
return new TaskDeploymentDescriptorFactory(
- executionVertex.getCurrentExecutionAttempt().getAttemptId(),
+ execution.getAttemptId(),
getSerializedJobInformation(internalExecutionGraphAccessor),
getSerializedTaskInformation(
executionVertex.getJobVertex().getTaskInformationOrBlobKey()),
@@ -338,7 +338,7 @@ public class TaskDeploymentDescriptorFactory {
public static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(
IntermediateResultPartition consumedPartition,
PartitionLocationConstraint partitionDeploymentConstraint) {
- Execution producer = consumedPartition.getProducer().getCurrentExecutionAttempt();
+ Execution producer = consumedPartition.getProducer().getPartitionProducer();
ExecutionState producerState = producer.getState();
Optional<ResultPartitionDeploymentDescriptor> consumedPartitionDescriptor =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 459d9861980..96c0d3c119a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -564,13 +564,13 @@ public class Execution
"Deploying {} (attempt #{}) with attempt id {} and vertex id {} to {} with allocation id {}",
vertex.getTaskNameWithSubtaskIndex(),
getAttemptNumber(),
- vertex.getCurrentExecutionAttempt().getAttemptId(),
+ attemptId,
vertex.getID(),
getAssignedResourceLocation(),
slot.getAllocationId());
final TaskDeploymentDescriptor deployment =
- TaskDeploymentDescriptorFactory.fromExecutionVertex(vertex)
+ TaskDeploymentDescriptorFactory.fromExecution(this)
.createDeploymentDescriptor(
slot.getAllocationId(),
taskRestore,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java
index 11a45a6d7a8..302980ab216 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java
@@ -68,8 +68,7 @@ public class BlockingResultInfo {
for (IntermediateResultPartition partition : intermediateResult.getPartitions()) {
checkState(partition.isConsumable());
- IOMetrics ioMetrics =
- partition.getProducer().getCurrentExecutionAttempt().getIOMetrics();
+ IOMetrics ioMetrics = partition.getProducer().getPartitionProducer().getIOMetrics();
checkNotNull(ioMetrics, "IOMetrics should not be null.");
blockingPartitionSizes.add(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
index 7dba9971c56..1074da2d340 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
@@ -164,7 +164,7 @@ public class TaskDeploymentDescriptorFactoryTest extends TestLogger {
private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(ExecutionVertex ev)
throws IOException, CachedIntermediateDataSetCorruptedException {
- return TaskDeploymentDescriptorFactory.fromExecutionVertex(ev)
+ return TaskDeploymentDescriptorFactory.fromExecution(ev.getCurrentExecutionAttempt())
.createDeploymentDescriptor(new AllocationID(), null, Collections.emptyList());
}