You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/07/01 13:31:41 UTC

[flink] 02/04: [FLINK-28134][runtime] Rework TaskDeploymentDescriptorFactory to accept an execution to deploy

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e73225e4071fb7ffe7c4d5cbfd89983129fe3312
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jun 23 20:41:01 2022 +0800

    [FLINK-28134][runtime] Rework TaskDeploymentDescriptorFactory to accept an execution to deploy
    
    This helps to decouple the task deployment from ExecutionVertex#getCurrentExecutionAttempt().
---
 .../runtime/deployment/TaskDeploymentDescriptorFactory.java    | 10 +++++-----
 .../org/apache/flink/runtime/executiongraph/Execution.java     |  4 ++--
 .../runtime/scheduler/adaptivebatch/BlockingResultInfo.java    |  3 +--
 .../deployment/TaskDeploymentDescriptorFactoryTest.java        |  2 +-
 4 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
index 6da8f4fabca..21f5ef6f069 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
@@ -253,10 +253,10 @@ public class TaskDeploymentDescriptorFactory {
         }
     }
 
-    public static TaskDeploymentDescriptorFactory fromExecutionVertex(
-            ExecutionVertex executionVertex)
+    public static TaskDeploymentDescriptorFactory fromExecution(Execution execution)
             throws IOException, CachedIntermediateDataSetCorruptedException {
-        InternalExecutionGraphAccessor internalExecutionGraphAccessor =
+        final ExecutionVertex executionVertex = execution.getVertex();
+        final InternalExecutionGraphAccessor internalExecutionGraphAccessor =
                 executionVertex.getExecutionGraphAccessor();
         Map<IntermediateDataSetID, ShuffleDescriptor[]> clusterPartitionShuffleDescriptors;
         try {
@@ -272,7 +272,7 @@ public class TaskDeploymentDescriptorFactory {
         }
 
         return new TaskDeploymentDescriptorFactory(
-                executionVertex.getCurrentExecutionAttempt().getAttemptId(),
+                execution.getAttemptId(),
                 getSerializedJobInformation(internalExecutionGraphAccessor),
                 getSerializedTaskInformation(
                         executionVertex.getJobVertex().getTaskInformationOrBlobKey()),
@@ -338,7 +338,7 @@ public class TaskDeploymentDescriptorFactory {
     public static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(
             IntermediateResultPartition consumedPartition,
             PartitionLocationConstraint partitionDeploymentConstraint) {
-        Execution producer = consumedPartition.getProducer().getCurrentExecutionAttempt();
+        Execution producer = consumedPartition.getProducer().getPartitionProducer();
 
         ExecutionState producerState = producer.getState();
         Optional<ResultPartitionDeploymentDescriptor> consumedPartitionDescriptor =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 459d9861980..96c0d3c119a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -564,13 +564,13 @@ public class Execution
                     "Deploying {} (attempt #{}) with attempt id {} and vertex id {} to {} with allocation id {}",
                     vertex.getTaskNameWithSubtaskIndex(),
                     getAttemptNumber(),
-                    vertex.getCurrentExecutionAttempt().getAttemptId(),
+                    attemptId,
                     vertex.getID(),
                     getAssignedResourceLocation(),
                     slot.getAllocationId());
 
             final TaskDeploymentDescriptor deployment =
-                    TaskDeploymentDescriptorFactory.fromExecutionVertex(vertex)
+                    TaskDeploymentDescriptorFactory.fromExecution(this)
                             .createDeploymentDescriptor(
                                     slot.getAllocationId(),
                                     taskRestore,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java
index 11a45a6d7a8..302980ab216 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java
@@ -68,8 +68,7 @@ public class BlockingResultInfo {
         for (IntermediateResultPartition partition : intermediateResult.getPartitions()) {
             checkState(partition.isConsumable());
 
-            IOMetrics ioMetrics =
-                    partition.getProducer().getCurrentExecutionAttempt().getIOMetrics();
+            IOMetrics ioMetrics = partition.getProducer().getPartitionProducer().getIOMetrics();
             checkNotNull(ioMetrics, "IOMetrics should not be null.");
 
             blockingPartitionSizes.add(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
index 7dba9971c56..1074da2d340 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
@@ -164,7 +164,7 @@ public class TaskDeploymentDescriptorFactoryTest extends TestLogger {
     private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(ExecutionVertex ev)
             throws IOException, CachedIntermediateDataSetCorruptedException {
 
-        return TaskDeploymentDescriptorFactory.fromExecutionVertex(ev)
+        return TaskDeploymentDescriptorFactory.fromExecution(ev.getCurrentExecutionAttempt())
                 .createDeploymentDescriptor(new AllocationID(), null, Collections.emptyList());
     }