Posted to commits@flink.apache.org by se...@apache.org on 2019/11/20 23:00:36 UTC

[flink] branch master updated: [FLINK-14735][scheduler] Improve scheduling of all-to-all partitions with ALL input constraint for new scheduler

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


View the commit online:
https://github.com/apache/flink/commit/93dfdd05a84f933473c7b22437e12c03239f9462

The following commit(s) were added to refs/heads/master by this push:
     new 93dfdd0  [FLINK-14735][scheduler] Improve scheduling of all-to-all partitions with ALL input constraint for new scheduler
93dfdd0 is described below

commit 93dfdd05a84f933473c7b22437e12c03239f9462
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Fri Nov 15 14:58:31 2019 +0800

    [FLINK-14735][scheduler] Improve scheduling of all-to-all partitions with ALL input constraint for new scheduler
    
    This avoids the quadratic complexity of the legacy scheduler's input availability
    check, which would otherwise also run under the new scheduler to determine when a
    successor task becomes ready for scheduling.
---
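For context, a minimal sketch of the quadratic behavior this commit avoids
(illustrative names only, not actual Flink classes): with
InputDependencyConstraint.ALL, every producer that finishes triggers a fresh
availability scan over all N input partitions of the consumer, so N finishes
cost O(N^2) work in total. The isLegacyScheduling() guard keeps this path out
of the new scheduler, which drives scheduling through its own strategy instead.

    // Illustrative sketch only; not actual Flink code.
    public final class AllConstraintScanSketch {

        private static long scanned = 0; // counts array reads to expose the O(N^2) growth

        // Mirrors the shape of checkInputDependencyConstraints() for ALL:
        // a single call scans the input partitions, i.e. up to O(N) reads.
        private static boolean allInputsFinished(boolean[] inputFinished) {
            for (boolean finished : inputFinished) {
                scanned++;
                if (!finished) {
                    return false;
                }
            }
            return true;
        }

        public static void main(String[] args) {
            final int parallelism = 1000;
            final boolean[] inputFinished = new boolean[parallelism];

            for (int producer = 0; producer < parallelism; producer++) {
                inputFinished[producer] = true;
                // Legacy path: each finished producer re-runs the scan, so the
                // loop performs roughly parallelism^2 / 2 reads overall.
                if (allInputsFinished(inputFinished)) {
                    System.out.println("consumer schedulable after producer " + producer);
                }
            }
            System.out.println("array reads performed: " + scanned);
        }
    }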
 .../flink/runtime/executiongraph/Execution.java    |  5 ++-
 .../runtime/scheduler/DefaultSchedulerTest.java    | 46 +++++++++++++++++++++-
 2 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 3392417..6a1118d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -881,8 +881,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				// at least one of the consumer vertex's inputs is consumable here. This is to avoid the
 				// O(N) complexity introduced by input constraint check for InputDependencyConstraint.ANY,
 				// as we do not want the default scheduling performance to be affected.
-				if (consumerVertex.getInputDependencyConstraint() == InputDependencyConstraint.ANY ||
-						consumerVertex.checkInputDependencyConstraints()) {
+				if (isLegacyScheduling() &&
+					(consumerVertex.getInputDependencyConstraint() == InputDependencyConstraint.ANY ||
+						consumerVertex.checkInputDependencyConstraints())) {
 					scheduleConsumer(consumerVertex);
 				}
 			}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 46f99f0..a1b5ee3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
@@ -31,6 +32,7 @@ import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
@@ -45,6 +47,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
@@ -72,6 +75,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -88,6 +92,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -497,6 +502,46 @@ public class DefaultSchedulerTest extends TestLogger {
 		assertThat(deployedExecutionVertices, contains(executionVertexId, executionVertexId));
 	}
 
+	@Test
+	public void testInputConstraintALLPerf() throws Exception {
+		final int parallelism = 1000;
+		final JobVertex v1 = createVertexWithAllInputConstraints("vertex1", parallelism);
+		final JobVertex v2 = createVertexWithAllInputConstraints("vertex2", parallelism);
+		final JobVertex v3 = createVertexWithAllInputConstraints("vertex3", parallelism);
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+		v2.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+		final JobGraph jobGraph = new JobGraph(v1, v2, v3);
+		final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+		final AccessExecutionJobVertex ejv1 = scheduler.requestJob().getAllVertices().get(v1.getID());
+
+		for (int i = 0; i < parallelism - 1; i++) {
+			finishSubtask(scheduler, ejv1, i);
+		}
+
+		final long startTime = System.nanoTime();
+		finishSubtask(scheduler, ejv1, parallelism - 1);
+
+		final Duration duration = Duration.ofNanos(System.nanoTime() - startTime);
+		final Duration timeout = Duration.ofSeconds(5);
+
+		assertThat(duration, lessThan(timeout));
+	}
+
+	private static JobVertex createVertexWithAllInputConstraints(String name, int parallelism) {
+		final JobVertex v = new JobVertex(name);
+		v.setParallelism(parallelism);
+		v.setInvokableClass(AbstractInvokable.class);
+		v.setInputDependencyConstraint(InputDependencyConstraint.ALL);
+		return v;
+	}
+
+	private static void finishSubtask(DefaultScheduler scheduler, AccessExecutionJobVertex vertex, int subtask) {
+		final ExecutionAttemptID attemptId = vertex.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
+		scheduler.updateTaskExecutionState(
+			new TaskExecutionState(scheduler.getJobGraph().getJobID(), attemptId, ExecutionState.FINISHED));
+	}
+
 	private void acknowledgePendingCheckpoint(final SchedulerBase scheduler, final long checkpointId) throws Exception {
 		final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
 
@@ -634,5 +679,4 @@ public class DefaultSchedulerTest extends TestLogger {
 		scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		scheduler.startScheduling();
 	}
-
 }
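
For contrast, a counter-based sketch of how the per-event check can be made
O(1) (a hedged illustration, not the actual DefaultScheduler implementation):
track how many input partitions have finished and compare against the total,
so even the final producer to finish, the step timed by
testInputConstraintALLPerf above, is handled in constant time.

    // Illustrative sketch only; not actual Flink code.
    public final class AllConstraintCounterSketch {

        private final int totalInputPartitions;
        private int finishedInputPartitions;

        public AllConstraintCounterSketch(int totalInputPartitions) {
            this.totalInputPartitions = totalInputPartitions;
        }

        // Called once per finished producer partition; constant time per event.
        public boolean onPartitionFinished() {
            finishedInputPartitions++;
            return finishedInputPartitions == totalInputPartitions;
        }

        public static void main(String[] args) {
            final int parallelism = 1000;
            final AllConstraintCounterSketch tracker = new AllConstraintCounterSketch(parallelism);
            for (int producer = 0; producer < parallelism; producer++) {
                if (tracker.onPartitionFinished()) {
                    System.out.println("all inputs finished; consumer is schedulable");
                }
            }
        }
    }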