Posted to commits@flink.apache.org by ga...@apache.org on 2019/05/08 13:05:51 UTC

[flink] 01/02: [FLINK-12228][runtime] Implement Eager Scheduling Strategy

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6fc678297a29e16b73ff8d5159214d5afec0ef2
Author: shuai-xu <sh...@foxmail.com>
AuthorDate: Fri Apr 26 13:54:06 2019 +0800

    [FLINK-12228][runtime] Implement Eager Scheduling Strategy
    
    This closes #8296.
---
 .../scheduler/ExecutionVertexDeploymentOption.java |  14 ++-
 .../strategy/EagerSchedulingStrategy.java          | 112 +++++++++++++++++++
 .../scheduler/strategy/SchedulingStrategy.java     |   4 +-
 .../strategy/EagerSchedulingStrategyTest.java      | 121 +++++++++++++++++++++
 .../strategy/TestingSchedulerOperations.java       |  44 ++++++++
 .../strategy/TestingSchedulingExecutionVertex.java |  57 ++++++++++
 .../strategy/TestingSchedulingTopology.java        |  65 +++++++++++
 7 files changed, 413 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
index 829f6ba..9831a9e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
 * Component that stores the task to be scheduled and the option for its deployment.
  */
@@ -30,7 +32,15 @@ public class ExecutionVertexDeploymentOption {
 	private final DeploymentOption deploymentOption;
 
 	public ExecutionVertexDeploymentOption(ExecutionVertexID executionVertexId, DeploymentOption deploymentOption) {
-		this.executionVertexId = executionVertexId;
-		this.deploymentOption = deploymentOption;
+		this.executionVertexId = checkNotNull(executionVertexId);
+		this.deploymentOption = checkNotNull(deploymentOption);
+	}
+
+	public ExecutionVertexID getExecutionVertexId() {
+		return executionVertexId;
+	}
+
+	public DeploymentOption getDeploymentOption() {
+		return deploymentOption;
 	}
 }
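
For reference, a minimal sketch of how the updated class is exercised (illustrative only, not part of this commit; it reuses types introduced elsewhere in this diff):

    JobVertexID jobVertexId = new JobVertexID();
    ExecutionVertexDeploymentOption option = new ExecutionVertexDeploymentOption(
            new ExecutionVertexID(jobVertexId, 0),
            new DeploymentOption(false));

    option.getExecutionVertexId();  // the vertex to deploy
    option.getDeploymentOption();   // the associated deployment option

    // Both constructor arguments are now mandatory; passing null for either
    // fails fast with a NullPointerException from checkNotNull.
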
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
new file mode 100644
index 0000000..954fff5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for streaming jobs which schedules all tasks at the same time.
+ */
+public class EagerSchedulingStrategy implements SchedulingStrategy {
+
+	private final SchedulerOperations schedulerOperations;
+
+	private final SchedulingTopology schedulingTopology;
+
+	private final DeploymentOption deploymentOption = new DeploymentOption(false);
+
+	public EagerSchedulingStrategy(
+			SchedulerOperations schedulerOperations,
+			SchedulingTopology schedulingTopology) {
+		this.schedulerOperations = checkNotNull(schedulerOperations);
+		this.schedulingTopology = checkNotNull(schedulingTopology);
+	}
+
+	@Override
+	public void startScheduling() {
+		final Set<ExecutionVertexID> allVertices = getAllVerticesFromTopology();
+		allocateSlotsAndDeploy(allVertices);
+	}
+
+	@Override
+	public void restartTasks(Set<ExecutionVertexID> verticesToRestart) {
+		allocateSlotsAndDeploy(verticesToRestart);
+	}
+
+	@Override
+	public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) {
+		// Will not react to these notifications.
+	}
+
+	@Override
+	public void onPartitionConsumable(ExecutionVertexID executionVertexId, ResultPartitionID resultPartitionId) {
+		// Will not react to these notifications.
+	}
+
+	private void allocateSlotsAndDeploy(final Set<ExecutionVertexID> verticesToDeploy) {
+		final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions =
+				createExecutionVertexDeploymentOptions(verticesToDeploy);
+		schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+	}
+
+	private Set<ExecutionVertexID> getAllVerticesFromTopology() {
+		return StreamSupport
+				.stream(schedulingTopology.getVertices().spliterator(), false)
+				.map(SchedulingExecutionVertex::getId)
+				.collect(Collectors.toSet());
+	}
+
+	private List<ExecutionVertexDeploymentOption> createExecutionVertexDeploymentOptions(
+			final Collection<ExecutionVertexID> vertices) {
+		List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>(vertices.size());
+		for (ExecutionVertexID executionVertexID : vertices) {
+			executionVertexDeploymentOptions.add(
+					new ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+		}
+		return executionVertexDeploymentOptions;
+	}
+
+	/**
+	 * The factory for creating {@link EagerSchedulingStrategy}.
+	 */
+	public static class Factory implements SchedulingStrategyFactory {
+
+		@Override
+		public SchedulingStrategy createInstance(
+				SchedulerOperations schedulerOperations,
+				SchedulingTopology schedulingTopology,
+				JobGraph jobGraph) {
+			return new EagerSchedulingStrategy(schedulerOperations, schedulingTopology);
+		}
+	}
+}
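
A minimal usage sketch of the new strategy (illustrative only, not part of the commit; it relies on the testing classes added further below in this change):

    SchedulerOperations schedulerOperations = new TestingSchedulerOperations();
    TestingSchedulingTopology topology = new TestingSchedulingTopology();
    JobVertexID jobVertexId = new JobVertexID();
    topology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexId, 0));
    topology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexId, 1));

    // The factory ignores the JobGraph argument, so null suffices for this sketch.
    SchedulingStrategy strategy = new EagerSchedulingStrategy.Factory()
            .createInstance(schedulerOperations, topology, null);

    // Eager semantics: every vertex in the topology is handed to
    // SchedulerOperations#allocateSlotsAndDeploy in a single batch.
    strategy.startScheduling();
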
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java
index 4522a35..5165891 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java
@@ -40,9 +40,9 @@ public interface SchedulingStrategy {
 	/**
 	 * Called whenever vertices need to be restarted (due to task failure).
 	 *
-	 * @param verticesNeedingRestart The tasks need to be restarted
+	 * @param verticesToRestart The tasks that need to be restarted
 	 */
-	void restartTasks(Set<ExecutionVertexID> verticesNeedingRestart);
+	void restartTasks(Set<ExecutionVertexID> verticesToRestart);
 
 	/**
 	 * Called whenever an {@link Execution} changes its state.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java
new file mode 100644
index 0000000..881159d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link EagerSchedulingStrategy}.
+ */
+public class EagerSchedulingStrategyTest extends TestLogger {
+
+	private TestingSchedulerOperations testingSchedulerOperations;
+
+	private TestingSchedulingTopology testingSchedulingTopology;
+
+	private EagerSchedulingStrategy schedulingStrategy;
+
+	@Before
+	public void setUp() {
+		testingSchedulerOperations = new TestingSchedulerOperations();
+		testingSchedulingTopology = new TestingSchedulingTopology();
+		schedulingStrategy = new EagerSchedulingStrategy(
+				testingSchedulerOperations,
+				testingSchedulingTopology);
+	}
+
+	/**
+	 * Tests that when scheduling starts, the eager scheduling strategy will start all vertices in the scheduling topology.
+	 */
+	@Test
+	public void testStartScheduling() {
+		JobVertexID jobVertexID = new JobVertexID();
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 0));
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 1));
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 2));
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 3));
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 4));
+
+		schedulingStrategy.startScheduling();
+
+		assertThat(testingSchedulerOperations.getScheduledVertices(), hasSize(1));
+
+		Collection<ExecutionVertexDeploymentOption> scheduledVertices = testingSchedulerOperations.getScheduledVertices().get(0);
+		assertThat(scheduledVertices, hasSize(5));
+		Collection<ExecutionVertexID> vertices = getExecutionVertexIdsFromDeployOptions(scheduledVertices);
+		for (SchedulingExecutionVertex schedulingExecutionVertex : testingSchedulingTopology.getVertices()) {
+			assertThat(vertices, hasItem(schedulingExecutionVertex.getId()));
+		}
+	}
+
+	/**
+	 * Tests that the eager scheduling strategy will restart all vertices that need to be restarted at the same time.
+	 */
+	@Test
+	public void testRestartTasks() {
+		JobVertexID jobVertexID = new JobVertexID();
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 0));
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 1));
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 2));
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 3));
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 4));
+
+		Set<ExecutionVertexID> toBeRestartedVertices1 = new HashSet<>(Arrays.asList(
+				new ExecutionVertexID(jobVertexID, 0),
+				new ExecutionVertexID(jobVertexID, 4)));
+		schedulingStrategy.restartTasks(toBeRestartedVertices1);
+
+		Set<ExecutionVertexID> toBeRestartedVertices2 = new HashSet<>(Arrays.asList(
+				new ExecutionVertexID(jobVertexID, 1),
+				new ExecutionVertexID(jobVertexID, 2),
+				new ExecutionVertexID(jobVertexID, 3)));
+		schedulingStrategy.restartTasks(toBeRestartedVertices2);
+
+		assertThat(testingSchedulerOperations.getScheduledVertices(), hasSize(2));
+
+		Collection<ExecutionVertexDeploymentOption> scheduledVertices1 = testingSchedulerOperations.getScheduledVertices().get(0);
+		assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices1), containsInAnyOrder(toBeRestartedVertices1.toArray()));
+
+		Collection<ExecutionVertexDeploymentOption> scheduledVertices2 = testingSchedulerOperations.getScheduledVertices().get(1);
+		assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices2), containsInAnyOrder(toBeRestartedVertices2.toArray()));
+	}
+
+	private static Collection<ExecutionVertexID> getExecutionVertexIdsFromDeployOptions(
+			Collection<ExecutionVertexDeploymentOption> deploymentOptions) {
+		return deploymentOptions.stream()
+				.map(ExecutionVertexDeploymentOption::getExecutionVertexId)
+				.collect(Collectors.toList());
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperations.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperations.java
new file mode 100644
index 0000000..3a67b39
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperations.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A simple {@link SchedulerOperations} implementation for testing purposes.
+ */
+public class TestingSchedulerOperations implements SchedulerOperations {
+
+	private final List<Collection<ExecutionVertexDeploymentOption>> scheduledVertices = new ArrayList<>();
+
+	@Override
+	public void allocateSlotsAndDeploy(Collection<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
+		scheduledVertices.add(executionVertexDeploymentOptions);
+	}
+
+	public List<Collection<ExecutionVertexDeploymentOption>> getScheduledVertices() {
+		return Collections.unmodifiableList(scheduledVertices);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
new file mode 100644
index 0000000..8681fe1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A simple scheduling execution vertex for testing purposes.
+ */
+public class TestingSchedulingExecutionVertex implements SchedulingExecutionVertex {
+
+	private final ExecutionVertexID executionVertexId;
+
+	public TestingSchedulingExecutionVertex(JobVertexID jobVertexId, int subtaskIndex) {
+		this.executionVertexId = new ExecutionVertexID(jobVertexId, subtaskIndex);
+	}
+
+	@Override
+	public ExecutionVertexID getId() {
+		return executionVertexId;
+	}
+
+	@Override
+	public ExecutionState getState() {
+		return ExecutionState.CREATED;
+	}
+
+	@Override
+	public Collection<SchedulingResultPartition> getConsumedResultPartitions() {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public Collection<SchedulingResultPartition> getProducedResultPartitions() {
+		return Collections.emptyList();
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
new file mode 100644
index 0000000..e827f78
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A simple scheduling topology for testing purposes.
+ */
+public class TestingSchedulingTopology implements SchedulingTopology {
+
+	private final Map<ExecutionVertexID, SchedulingExecutionVertex> schedulingExecutionVertices = new HashMap<>();
+
+	private final Map<IntermediateResultPartitionID, SchedulingResultPartition> schedulingResultPartitions = new HashMap<>();
+
+	@Override
+	public Iterable<SchedulingExecutionVertex> getVertices() {
+		return Collections.unmodifiableCollection(schedulingExecutionVertices.values());
+	}
+
+	@Override
+	public Optional<SchedulingExecutionVertex> getVertex(ExecutionVertexID executionVertexId) {
+		return Optional.ofNullable(schedulingExecutionVertices.get(executionVertexId));
+	}
+
+	@Override
+	public Optional<SchedulingResultPartition> getResultPartition(
+			IntermediateResultPartitionID intermediateResultPartitionId) {
+		return Optional.ofNullable(schedulingResultPartitions.get(intermediateResultPartitionId));
+	}
+
+	public void addSchedulingExecutionVertex(SchedulingExecutionVertex schedulingExecutionVertex) {
+		schedulingExecutionVertices.put(schedulingExecutionVertex.getId(), schedulingExecutionVertex);
+		addSchedulingResultPartitions(schedulingExecutionVertex.getConsumedResultPartitions());
+		addSchedulingResultPartitions(schedulingExecutionVertex.getProducedResultPartitions());
+	}
+
+	private void addSchedulingResultPartitions(final Collection<SchedulingResultPartition> resultPartitions) {
+		for (SchedulingResultPartition schedulingResultPartition : resultPartitions) {
+			schedulingResultPartitions.put(schedulingResultPartition.getId(), schedulingResultPartition);
+		}
+	}
+}
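
As a closing illustration (not part of the commit), a vertex registered with the testing topology becomes visible through both lookup paths:

    TestingSchedulingTopology topology = new TestingSchedulingTopology();
    TestingSchedulingExecutionVertex vertex =
            new TestingSchedulingExecutionVertex(new JobVertexID(), 0);
    topology.addSchedulingExecutionVertex(vertex);

    topology.getVertex(vertex.getId());  // Optional containing the registered vertex
    topology.getVertices();              // unmodifiable view over all registered vertices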