You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/05/08 13:05:51 UTC
[flink] 01/02: [FLINK-12228][runtime] Implement Eager Scheduling Strategy
This is an automated email from the ASF dual-hosted git repository.
gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f6fc678297a29e16b73ff8d5159214d5afec0ef2
Author: shuai-xu <sh...@foxmail.com>
AuthorDate: Fri Apr 26 13:54:06 2019 +0800
[FLINK-12228][runtime] Implement Eager Scheduling Strategy
This closes #8296.
---
.../scheduler/ExecutionVertexDeploymentOption.java | 14 ++-
.../strategy/EagerSchedulingStrategy.java | 112 +++++++++++++++++++
.../scheduler/strategy/SchedulingStrategy.java | 4 +-
.../strategy/EagerSchedulingStrategyTest.java | 121 +++++++++++++++++++++
.../strategy/TestingSchedulerOperations.java | 44 ++++++++
.../strategy/TestingSchedulingExecutionVertex.java | 57 ++++++++++
.../strategy/TestingSchedulingTopology.java | 65 +++++++++++
7 files changed, 413 insertions(+), 4 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
index 829f6ba..9831a9e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.scheduler;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Component that stores the task need to be scheduled and the option for deployment.
*/
@@ -30,7 +32,15 @@ public class ExecutionVertexDeploymentOption {
private final DeploymentOption deploymentOption;
public ExecutionVertexDeploymentOption(ExecutionVertexID executionVertexId, DeploymentOption deploymentOption) {
- this.executionVertexId = executionVertexId;
- this.deploymentOption = deploymentOption;
+ this.executionVertexId = checkNotNull(executionVertexId);
+ this.deploymentOption = checkNotNull(deploymentOption);
+ }
+
+ public ExecutionVertexID getExecutionVertexId() {
+ return executionVertexId;
+ }
+
+ public DeploymentOption getDeploymentOption() {
+ return deploymentOption;
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
new file mode 100644
index 0000000..954fff5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for streaming jobs which schedules all tasks at the same time.
+ *
+ * <p>Deployment requests list the vertices in the iteration order of {@link SchedulingTopology#getVertices()},
+ * rather than in the hash order of the vertex sets passed to this strategy.
+ */
+public class EagerSchedulingStrategy implements SchedulingStrategy {
+
+	private final SchedulerOperations schedulerOperations;
+
+	private final SchedulingTopology schedulingTopology;
+
+	/** The same deployment option is used for every vertex deployed by this strategy. */
+	private final DeploymentOption deploymentOption = new DeploymentOption(false);
+
+	public EagerSchedulingStrategy(
+			SchedulerOperations schedulerOperations,
+			SchedulingTopology schedulingTopology) {
+		this.schedulerOperations = checkNotNull(schedulerOperations);
+		this.schedulingTopology = checkNotNull(schedulingTopology);
+	}
+
+	/**
+	 * Deploys all vertices of the scheduling topology with a single request.
+	 */
+	@Override
+	public void startScheduling() {
+		final Set<ExecutionVertexID> allVertices = getAllVerticesFromTopology();
+		allocateSlotsAndDeploy(allVertices);
+	}
+
+	/**
+	 * Redeploys all given vertices with a single request.
+	 *
+	 * @param verticesToRestart The tasks that need to be restarted
+	 */
+	@Override
+	public void restartTasks(Set<ExecutionVertexID> verticesToRestart) {
+		allocateSlotsAndDeploy(verticesToRestart);
+	}
+
+	@Override
+	public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) {
+		// Will not react to these notifications.
+	}
+
+	@Override
+	public void onPartitionConsumable(ExecutionVertexID executionVertexId, ResultPartitionID resultPartitionId) {
+		// Will not react to these notifications.
+	}
+
+	/**
+	 * Requests slot allocation and deployment for all given vertices in one call.
+	 */
+	private void allocateSlotsAndDeploy(final Set<ExecutionVertexID> verticesToDeploy) {
+		final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions =
+			createExecutionVertexDeploymentOptions(verticesToDeploy);
+		schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+	}
+
+	/**
+	 * Collects the IDs of all vertices in the topology, keeping the topology's iteration order.
+	 */
+	private Set<ExecutionVertexID> getAllVerticesFromTopology() {
+		return StreamSupport
+			.stream(schedulingTopology.getVertices().spliterator(), false)
+			.map(SchedulingExecutionVertex::getId)
+			.collect(Collectors.toCollection(LinkedHashSet::new));
+	}
+
+	/**
+	 * Creates one deployment option per given vertex.
+	 *
+	 * <p>Vertices contained in the topology are ordered by the topology's iteration order instead
+	 * of the (possibly hash-based) iteration order of the given collection. Vertices not found in
+	 * the topology are appended afterwards, so that no requested vertex is dropped.
+	 */
+	private List<ExecutionVertexDeploymentOption> createExecutionVertexDeploymentOptions(
+			final Collection<ExecutionVertexID> vertices) {
+		final Set<ExecutionVertexID> remainingVertices = new LinkedHashSet<>(vertices);
+		final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>(vertices.size());
+		for (SchedulingExecutionVertex schedulingExecutionVertex : schedulingTopology.getVertices()) {
+			if (remainingVertices.isEmpty()) {
+				break;
+			}
+			final ExecutionVertexID executionVertexId = schedulingExecutionVertex.getId();
+			if (remainingVertices.remove(executionVertexId)) {
+				executionVertexDeploymentOptions.add(
+					new ExecutionVertexDeploymentOption(executionVertexId, deploymentOption));
+			}
+		}
+		for (ExecutionVertexID executionVertexId : remainingVertices) {
+			executionVertexDeploymentOptions.add(
+				new ExecutionVertexDeploymentOption(executionVertexId, deploymentOption));
+		}
+		return executionVertexDeploymentOptions;
+	}
+
+	/**
+	 * The factory for creating {@link EagerSchedulingStrategy}.
+	 */
+	public static class Factory implements SchedulingStrategyFactory {
+
+		@Override
+		public SchedulingStrategy createInstance(
+				SchedulerOperations schedulerOperations,
+				SchedulingTopology schedulingTopology,
+				JobGraph jobGraph) {
+			return new EagerSchedulingStrategy(schedulerOperations, schedulingTopology);
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java
index 4522a35..5165891 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java
@@ -40,9 +40,9 @@ public interface SchedulingStrategy {
/**
* Called whenever vertices need to be restarted (due to task failure).
*
- * @param verticesNeedingRestart The tasks need to be restarted
+ * @param verticesToRestart The tasks need to be restarted
*/
- void restartTasks(Set<ExecutionVertexID> verticesNeedingRestart);
+ void restartTasks(Set<ExecutionVertexID> verticesToRestart);
/**
* Called whenever an {@link Execution} changes its state.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java
new file mode 100644
index 0000000..881159d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link EagerSchedulingStrategy}.
+ */
+public class EagerSchedulingStrategyTest extends TestLogger {
+
+	private TestingSchedulerOperations schedulerOperations;
+
+	private TestingSchedulingTopology schedulingTopology;
+
+	private EagerSchedulingStrategy schedulingStrategy;
+
+	@Before
+	public void setUp() {
+		schedulerOperations = new TestingSchedulerOperations();
+		schedulingTopology = new TestingSchedulingTopology();
+		schedulingStrategy = new EagerSchedulingStrategy(schedulerOperations, schedulingTopology);
+	}
+
+	/**
+	 * Tests that starting the scheduling makes the eager scheduling strategy deploy all vertices
+	 * of the scheduling topology in a single request.
+	 */
+	@Test
+	public void testStartScheduling() {
+		final JobVertexID jobVertexId = new JobVertexID();
+		addVerticesToTopology(jobVertexId, 5);
+
+		schedulingStrategy.startScheduling();
+
+		assertThat(schedulerOperations.getScheduledVertices(), hasSize(1));
+
+		final Collection<ExecutionVertexDeploymentOption> deploymentOptions = schedulerOperations.getScheduledVertices().get(0);
+		assertThat(deploymentOptions, hasSize(5));
+
+		final Collection<ExecutionVertexID> deployedVertexIds = toExecutionVertexIds(deploymentOptions);
+		schedulingTopology.getVertices().forEach(
+			vertex -> assertThat(deployedVertexIds, hasItem(vertex.getId())));
+	}
+
+	/**
+	 * Tests that each restart request of the eager scheduling strategy redeploys exactly the
+	 * requested vertices in a single request.
+	 */
+	@Test
+	public void testRestartTasks() {
+		final JobVertexID jobVertexId = new JobVertexID();
+		addVerticesToTopology(jobVertexId, 5);
+
+		final Set<ExecutionVertexID> firstVerticesToRestart = createExecutionVertexIds(jobVertexId, 0, 4);
+		schedulingStrategy.restartTasks(firstVerticesToRestart);
+
+		final Set<ExecutionVertexID> secondVerticesToRestart = createExecutionVertexIds(jobVertexId, 1, 2, 3);
+		schedulingStrategy.restartTasks(secondVerticesToRestart);
+
+		assertThat(schedulerOperations.getScheduledVertices(), hasSize(2));
+
+		assertThat(
+			toExecutionVertexIds(schedulerOperations.getScheduledVertices().get(0)),
+			containsInAnyOrder(firstVerticesToRestart.toArray()));
+		assertThat(
+			toExecutionVertexIds(schedulerOperations.getScheduledVertices().get(1)),
+			containsInAnyOrder(secondVerticesToRestart.toArray()));
+	}
+
+	/**
+	 * Adds {@code parallelism} vertices with subtask indices 0 to parallelism - 1 to the topology.
+	 */
+	private void addVerticesToTopology(JobVertexID jobVertexId, int parallelism) {
+		for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
+			schedulingTopology.addSchedulingExecutionVertex(
+				new TestingSchedulingExecutionVertex(jobVertexId, subtaskIndex));
+		}
+	}
+
+	/**
+	 * Creates the IDs of the given subtasks of a job vertex.
+	 */
+	private static Set<ExecutionVertexID> createExecutionVertexIds(JobVertexID jobVertexId, int... subtaskIndices) {
+		return Arrays.stream(subtaskIndices)
+			.mapToObj(subtaskIndex -> new ExecutionVertexID(jobVertexId, subtaskIndex))
+			.collect(Collectors.toCollection(HashSet::new));
+	}
+
+	/**
+	 * Extracts the execution vertex IDs from the given deployment options.
+	 */
+	private static Collection<ExecutionVertexID> toExecutionVertexIds(
+			Collection<ExecutionVertexDeploymentOption> deploymentOptions) {
+		return deploymentOptions.stream()
+			.map(ExecutionVertexDeploymentOption::getExecutionVertexId)
+			.collect(Collectors.toList());
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperations.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperations.java
new file mode 100644
index 0000000..3a67b39
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperations.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A simple {@link SchedulerOperations} implementation for testing purposes which records the
+ * deployment requests it receives.
+ */
+public class TestingSchedulerOperations implements SchedulerOperations {
+
+	private final List<Collection<ExecutionVertexDeploymentOption>> scheduledVertices = new ArrayList<>();
+
+	/**
+	 * Records an unmodifiable snapshot of the given deployment options as one request.
+	 *
+	 * <p>The options are copied so that later modifications of the caller's collection do not
+	 * change what was recorded.
+	 */
+	@Override
+	public void allocateSlotsAndDeploy(Collection<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
+		scheduledVertices.add(Collections.unmodifiableList(new ArrayList<>(executionVertexDeploymentOptions)));
+	}
+
+	/**
+	 * Returns the recorded requests in the order they were received, one element per
+	 * {@link #allocateSlotsAndDeploy(Collection)} call.
+	 *
+	 * @return an unmodifiable view of the recorded requests
+	 */
+	public List<Collection<ExecutionVertexDeploymentOption>> getScheduledVertices() {
+		return Collections.unmodifiableList(scheduledVertices);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
new file mode 100644
index 0000000..8681fe1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A simple scheduling execution vertex for testing purposes.
+ */
+public class TestingSchedulingExecutionVertex implements SchedulingExecutionVertex {
+
+	/** ID built from the job vertex ID and subtask index passed to the constructor. */
+	private final ExecutionVertexID id;
+
+	public TestingSchedulingExecutionVertex(JobVertexID jobVertexId, int subtaskIndex) {
+		id = new ExecutionVertexID(jobVertexId, subtaskIndex);
+	}
+
+	@Override
+	public ExecutionVertexID getId() {
+		return id;
+	}
+
+	/** Always reports {@link ExecutionState#CREATED}. */
+	@Override
+	public ExecutionState getState() {
+		return ExecutionState.CREATED;
+	}
+
+	/** This vertex consumes no result partitions. */
+	@Override
+	public Collection<SchedulingResultPartition> getConsumedResultPartitions() {
+		return Collections.emptyList();
+	}
+
+	/** This vertex produces no result partitions. */
+	@Override
+	public Collection<SchedulingResultPartition> getProducedResultPartitions() {
+		return Collections.emptyList();
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
new file mode 100644
index 0000000..e827f78
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A simple scheduling topology for testing purposes.
+ *
+ * <p>{@link #getVertices()} returns the vertices in the order in which they were first added, so
+ * tests observe a deterministic iteration order.
+ */
+public class TestingSchedulingTopology implements SchedulingTopology {
+
+	/** Vertices by ID; a {@link LinkedHashMap} keeps insertion order for {@link #getVertices()}. */
+	private final Map<ExecutionVertexID, SchedulingExecutionVertex> schedulingExecutionVertices = new LinkedHashMap<>();
+
+	private final Map<IntermediateResultPartitionID, SchedulingResultPartition> schedulingResultPartitions = new HashMap<>();
+
+	@Override
+	public Iterable<SchedulingExecutionVertex> getVertices() {
+		return Collections.unmodifiableCollection(schedulingExecutionVertices.values());
+	}
+
+	@Override
+	public Optional<SchedulingExecutionVertex> getVertex(ExecutionVertexID executionVertexId) {
+		return Optional.ofNullable(schedulingExecutionVertices.get(executionVertexId));
+	}
+
+	@Override
+	public Optional<SchedulingResultPartition> getResultPartition(
+			IntermediateResultPartitionID intermediateResultPartitionId) {
+		return Optional.ofNullable(schedulingResultPartitions.get(intermediateResultPartitionId));
+	}
+
+	/**
+	 * Adds the given vertex together with all result partitions it consumes and produces.
+	 * A vertex or partition whose ID is already registered replaces the previously stored one.
+	 */
+	public void addSchedulingExecutionVertex(SchedulingExecutionVertex schedulingExecutionVertex) {
+		schedulingExecutionVertices.put(schedulingExecutionVertex.getId(), schedulingExecutionVertex);
+		addSchedulingResultPartitions(schedulingExecutionVertex.getConsumedResultPartitions());
+		addSchedulingResultPartitions(schedulingExecutionVertex.getProducedResultPartitions());
+	}
+
+	private void addSchedulingResultPartitions(final Collection<SchedulingResultPartition> resultPartitions) {
+		for (SchedulingResultPartition schedulingResultPartition : resultPartitions) {
+			schedulingResultPartitions.put(schedulingResultPartition.getId(), schedulingResultPartition);
+		}
+	}
+}