You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2023/01/10 14:48:33 UTC
[flink] 01/01: [FLINK-29666][runtime] Let job vertices whose parallelism has already been decided be initialized earlier.
This is an automated email from the ASF dual-hosted git repository.
wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ca18dd7c8363afae2bfa70fdc8b90b658cc22d62
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Tue Jan 10 15:48:11 2023 +0800
[FLINK-29666][runtime] Let job vertices whose parallelism has already been decided be initialized earlier.
If the parallelism is user-specified (i.e. already decided), the downstream job vertices can be initialized earlier, so that they can be scheduled together with their upstream vertices in hybrid shuffle mode.
This closes #21570
---
.../adaptivebatch/AdaptiveBatchScheduler.java | 64 ++++++++++++++++++----
.../adaptivebatch/AdaptiveBatchSchedulerTest.java | 17 ++++++
2 files changed, 69 insertions(+), 12 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index 93d87edd459..5032cc0f77b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTime
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -244,19 +245,40 @@ public class AdaptiveBatchScheduler extends DefaultScheduler {
try {
final long createTimestamp = System.currentTimeMillis();
for (ExecutionJobVertex jobVertex : getExecutionGraph().getVerticesTopologically()) {
- Optional<List<BlockingResultInfo>> consumedResultsInfo =
- tryGetConsumedResultsInfo(jobVertex);
- if (consumedResultsInfo.isPresent() && !jobVertex.isInitialized()) {
- ParallelismAndInputInfos parallelismAndInputInfos =
- tryDecideParallelismAndInputInfos(jobVertex, consumedResultsInfo.get());
- changeJobVertexParallelism(
- jobVertex, parallelismAndInputInfos.getParallelism());
- getExecutionGraph()
- .initializeJobVertex(
- jobVertex,
- createTimestamp,
- parallelismAndInputInfos.getJobVertexInputInfos());
+ if (jobVertex.isInitialized()) {
+ continue;
+ }
+
+ if (canInitialize(jobVertex)) {
+ // This branch is for: If the parallelism is user-specified(decided), the
+ // downstream job vertices can be initialized earlier, so that it can be
+ // scheduled together with its upstream in hybrid shuffle mode.
+
+ // Note that in current implementation, the decider will not load balance
+ // (evenly distribute data) for job vertices whose parallelism has already been
+ // decided, so we can call the
+ // ExecutionGraph#initializeJobVertex(ExecutionJobVertex, long) to initialize.
+ // TODO: In the future, if we want to load balance for job vertices whose
+ // parallelism has already been decided, we need to refactor the logic here.
+ getExecutionGraph().initializeJobVertex(jobVertex, createTimestamp);
newlyInitializedJobVertices.add(jobVertex);
+ } else {
+ Optional<List<BlockingResultInfo>> consumedResultsInfo =
+ tryGetConsumedResultsInfo(jobVertex);
+ if (consumedResultsInfo.isPresent()) {
+ ParallelismAndInputInfos parallelismAndInputInfos =
+ tryDecideParallelismAndInputInfos(
+ jobVertex, consumedResultsInfo.get());
+ changeJobVertexParallelism(
+ jobVertex, parallelismAndInputInfos.getParallelism());
+ checkState(canInitialize(jobVertex));
+ getExecutionGraph()
+ .initializeJobVertex(
+ jobVertex,
+ createTimestamp,
+ parallelismAndInputInfos.getJobVertexInputInfos());
+ newlyInitializedJobVertices.add(jobVertex);
+ }
}
}
} catch (JobException ex) {
@@ -348,6 +370,24 @@ public class AdaptiveBatchScheduler extends DefaultScheduler {
return Optional.of(consumableResultInfo);
}
+ private boolean canInitialize(final ExecutionJobVertex jobVertex) {
+ if (jobVertex.isInitialized() || !jobVertex.isParallelismDecided()) {
+ return false;
+ }
+
+ // all the upstream job vertices need to have been initialized
+ for (JobEdge inputEdge : jobVertex.getJobVertex().getInputs()) {
+ final ExecutionJobVertex producerVertex =
+ getExecutionGraph().getJobVertex(inputEdge.getSource().getProducer().getID());
+ checkNotNull(producerVertex);
+ if (!producerVertex.isInitialized()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
private void updateTopology(final List<ExecutionJobVertex> newlyInitializedJobVertices) {
for (ExecutionJobVertex vertex : newlyInitializedJobVertices) {
initializeOperatorCoordinatorsFor(vertex);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
index bf74c0c14bf..fec551032cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
@@ -257,6 +257,23 @@ class AdaptiveBatchSchedulerTest {
assertThat(sink.getParallelism()).isEqualTo(8);
}
+ @Test
+ void testParallelismDecidedVerticesCanBeInitializedEarlier() throws Exception {
+ // both vertices get an explicit parallelism of 8, i.e. their parallelism is decided
+ final JobVertex source = createJobVertex("source", 8);
+ final JobVertex sink = createJobVertex("sink", 8);
+ sink.connectNewDataSetAsInput(
+ source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+ final SchedulerBase scheduler =
+ createScheduler(new JobGraph(new JobID(), "test job", source, sink));
+ final DefaultExecutionGraph graph = (DefaultExecutionGraph) scheduler.getExecutionGraph();
+ final ExecutionJobVertex sinkExecutionJobVertex = graph.getJobVertex(sink.getID());
+ // precondition: sink is not initialized before scheduling starts
+ assertThat(sinkExecutionJobVertex.isInitialized()).isFalse();
+
+ scheduler.startScheduling();
+ // check sink is initialized at scheduling start, without waiting for the source to produce
+ // any finished results, because its parallelism has already been decided
+ assertThat(sinkExecutionJobVertex.isInitialized()).isTrue();
+ }
+
private BlockingResultInfo getBlockingResultInfo(
AdaptiveBatchScheduler scheduler, JobVertex jobVertex) {
return scheduler.getBlockingResultInfo(