You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2023/01/10 14:48:33 UTC

[flink] 01/01: [FLINK-29666][runtime] Allow job vertices whose parallelism has already been decided to be initialized earlier.

This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ca18dd7c8363afae2bfa70fdc8b90b658cc22d62
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Tue Jan 10 15:48:11 2023 +0800

    [FLINK-29666][runtime] Allow job vertices whose parallelism has already been decided to be initialized earlier.
    
    If the parallelism is user-specified (i.e. already decided), downstream job vertices can be initialized earlier, so that they can be scheduled together with their upstream vertices in hybrid shuffle mode.
    
    This closes #21570
---
 .../adaptivebatch/AdaptiveBatchScheduler.java      | 64 ++++++++++++++++++----
 .../adaptivebatch/AdaptiveBatchSchedulerTest.java  | 17 ++++++
 2 files changed, 69 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index 93d87edd459..5032cc0f77b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTime
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -244,19 +245,40 @@ public class AdaptiveBatchScheduler extends DefaultScheduler {
         try {
             final long createTimestamp = System.currentTimeMillis();
             for (ExecutionJobVertex jobVertex : getExecutionGraph().getVerticesTopologically()) {
-                Optional<List<BlockingResultInfo>> consumedResultsInfo =
-                        tryGetConsumedResultsInfo(jobVertex);
-                if (consumedResultsInfo.isPresent() && !jobVertex.isInitialized()) {
-                    ParallelismAndInputInfos parallelismAndInputInfos =
-                            tryDecideParallelismAndInputInfos(jobVertex, consumedResultsInfo.get());
-                    changeJobVertexParallelism(
-                            jobVertex, parallelismAndInputInfos.getParallelism());
-                    getExecutionGraph()
-                            .initializeJobVertex(
-                                    jobVertex,
-                                    createTimestamp,
-                                    parallelismAndInputInfos.getJobVertexInputInfos());
+                if (jobVertex.isInitialized()) {
+                    continue;
+                }
+
+                if (canInitialize(jobVertex)) {
+                    // This branch handles job vertices whose parallelism is user-specified
+                    // (already decided): they can be initialized earlier, so that they can be
+                    // scheduled together with their upstream vertices in hybrid shuffle mode.
+
+                    // Note that in current implementation, the decider will not load balance
+                    // (evenly distribute data) for job vertices whose parallelism has already been
+                    // decided, so we can call the
+                    // ExecutionGraph#initializeJobVertex(ExecutionJobVertex, long) to initialize.
+                    // TODO: In the future, if we want to load balance for job vertices whose
+                    // parallelism has already been decided, we need to refactor the logic here.
+                    getExecutionGraph().initializeJobVertex(jobVertex, createTimestamp);
                     newlyInitializedJobVertices.add(jobVertex);
+                } else {
+                    Optional<List<BlockingResultInfo>> consumedResultsInfo =
+                            tryGetConsumedResultsInfo(jobVertex);
+                    if (consumedResultsInfo.isPresent()) {
+                        ParallelismAndInputInfos parallelismAndInputInfos =
+                                tryDecideParallelismAndInputInfos(
+                                        jobVertex, consumedResultsInfo.get());
+                        changeJobVertexParallelism(
+                                jobVertex, parallelismAndInputInfos.getParallelism());
+                        checkState(canInitialize(jobVertex));
+                        getExecutionGraph()
+                                .initializeJobVertex(
+                                        jobVertex,
+                                        createTimestamp,
+                                        parallelismAndInputInfos.getJobVertexInputInfos());
+                        newlyInitializedJobVertices.add(jobVertex);
+                    }
                 }
             }
         } catch (JobException ex) {
@@ -348,6 +370,24 @@ public class AdaptiveBatchScheduler extends DefaultScheduler {
         return Optional.of(consumableResultInfo);
     }
 
+    private boolean canInitialize(final ExecutionJobVertex jobVertex) {
+        if (jobVertex.isInitialized() || !jobVertex.isParallelismDecided()) {
+            return false;
+        }
+
+        // all the upstream job vertices need to have been initialized
+        for (JobEdge inputEdge : jobVertex.getJobVertex().getInputs()) {
+            final ExecutionJobVertex producerVertex =
+                    getExecutionGraph().getJobVertex(inputEdge.getSource().getProducer().getID());
+            checkNotNull(producerVertex);
+            if (!producerVertex.isInitialized()) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
     private void updateTopology(final List<ExecutionJobVertex> newlyInitializedJobVertices) {
         for (ExecutionJobVertex vertex : newlyInitializedJobVertices) {
             initializeOperatorCoordinatorsFor(vertex);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
index bf74c0c14bf..fec551032cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
@@ -257,6 +257,23 @@ class AdaptiveBatchSchedulerTest {
         assertThat(sink.getParallelism()).isEqualTo(8);
     }
 
+    @Test
+    void testParallelismDecidedVerticesCanBeInitializedEarlier() throws Exception {
+        final JobVertex source = createJobVertex("source", 8);
+        final JobVertex sink = createJobVertex("sink", 8);
+        sink.connectNewDataSetAsInput(
+                source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+        SchedulerBase scheduler =
+                createScheduler(new JobGraph(new JobID(), "test job", source, sink));
+        final DefaultExecutionGraph graph = (DefaultExecutionGraph) scheduler.getExecutionGraph();
+        final ExecutionJobVertex sinkExecutionJobVertex = graph.getJobVertex(sink.getID());
+
+        scheduler.startScheduling();
+        // check sink is already initialized, although its upstream has not finished
+        assertThat(sinkExecutionJobVertex.isInitialized()).isTrue();
+    }
+
     private BlockingResultInfo getBlockingResultInfo(
             AdaptiveBatchScheduler scheduler, JobVertex jobVertex) {
         return scheduler.getBlockingResultInfo(