You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/07/06 13:46:48 UTC

[flink] branch master updated: [FLINK-28392][runtime] DefaultExecutionDeployer avoid retrieving executions from ExecutionGraph#currentExecutions

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e5c4e3f519f [FLINK-28392][runtime] DefaultExecutionDeployer avoid retrieving executions from ExecutionGraph#currentExecutions
e5c4e3f519f is described below

commit e5c4e3f519f364b5951e7cac331eb8af48f0ed84
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Wed Jul 6 15:11:45 2022 +0800

    [FLINK-28392][runtime] DefaultExecutionDeployer avoid retrieving executions from ExecutionGraph#currentExecutions
    
    This closes #20178.
---
 .../scheduler/DefaultExecutionDeployer.java        | 88 +++++++++-------------
 .../flink/runtime/scheduler/DefaultScheduler.java  |  1 -
 .../flink/runtime/scheduler/ExecutionDeployer.java |  4 -
 .../scheduler/DefaultExecutionDeployerTest.java    | 25 +++---
 4 files changed, 49 insertions(+), 69 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java
index e5b7d18b209..a12d0cba06c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java
@@ -39,7 +39,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
@@ -65,8 +64,6 @@ public class DefaultExecutionDeployer implements ExecutionDeployer {
 
     private final Time partitionRegistrationTimeout;
 
-    private final Function<ExecutionAttemptID, Execution> executionRetriever;
-
     private final BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc;
 
     private final ComponentMainThreadExecutor mainThreadExecutor;
@@ -77,7 +74,6 @@ public class DefaultExecutionDeployer implements ExecutionDeployer {
             final ExecutionOperations executionOperations,
             final ExecutionVertexVersioner executionVertexVersioner,
             final Time partitionRegistrationTimeout,
-            final Function<ExecutionAttemptID, Execution> executionRetriever,
             final BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc,
             final ComponentMainThreadExecutor mainThreadExecutor) {
 
@@ -86,7 +82,6 @@ public class DefaultExecutionDeployer implements ExecutionDeployer {
         this.executionOperations = checkNotNull(executionOperations);
         this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
         this.partitionRegistrationTimeout = checkNotNull(partitionRegistrationTimeout);
-        this.executionRetriever = checkNotNull(executionRetriever);
         this.allocationReservationFunc = checkNotNull(allocationReservationFunc);
         this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
     }
@@ -103,7 +98,8 @@ public class DefaultExecutionDeployer implements ExecutionDeployer {
                 allocateSlotsFor(executionsToDeploy);
 
         final List<ExecutionDeploymentHandle> deploymentHandles =
-                createDeploymentHandles(requiredVersionByVertex, executionSlotAssignments);
+                createDeploymentHandles(
+                        executionsToDeploy, requiredVersionByVertex, executionSlotAssignments);
 
         waitForAllSlotsAndDeploy(deploymentHandles);
     }
@@ -132,22 +128,25 @@ public class DefaultExecutionDeployer implements ExecutionDeployer {
     }
 
     private List<ExecutionDeploymentHandle> createDeploymentHandles(
+            final List<Execution> executionsToDeploy,
             final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex,
             final List<ExecutionSlotAssignment> executionSlotAssignments) {
 
-        return executionSlotAssignments.stream()
-                .map(
-                        executionSlotAssignment -> {
-                            final Execution execution =
-                                    getExecutionOrThrow(
-                                            executionSlotAssignment.getExecutionAttemptId());
-                            final ExecutionVertexID executionVertexId =
-                                    execution.getVertex().getID();
-                            return new ExecutionDeploymentHandle(
-                                    executionSlotAssignment,
-                                    requiredVersionByVertex.get(executionVertexId));
-                        })
-                .collect(Collectors.toList());
+        final List<ExecutionDeploymentHandle> deploymentHandles =
+                new ArrayList<>(executionsToDeploy.size());
+        for (int i = 0; i < executionsToDeploy.size(); i++) {
+            final Execution execution = executionsToDeploy.get(i);
+            final ExecutionSlotAssignment assignment = executionSlotAssignments.get(i);
+            checkState(execution.getAttemptId().equals(assignment.getExecutionAttemptId()));
+
+            final ExecutionVertexID executionVertexId = execution.getVertex().getID();
+            final ExecutionDeploymentHandle deploymentHandle =
+                    new ExecutionDeploymentHandle(
+                            execution, assignment, requiredVersionByVertex.get(executionVertexId));
+            deploymentHandles.add(deploymentHandle);
+        }
+
+        return deploymentHandles;
     }
 
     private void waitForAllSlotsAndDeploy(final List<ExecutionDeploymentHandle> deploymentHandles) {
@@ -169,8 +168,7 @@ public class DefaultExecutionDeployer implements ExecutionDeployer {
                                     (ignore, throwable) -> {
                                         if (throwable != null) {
                                             handleTaskDeploymentFailure(
-                                                    deploymentHandle.getExecutionAttemptId(),
-                                                    throwable);
+                                                    deploymentHandle.getExecution(), throwable);
                                         }
                                         return null;
                                     });
@@ -208,11 +206,9 @@ public class DefaultExecutionDeployer implements ExecutionDeployer {
         return (logicalSlot, throwable) -> {
             final ExecutionVertexVersion requiredVertexVersion =
                     deploymentHandle.getRequiredVertexVersion();
-            final Optional<Execution> optionalExecution =
-                    getExecution(deploymentHandle.getExecutionAttemptId());
+            final Execution execution = deploymentHandle.getExecution();
 
-            if (!optionalExecution.isPresent()
-                    || optionalExecution.get().getState() != ExecutionState.SCHEDULED
+            if (execution.getState() != ExecutionState.SCHEDULED
                     || executionVertexVersioner.isModified(requiredVertexVersion)) {
                 if (throwable == null) {
                     log.debug(
@@ -231,7 +227,6 @@ public class DefaultExecutionDeployer implements ExecutionDeployer {
                 throw new CompletionException(maybeWrapWithNoResourceAvailableException(throwable));
             }
 
-            final Execution execution = optionalExecution.get();
             if (!execution.tryAssignResource(logicalSlot)) {
                 throw new IllegalStateException(
                         "Could not assign resource "
@@ -277,8 +272,7 @@ public class DefaultExecutionDeployer implements ExecutionDeployer {
             // a null logicalSlot means the slot assignment is skipped, in which case
             // the produced partition registration process can be skipped as well
             if (logicalSlot != null) {
-                final Execution execution =
-                        getExecutionOrThrow(deploymentHandle.getExecutionAttemptId());
+                final Execution execution = deploymentHandle.getExecution();
                 final CompletableFuture<Void> partitionRegistrationFuture =
                         execution.registerProducedPartitions(logicalSlot.getTaskManagerLocation());
 
@@ -299,11 +293,9 @@ public class DefaultExecutionDeployer implements ExecutionDeployer {
         return (ignored, throwable) -> {
             final ExecutionVertexVersion requiredVertexVersion =
                     deploymentHandle.getRequiredVertexVersion();
-            final Optional<Execution> optionalExecution =
-                    getExecution(deploymentHandle.getExecutionAttemptId());
+            final Execution execution = deploymentHandle.getExecution();
 
-            if (!optionalExecution.isPresent()
-                    || optionalExecution.get().getState() != ExecutionState.SCHEDULED
+            if (execution.getState() != ExecutionState.SCHEDULED
                     || executionVertexVersioner.isModified(requiredVertexVersion)) {
                 if (throwable == null) {
                     log.debug(
@@ -314,11 +306,10 @@ public class DefaultExecutionDeployer implements ExecutionDeployer {
                 return null;
             }
 
-            final Execution execution = optionalExecution.get();
             if (throwable == null) {
                 deployTaskSafe(execution);
             } else {
-                handleTaskDeploymentFailure(execution.getAttemptId(), throwable);
+                handleTaskDeploymentFailure(execution, throwable);
             }
             return null;
         };
@@ -328,40 +319,37 @@ public class DefaultExecutionDeployer implements ExecutionDeployer {
         try {
             executionOperations.deploy(execution);
         } catch (Throwable e) {
-            handleTaskDeploymentFailure(execution.getAttemptId(), e);
+            handleTaskDeploymentFailure(execution, e);
         }
     }
 
-    private void handleTaskDeploymentFailure(
-            final ExecutionAttemptID executionAttemptId, final Throwable error) {
-
-        final Execution execution = getExecutionOrThrow(executionAttemptId);
+    private void handleTaskDeploymentFailure(final Execution execution, final Throwable error) {
         executionOperations.markFailed(execution, error);
     }
 
-    private Execution getExecutionOrThrow(ExecutionAttemptID executionAttemptId) {
-        return getExecution(executionAttemptId).get();
-    }
-
-    private Optional<Execution> getExecution(ExecutionAttemptID executionAttemptId) {
-        return Optional.ofNullable(executionRetriever.apply(executionAttemptId));
-    }
-
     private static class ExecutionDeploymentHandle {
 
+        private final Execution execution;
+
         private final ExecutionSlotAssignment executionSlotAssignment;
 
         private final ExecutionVertexVersion requiredVertexVersion;
 
         ExecutionDeploymentHandle(
-                ExecutionSlotAssignment executionSlotAssignment,
+                final Execution execution,
+                final ExecutionSlotAssignment executionSlotAssignment,
                 final ExecutionVertexVersion requiredVertexVersion) {
+            this.execution = checkNotNull(execution);
             this.executionSlotAssignment = checkNotNull(executionSlotAssignment);
             this.requiredVertexVersion = checkNotNull(requiredVertexVersion);
         }
 
+        Execution getExecution() {
+            return execution;
+        }
+
         ExecutionAttemptID getExecutionAttemptId() {
-            return executionSlotAssignment.getExecutionAttemptId();
+            return execution.getAttemptId();
         }
 
         CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
@@ -383,7 +371,6 @@ public class DefaultExecutionDeployer implements ExecutionDeployer {
                 ExecutionOperations executionOperations,
                 ExecutionVertexVersioner executionVertexVersioner,
                 Time partitionRegistrationTimeout,
-                Function<ExecutionAttemptID, Execution> executionRetriever,
                 BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc,
                 ComponentMainThreadExecutor mainThreadExecutor) {
             return new DefaultExecutionDeployer(
@@ -392,7 +379,6 @@ public class DefaultExecutionDeployer implements ExecutionDeployer {
                     executionOperations,
                     executionVertexVersioner,
                     partitionRegistrationTimeout,
-                    executionRetriever,
                     allocationReservationFunc,
                     mainThreadExecutor);
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 081cfa26987..6cf1cbe628e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -189,7 +189,6 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
                         executionOperations,
                         executionVertexVersioner,
                         rpcTimeout,
-                        id -> getExecutionGraph().getRegisteredExecutions().get(id),
                         this::startReserveAllocation,
                         mainThreadExecutor);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java
index 4fc979e80ba..adb5ff76c87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
 import org.slf4j.Logger;
@@ -31,7 +30,6 @@ import org.slf4j.Logger;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
-import java.util.function.Function;
 
 /** This deployer is responsible for deploying executions. */
 interface ExecutionDeployer {
@@ -63,7 +61,6 @@ interface ExecutionDeployer {
          * @param executionVertexVersioner the versioner which records the versions of execution
          *     vertices.
          * @param partitionRegistrationTimeout timeout of partition registration
-         * @param executionRetriever retriever to get executions
          * @param allocationReservationFunc function to reserve allocations for local recovery
          * @param mainThreadExecutor the main thread executor
          * @return an instantiated {@link ExecutionDeployer}
@@ -74,7 +71,6 @@ interface ExecutionDeployer {
                 final ExecutionOperations executionOperations,
                 final ExecutionVertexVersioner executionVertexVersioner,
                 final Time partitionRegistrationTimeout,
-                final Function<ExecutionAttemptID, Execution> executionRetriever,
                 final BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc,
                 final ComponentMainThreadExecutor mainThreadExecutor);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java
index 3525fea6eae..ce86ffd695f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java
@@ -108,7 +108,7 @@ class DefaultExecutionDeployerTest {
     void testDeployTasks() throws Exception {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
-        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer();
 
         deployTasks(executionDeployer, executionGraph);
 
@@ -120,7 +120,7 @@ class DefaultExecutionDeployerTest {
     void testDeployTasksOnlyIfAllSlotRequestsAreFulfilled() throws Exception {
         final JobGraph jobGraph = singleJobVertexJobGraph(4);
         final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
-        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer();
 
         testExecutionSlotAllocator.disableAutoCompletePendingRequests();
 
@@ -143,7 +143,7 @@ class DefaultExecutionDeployerTest {
         testExecutionOperations.enableFailDeploy();
 
         final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
-        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer();
         deployTasks(executionDeployer, executionGraph);
 
         assertThat(testExecutionOperations.getFailedExecutions())
@@ -157,7 +157,7 @@ class DefaultExecutionDeployerTest {
         testExecutionSlotAllocator.disableAutoCompletePendingRequests();
 
         final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
-        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer();
         deployTasks(executionDeployer, executionGraph);
 
         assertThat(testExecutionSlotAllocator.getPendingRequests()).hasSize(2);
@@ -175,7 +175,7 @@ class DefaultExecutionDeployerTest {
         testExecutionSlotAllocator.disableAutoCompletePendingRequests();
 
         final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
-        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer();
         deployTasks(executionDeployer, executionGraph);
 
         final ExecutionAttemptID attemptId = getAnyExecution(executionGraph).getAttemptId();
@@ -193,7 +193,7 @@ class DefaultExecutionDeployerTest {
         testExecutionSlotAllocator.disableAutoCompletePendingRequests();
 
         final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
-        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer();
         deployTasks(executionDeployer, executionGraph);
 
         final ExecutionAttemptID attemptId = getAnyExecution(executionGraph).getAttemptId();
@@ -208,7 +208,7 @@ class DefaultExecutionDeployerTest {
     void testDeployOnlyIfVertexIsCreated() throws Exception {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
-        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer();
 
         // deploy once to transition the tasks out from CREATED state
         deployTasks(executionDeployer, executionGraph);
@@ -233,7 +233,7 @@ class DefaultExecutionDeployerTest {
 
         final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
         final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
-        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer();
 
         deployTasks(executionDeployer, executionGraph);
 
@@ -251,7 +251,7 @@ class DefaultExecutionDeployerTest {
 
         final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
         final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
-        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer();
 
         deployTasks(executionDeployer, executionGraph);
 
@@ -267,7 +267,7 @@ class DefaultExecutionDeployerTest {
 
         final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
         final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
-        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer();
 
         deployTasks(executionDeployer, executionGraph);
 
@@ -289,7 +289,7 @@ class DefaultExecutionDeployerTest {
 
             final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
             final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
-            final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+            final ExecutionDeployer executionDeployer = createExecutionDeployer();
 
             deployTasks(executionDeployer, executionGraph);
 
@@ -339,7 +339,7 @@ class DefaultExecutionDeployerTest {
         return executionGraph;
     }
 
-    private ExecutionDeployer createExecutionDeployer(ExecutionGraph executionGraph) {
+    private ExecutionDeployer createExecutionDeployer() {
         return new DefaultExecutionDeployer.Factory()
                 .createInstance(
                         LoggerFactory.getLogger(DefaultExecutionDeployer.class),
@@ -347,7 +347,6 @@ class DefaultExecutionDeployerTest {
                         testExecutionOperations,
                         executionVertexVersioner,
                         partitionRegistrationTimeout,
-                        id -> executionGraph.getRegisteredExecutions().get(id),
                         (ignored1, ignored2) -> {},
                         mainThreadExecutor);
     }