You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/01/24 09:06:06 UTC
[flink] branch master updated: [FLINK-25036][runtime] Introduce vertex wise scheduling strategy
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e5c1e67 [FLINK-25036][runtime] Introduce vertex wise scheduling strategy
e5c1e67 is described below
commit e5c1e67e2169534d5e837e5690360b075b66dd75
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Wed Dec 22 20:09:15 2021 +0800
[FLINK-25036][runtime] Introduce vertex wise scheduling strategy
This closes #18126.
---
.../strategy/VertexwiseSchedulingStrategy.java | 187 +++++++++++++++++
.../PipelinedRegionSchedulingStrategyTest.java | 43 +---
.../scheduler/strategy/StrategyTestUtil.java | 34 +++
.../strategy/VertexwiseSchedulingStrategyTest.java | 232 +++++++++++++++++++++
4 files changed, 462 insertions(+), 34 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
new file mode 100644
index 0000000..b877b5b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+import org.apache.flink.runtime.scheduler.SchedulingTopologyListener;
+import org.apache.flink.util.IterableUtils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link SchedulingStrategy} instance which schedules tasks in granularity of vertex (which
+ * indicates this strategy only supports ALL_EDGES_BLOCKING batch jobs). Note that this strategy
+ * implements {@link SchedulingTopologyListener}, so it can handle the updates of scheduling
+ * topology.
+ *
+ * <p>A vertex is deployed once every result partition of every consumed partition group it reads
+ * is {@link ResultPartitionState#CONSUMABLE}. Each vertex is handed to {@link
+ * SchedulerOperations#allocateSlotsAndDeploy} in its own single-element list.
+ */
+public class VertexwiseSchedulingStrategy
+        implements SchedulingStrategy, SchedulingTopologyListener {
+
+    private final SchedulerOperations schedulerOperations;
+
+    private final SchedulingTopology schedulingTopology;
+
+    /** Deployment option applied to every vertex this strategy deploys. */
+    private final DeploymentOption deploymentOption = new DeploymentOption(false);
+
+    /**
+     * Vertices reported by {@link #notifySchedulingTopologyUpdated} that have not yet been
+     * considered by a scheduling attempt.
+     */
+    private final Set<ExecutionVertexID> newVertices = new HashSet<>();
+
+    /**
+     * Creates the strategy and registers it as a listener on the given topology.
+     *
+     * @param schedulerOperations used to allocate slots and deploy vertices; must not be null
+     * @param schedulingTopology the topology to schedule; must not be null
+     */
+    public VertexwiseSchedulingStrategy(
+            final SchedulerOperations schedulerOperations,
+            final SchedulingTopology schedulingTopology) {
+
+        this.schedulerOperations = checkNotNull(schedulerOperations);
+        this.schedulingTopology = checkNotNull(schedulingTopology);
+        schedulingTopology.registerSchedulingTopologyListener(this);
+    }
+
+    /** Starts scheduling with the vertices that consume no partition group at all. */
+    @Override
+    public void startScheduling() {
+        final Set<ExecutionVertexID> sourceVertices =
+                IterableUtils.toStream(schedulingTopology.getVertices())
+                        .filter(vertex -> vertex.getConsumedPartitionGroups().isEmpty())
+                        .map(SchedulingExecutionVertex::getId)
+                        .collect(Collectors.toSet());
+
+        maybeScheduleVertices(sourceVertices);
+    }
+
+    /**
+     * Re-evaluates the given vertices and deploys those whose inputs are all consumable.
+     *
+     * @param verticesToRestart ids of the vertices to reconsider for scheduling
+     */
+    @Override
+    public void restartTasks(Set<ExecutionVertexID> verticesToRestart) {
+        maybeScheduleVertices(verticesToRestart);
+    }
+
+    /**
+     * Reacts to a vertex reaching {@link ExecutionState#FINISHED} by trying to schedule the
+     * consumers of its produced results. All other state changes are ignored.
+     *
+     * @param executionVertexId id of the vertex whose state changed
+     * @param executionState the new state of that vertex
+     */
+    @Override
+    public void onExecutionStateChange(
+            ExecutionVertexID executionVertexId, ExecutionState executionState) {
+        if (executionState != ExecutionState.FINISHED) {
+            return;
+        }
+
+        final SchedulingExecutionVertex finishedVertex =
+                schedulingTopology.getVertex(executionVertexId);
+
+        // Results without a consumer vertex group contribute no candidates.
+        final Set<ExecutionVertexID> consumerVertices =
+                IterableUtils.toStream(finishedVertex.getProducedResults())
+                        .map(SchedulingResultPartition::getConsumerVertexGroup)
+                        .filter(Optional::isPresent)
+                        .map(Optional::get)
+                        .flatMap(consumerGroup -> IterableUtils.toStream(consumerGroup))
+                        .collect(Collectors.toSet());
+
+        maybeScheduleVertices(consumerVertices);
+    }
+
+    /** Intentionally a no-op: this strategy does not act on partition-consumable events. */
+    @Override
+    public void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId) {}
+
+    /**
+     * Records newly added vertices so that the next scheduling attempt considers them.
+     *
+     * @param schedulingTopology the updated topology; must be the instance this strategy was
+     *     created with
+     * @param newExecutionVertices ids of the vertices that were added to the topology
+     * @throws IllegalStateException if a different topology instance is passed
+     */
+    @Override
+    public void notifySchedulingTopologyUpdated(
+            SchedulingTopology schedulingTopology, List<ExecutionVertexID> newExecutionVertices) {
+        checkState(
+                schedulingTopology == this.schedulingTopology,
+                "The updated scheduling topology must be the one this strategy was created with.");
+        newVertices.addAll(newExecutionVertices);
+    }
+
+    /**
+     * Deploys those of the given vertices, plus any buffered new vertices, whose inputs are all
+     * consumable. The buffer of new vertices is drained by this call.
+     *
+     * @throws IllegalStateException if a candidate vertex is not in state CREATED
+     */
+    private void maybeScheduleVertices(final Set<ExecutionVertexID> vertices) {
+        // Consumed partition groups can be shared between candidates, so each group is
+        // evaluated at most once per scheduling attempt.
+        final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new HashMap<>();
+
+        final Set<ExecutionVertexID> allCandidates;
+        if (newVertices.isEmpty()) {
+            allCandidates = vertices;
+        } else {
+            // Copy before merging so the caller's set is never modified.
+            allCandidates = new HashSet<>(vertices);
+            allCandidates.addAll(newVertices);
+            newVertices.clear();
+        }
+
+        final Set<ExecutionVertexID> verticesToDeploy =
+                allCandidates.stream()
+                        .filter(vertexId -> isReadyForDeployment(vertexId, consumableStatusCache))
+                        .collect(Collectors.toSet());
+
+        scheduleVerticesOneByOne(verticesToDeploy);
+    }
+
+    /**
+     * Checks that the vertex is still in state CREATED and reports whether all of its inputs are
+     * consumable.
+     *
+     * @throws IllegalStateException if the vertex is not in state CREATED
+     */
+    private boolean isReadyForDeployment(
+            final ExecutionVertexID vertexId,
+            final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
+        final SchedulingExecutionVertex vertex = schedulingTopology.getVertex(vertexId);
+        final ExecutionState state = vertex.getState();
+        checkState(
+                state == ExecutionState.CREATED,
+                "Vertex %s must be in state CREATED to be scheduled, but is in state %s.",
+                vertexId,
+                state);
+        return areVertexInputsAllConsumable(vertex, consumableStatusCache);
+    }
+
+    /** Deploys the given vertices in topological order, one deployment call per vertex. */
+    private void scheduleVerticesOneByOne(final Set<ExecutionVertexID> verticesToDeploy) {
+        if (verticesToDeploy.isEmpty()) {
+            return;
+        }
+        final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
+                SchedulingStrategyUtils.createExecutionVertexDeploymentOptionsInTopologicalOrder(
+                        schedulingTopology, verticesToDeploy, id -> deploymentOption);
+
+        for (ExecutionVertexDeploymentOption option : vertexDeploymentOptions) {
+            schedulerOperations.allocateSlotsAndDeploy(Collections.singletonList(option));
+        }
+    }
+
+    /**
+     * Returns whether every consumed partition group of the vertex is consumable, memoizing the
+     * per-group result in the given cache.
+     */
+    private boolean areVertexInputsAllConsumable(
+            SchedulingExecutionVertex vertex,
+            Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
+        for (ConsumedPartitionGroup consumedPartitionGroup : vertex.getConsumedPartitionGroups()) {
+            final boolean groupConsumable =
+                    consumableStatusCache.computeIfAbsent(
+                            consumedPartitionGroup, this::isConsumedPartitionGroupConsumable);
+            if (!groupConsumable) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Returns whether every partition of the group is in state CONSUMABLE. */
+    private boolean isConsumedPartitionGroupConsumable(
+            final ConsumedPartitionGroup consumedPartitionGroup) {
+        for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
+            final ResultPartitionState partitionState =
+                    schedulingTopology.getResultPartition(partitionId).getState();
+            if (partitionState != ResultPartitionState.CONSUMABLE) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** The factory for creating {@link VertexwiseSchedulingStrategy}. */
+    public static class Factory implements SchedulingStrategyFactory {
+        @Override
+        public SchedulingStrategy createInstance(
+                final SchedulerOperations schedulerOperations,
+                final SchedulingTopology schedulingTopology) {
+            return new VertexwiseSchedulingStrategy(schedulerOperations, schedulingTopology);
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java
index 05764f2..4211955 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java
@@ -42,8 +42,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo;
import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -109,7 +109,8 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger {
new ArrayList<>();
expectedScheduledVertices.add(Arrays.asList(source.get(0), map.get(0)));
expectedScheduledVertices.add(Arrays.asList(source.get(1), map.get(1)));
- assertLatestScheduledVerticesAreEqualTo(expectedScheduledVertices);
+ assertLatestScheduledVerticesAreEqualTo(
+ expectedScheduledVertices, testingSchedulerOperation);
}
@Test
@@ -129,7 +130,8 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger {
new ArrayList<>();
expectedScheduledVertices.add(Arrays.asList(source.get(0), map.get(0)));
expectedScheduledVertices.add(Arrays.asList(source.get(1), map.get(1)));
- assertLatestScheduledVerticesAreEqualTo(expectedScheduledVertices);
+ assertLatestScheduledVerticesAreEqualTo(
+ expectedScheduledVertices, testingSchedulerOperation);
}
@Test
@@ -154,7 +156,8 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger {
new ArrayList<>();
expectedScheduledVertices.add(Arrays.asList(sink.get(0)));
expectedScheduledVertices.add(Arrays.asList(sink.get(1)));
- assertLatestScheduledVerticesAreEqualTo(expectedScheduledVertices);
+ assertLatestScheduledVerticesAreEqualTo(
+ expectedScheduledVertices, testingSchedulerOperation);
}
@Test
@@ -176,7 +179,8 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger {
final List<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices =
new ArrayList<>();
expectedScheduledVertices.add(Arrays.asList(v1.get(0)));
- assertLatestScheduledVerticesAreEqualTo(expectedScheduledVertices);
+ assertLatestScheduledVerticesAreEqualTo(
+ expectedScheduledVertices, testingSchedulerOperation);
}
@Test
@@ -355,33 +359,4 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger {
schedulingStrategy.startScheduling();
return schedulingStrategy;
}
-
- private void assertLatestScheduledVerticesAreEqualTo(
- final List<List<TestingSchedulingExecutionVertex>> expected) {
- final List<List<ExecutionVertexDeploymentOption>> deploymentOptions =
- testingSchedulerOperation.getScheduledVertices();
- final int expectedScheduledBulks = expected.size();
- assertThat(expectedScheduledBulks, lessThanOrEqualTo(deploymentOptions.size()));
- for (int i = 0; i < expectedScheduledBulks; i++) {
- assertEquals(
- idsFromVertices(expected.get(expectedScheduledBulks - i - 1)),
- idsFromDeploymentOptions(
- deploymentOptions.get(deploymentOptions.size() - i - 1)));
- }
- }
-
- private static List<ExecutionVertexID> idsFromVertices(
- final List<TestingSchedulingExecutionVertex> vertices) {
- return vertices.stream()
- .map(TestingSchedulingExecutionVertex::getId)
- .collect(Collectors.toList());
- }
-
- private static List<ExecutionVertexID> idsFromDeploymentOptions(
- final List<ExecutionVertexDeploymentOption> deploymentOptions) {
-
- return deploymentOptions.stream()
- .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
- .collect(Collectors.toList());
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java
index 30722e5..15e0318 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java
@@ -23,6 +23,10 @@ import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
import java.util.List;
import java.util.stream.Collectors;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+
/** Strategy test utilities. */
public class StrategyTestUtil {
@@ -33,4 +37,34 @@ public class StrategyTestUtil {
.map(ExecutionVertexDeploymentOption::getExecutionVertexId)
.collect(Collectors.toList());
}
+
+    /**
+     * Asserts that the most recently scheduled bulks match the expected ones.
+     *
+     * <p>Only the last {@code expected.size()} entries recorded by the scheduler operations are
+     * compared, bulk by bulk and by vertex id, starting from the most recent one.
+     *
+     * @param expected the expected trailing bulks, oldest first
+     * @param testingSchedulerOperation the recorder holding the bulks that were scheduled
+     */
+    static void assertLatestScheduledVerticesAreEqualTo(
+            final List<List<TestingSchedulingExecutionVertex>> expected,
+            TestingSchedulerOperations testingSchedulerOperation) {
+        final List<List<ExecutionVertexDeploymentOption>> scheduledBulks =
+                testingSchedulerOperation.getScheduledVertices();
+        assertThat(expected.size(), lessThanOrEqualTo(scheduledBulks.size()));
+
+        // Walk both lists backwards in lockstep so their tails are aligned.
+        int scheduledIndex = scheduledBulks.size() - 1;
+        for (int expectedIndex = expected.size() - 1; expectedIndex >= 0; expectedIndex--) {
+            assertEquals(
+                    idsFromVertices(expected.get(expectedIndex)),
+                    idsFromDeploymentOptions(scheduledBulks.get(scheduledIndex)));
+            scheduledIndex--;
+        }
+    }
+
+ static List<ExecutionVertexID> idsFromVertices(
+ final List<TestingSchedulingExecutionVertex> vertices) {
+ return vertices.stream()
+ .map(TestingSchedulingExecutionVertex::getId)
+ .collect(Collectors.toList());
+ }
+
+ static List<ExecutionVertexID> idsFromDeploymentOptions(
+ final List<ExecutionVertexDeploymentOption> deploymentOptions) {
+
+ return deploymentOptions.stream()
+ .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
+ .collect(Collectors.toList());
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java
new file mode 100644
index 0000000..70a254c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+
+/** Unit tests for {@link VertexwiseSchedulingStrategy}. */
+public class VertexwiseSchedulingStrategyTest {
+
+ private TestingSchedulerOperations testingSchedulerOperation;
+
+ private static final int PARALLELISM = 2;
+
+ private TestingSchedulingTopology testingSchedulingTopology;
+
+ private List<TestingSchedulingExecutionVertex> source;
+
+ private List<TestingSchedulingExecutionVertex> map;
+
+ private List<TestingSchedulingExecutionVertex> sink;
+
+ @Before
+ public void setUp() {
+ testingSchedulerOperation = new TestingSchedulerOperations();
+
+ buildTopology();
+ }
+
+ private void buildTopology() {
+ testingSchedulingTopology = new TestingSchedulingTopology();
+
+ source =
+ testingSchedulingTopology
+ .addExecutionVertices()
+ .withParallelism(PARALLELISM)
+ .finish();
+ map =
+ testingSchedulingTopology
+ .addExecutionVertices()
+ .withParallelism(PARALLELISM)
+ .finish();
+ sink =
+ testingSchedulingTopology
+ .addExecutionVertices()
+ .withParallelism(PARALLELISM)
+ .finish();
+
+ testingSchedulingTopology
+ .connectPointwise(source, map)
+ .withResultPartitionState(ResultPartitionState.CREATED)
+ .withResultPartitionType(ResultPartitionType.BLOCKING)
+ .finish();
+ testingSchedulingTopology
+ .connectAllToAll(map, sink)
+ .withResultPartitionState(ResultPartitionState.CREATED)
+ .withResultPartitionType(ResultPartitionType.BLOCKING)
+ .finish();
+ }
+
+ @Test
+ public void testStartScheduling() {
+ startScheduling(testingSchedulingTopology);
+
+ final List<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices =
+ new ArrayList<>();
+ expectedScheduledVertices.add(Arrays.asList(source.get(0)));
+ expectedScheduledVertices.add(Arrays.asList(source.get(1)));
+
+ assertLatestScheduledVerticesAreEqualTo(
+ expectedScheduledVertices, testingSchedulerOperation);
+ }
+
+ @Test
+ public void testRestartTasks() {
+ final VertexwiseSchedulingStrategy schedulingStrategy =
+ startScheduling(testingSchedulingTopology);
+
+ final Set<ExecutionVertexID> verticesToRestart =
+ Stream.of(source, map, sink)
+ .flatMap(List::stream)
+ .map(TestingSchedulingExecutionVertex::getId)
+ .collect(Collectors.toSet());
+
+ schedulingStrategy.restartTasks(verticesToRestart);
+
+ final List<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices =
+ new ArrayList<>();
+ expectedScheduledVertices.add(Arrays.asList(source.get(0)));
+ expectedScheduledVertices.add(Arrays.asList(source.get(1)));
+ assertLatestScheduledVerticesAreEqualTo(
+ expectedScheduledVertices, testingSchedulerOperation);
+ }
+
+ @Test
+ public void testOnExecutionStateChangeToFinished() {
+ // trigger source1, source2 scheduled.
+ final VertexwiseSchedulingStrategy schedulingStrategy =
+ startScheduling(testingSchedulingTopology);
+ assertThat(testingSchedulerOperation.getScheduledVertices(), hasSize(2));
+
+ // trigger map1 scheduled
+ final TestingSchedulingExecutionVertex source1 = source.get(0);
+ source1.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
+ schedulingStrategy.onExecutionStateChange(source1.getId(), ExecutionState.FINISHED);
+ assertThat(testingSchedulerOperation.getScheduledVertices(), hasSize(3));
+
+ // trigger map2 scheduled
+ final TestingSchedulingExecutionVertex source2 = source.get(1);
+ source2.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
+ schedulingStrategy.onExecutionStateChange(source2.getId(), ExecutionState.FINISHED);
+ assertThat(testingSchedulerOperation.getScheduledVertices(), hasSize(4));
+
+ // sinks' inputs are not all consumable yet so they are not scheduled
+ final TestingSchedulingExecutionVertex map1 = map.get(0);
+ map1.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
+ schedulingStrategy.onExecutionStateChange(map1.getId(), ExecutionState.FINISHED);
+ assertThat(testingSchedulerOperation.getScheduledVertices(), hasSize(4));
+
+ // trigger sink1, sink2 scheduled
+ final TestingSchedulingExecutionVertex map2 = map.get(1);
+ map2.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
+ schedulingStrategy.onExecutionStateChange(map2.getId(), ExecutionState.FINISHED);
+ assertThat(testingSchedulerOperation.getScheduledVertices(), hasSize(6));
+
+ final List<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices =
+ new ArrayList<>();
+ expectedScheduledVertices.add(Arrays.asList(source.get(0)));
+ expectedScheduledVertices.add(Arrays.asList(source.get(1)));
+ expectedScheduledVertices.add(Arrays.asList(map.get(0)));
+ expectedScheduledVertices.add(Arrays.asList(map.get(1)));
+ expectedScheduledVertices.add(Arrays.asList(sink.get(0)));
+ expectedScheduledVertices.add(Arrays.asList(sink.get(1)));
+ assertLatestScheduledVerticesAreEqualTo(
+ expectedScheduledVertices, testingSchedulerOperation);
+ }
+
+ @Test
+ public void testUpdateStrategyWithAllToAll() {
+ testUpdateStrategyOnTopologyUpdate(true);
+ }
+
+ @Test
+ public void testUpdateStrategyWithPointWise() {
+ testUpdateStrategyOnTopologyUpdate(false);
+ }
+
+ public void testUpdateStrategyOnTopologyUpdate(boolean allToAll) {
+ final TestingSchedulingTopology topology = new TestingSchedulingTopology();
+
+ final List<TestingSchedulingExecutionVertex> producers =
+ topology.addExecutionVertices().withParallelism(2).finish();
+
+ final VertexwiseSchedulingStrategy schedulingStrategy = startScheduling(topology);
+
+ final List<TestingSchedulingExecutionVertex> consumers =
+ topology.addExecutionVertices().withParallelism(2).finish();
+
+ // producer_0 finished
+ schedulingStrategy.onExecutionStateChange(
+ producers.get(0).getId(), ExecutionState.FINISHED);
+
+ // add consumers to scheduling strategy.
+ if (allToAll) {
+ topology.connectAllToAll(producers, consumers)
+ .withResultPartitionState(ResultPartitionState.CONSUMABLE)
+ .withResultPartitionType(ResultPartitionType.BLOCKING)
+ .finish();
+ } else {
+ topology.connectPointwise(producers, consumers)
+ .withResultPartitionState(ResultPartitionState.CONSUMABLE)
+ .withResultPartitionType(ResultPartitionType.BLOCKING)
+ .finish();
+ }
+
+ // producer_1 finished, consumer_0 and consumer_1 be added.
+ schedulingStrategy.notifySchedulingTopologyUpdated(
+ topology,
+ consumers.stream()
+ .map(TestingSchedulingExecutionVertex::getId)
+ .collect(Collectors.toList()));
+ schedulingStrategy.onExecutionStateChange(
+ producers.get(1).getId(), ExecutionState.FINISHED);
+
+ // consumers are properly scheduled indicates that the consuming relationship and
+ // correlation are successfully built
+ assertLatestScheduledVerticesAreEqualTo(
+ Arrays.asList(
+ Arrays.asList(producers.get(0)),
+ Arrays.asList(producers.get(1)),
+ Arrays.asList(consumers.get(0)),
+ Arrays.asList(consumers.get(1))),
+ testingSchedulerOperation);
+ }
+
+ VertexwiseSchedulingStrategy startScheduling(SchedulingTopology schedulingTopology) {
+ final VertexwiseSchedulingStrategy schedulingStrategy =
+ new VertexwiseSchedulingStrategy(testingSchedulerOperation, schedulingTopology);
+ schedulingStrategy.startScheduling();
+ return schedulingStrategy;
+ }
+}