You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/01/24 09:06:06 UTC

[flink] branch master updated: [FLINK-25036][runtime] Introduce vertex wise scheduling strategy

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e5c1e67  [FLINK-25036][runtime] Introduce vertex wise scheduling strategy
e5c1e67 is described below

commit e5c1e67e2169534d5e837e5690360b075b66dd75
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Wed Dec 22 20:09:15 2021 +0800

    [FLINK-25036][runtime] Introduce vertex wise scheduling strategy
    
    This closes #18126.
---
 .../strategy/VertexwiseSchedulingStrategy.java     | 187 +++++++++++++++++
 .../PipelinedRegionSchedulingStrategyTest.java     |  43 +---
 .../scheduler/strategy/StrategyTestUtil.java       |  34 +++
 .../strategy/VertexwiseSchedulingStrategyTest.java | 232 +++++++++++++++++++++
 4 files changed, 462 insertions(+), 34 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
new file mode 100644
index 0000000..b877b5b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+import org.apache.flink.runtime.scheduler.SchedulingTopologyListener;
+import org.apache.flink.util.IterableUtils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link SchedulingStrategy} instance which schedules tasks in granularity of vertex (which
+ * indicates this strategy only supports ALL_EDGES_BLOCKING batch jobs). Note that this strategy
+ * implements {@link SchedulingTopologyListener}, so it can handle the updates of scheduling
+ * topology.
+ */
+public class VertexwiseSchedulingStrategy
+        implements SchedulingStrategy, SchedulingTopologyListener {
+
+    private final SchedulerOperations schedulerOperations;
+
+    private final SchedulingTopology schedulingTopology;
+
+    private final DeploymentOption deploymentOption = new DeploymentOption(false);
+
+    private final Set<ExecutionVertexID> newVertices = new HashSet<>();
+
+    public VertexwiseSchedulingStrategy(
+            final SchedulerOperations schedulerOperations,
+            final SchedulingTopology schedulingTopology) {
+
+        this.schedulerOperations = checkNotNull(schedulerOperations);
+        this.schedulingTopology = checkNotNull(schedulingTopology);
+        schedulingTopology.registerSchedulingTopologyListener(this);
+    }
+
+    @Override
+    public void startScheduling() {
+        Set<ExecutionVertexID> sourceVertices =
+                IterableUtils.toStream(schedulingTopology.getVertices())
+                        .filter(vertex -> vertex.getConsumedPartitionGroups().isEmpty())
+                        .map(SchedulingExecutionVertex::getId)
+                        .collect(Collectors.toSet());
+
+        maybeScheduleVertices(sourceVertices);
+    }
+
+    @Override
+    public void restartTasks(Set<ExecutionVertexID> verticesToRestart) {
+        maybeScheduleVertices(verticesToRestart);
+    }
+
+    @Override
+    public void onExecutionStateChange(
+            ExecutionVertexID executionVertexId, ExecutionState executionState) {
+        if (executionState == ExecutionState.FINISHED) {
+            SchedulingExecutionVertex executionVertex =
+                    schedulingTopology.getVertex(executionVertexId);
+
+            Set<ExecutionVertexID> consumerVertices =
+                    IterableUtils.toStream(executionVertex.getProducedResults())
+                            .map(SchedulingResultPartition::getConsumerVertexGroup)
+                            .filter(Optional::isPresent)
+                            .flatMap(
+                                    consumerVertexGroup ->
+                                            IterableUtils.toStream(consumerVertexGroup.get()))
+                            .collect(Collectors.toSet());
+
+            maybeScheduleVertices(consumerVertices);
+        }
+    }
+
+    @Override
+    public void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId) {}
+
+    @Override
+    public void notifySchedulingTopologyUpdated(
+            SchedulingTopology schedulingTopology, List<ExecutionVertexID> newExecutionVertices) {
+        checkState(schedulingTopology == this.schedulingTopology);
+        newVertices.addAll(newExecutionVertices);
+    }
+
+    private void maybeScheduleVertices(final Set<ExecutionVertexID> vertices) {
+        final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new HashMap<>();
+
+        Set<ExecutionVertexID> allCandidates;
+        if (newVertices.isEmpty()) {
+            allCandidates = vertices;
+        } else {
+            allCandidates = new HashSet<>(vertices);
+            allCandidates.addAll(newVertices);
+            newVertices.clear();
+        }
+
+        final Set<ExecutionVertexID> verticesToDeploy =
+                allCandidates.stream()
+                        .filter(
+                                vertexId -> {
+                                    SchedulingExecutionVertex vertex =
+                                            schedulingTopology.getVertex(vertexId);
+                                    checkState(vertex.getState() == ExecutionState.CREATED);
+                                    return areVertexInputsAllConsumable(
+                                            vertex, consumableStatusCache);
+                                })
+                        .collect(Collectors.toSet());
+
+        scheduleVerticesOneByOne(verticesToDeploy);
+    }
+
+    private void scheduleVerticesOneByOne(final Set<ExecutionVertexID> verticesToDeploy) {
+        if (verticesToDeploy.isEmpty()) {
+            return;
+        }
+        final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
+                SchedulingStrategyUtils.createExecutionVertexDeploymentOptionsInTopologicalOrder(
+                        schedulingTopology, verticesToDeploy, id -> deploymentOption);
+
+        vertexDeploymentOptions.forEach(
+                option ->
+                        schedulerOperations.allocateSlotsAndDeploy(
+                                Collections.singletonList(option)));
+    }
+
+    private boolean areVertexInputsAllConsumable(
+            SchedulingExecutionVertex vertex,
+            Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
+        for (ConsumedPartitionGroup consumedPartitionGroup : vertex.getConsumedPartitionGroups()) {
+
+            if (!consumableStatusCache.computeIfAbsent(
+                    consumedPartitionGroup, this::isConsumedPartitionGroupConsumable)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean isConsumedPartitionGroupConsumable(
+            final ConsumedPartitionGroup consumedPartitionGroup) {
+        for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
+            if (schedulingTopology.getResultPartition(partitionId).getState()
+                    != ResultPartitionState.CONSUMABLE) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** The factory for creating {@link VertexwiseSchedulingStrategy}. */
+    public static class Factory implements SchedulingStrategyFactory {
+        @Override
+        public SchedulingStrategy createInstance(
+                final SchedulerOperations schedulerOperations,
+                final SchedulingTopology schedulingTopology) {
+            return new VertexwiseSchedulingStrategy(schedulerOperations, schedulingTopology);
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java
index 05764f2..4211955 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java
@@ -42,8 +42,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo;
 import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -109,7 +109,8 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger {
                 new ArrayList<>();
         expectedScheduledVertices.add(Arrays.asList(source.get(0), map.get(0)));
         expectedScheduledVertices.add(Arrays.asList(source.get(1), map.get(1)));
-        assertLatestScheduledVerticesAreEqualTo(expectedScheduledVertices);
+        assertLatestScheduledVerticesAreEqualTo(
+                expectedScheduledVertices, testingSchedulerOperation);
     }
 
     @Test
@@ -129,7 +130,8 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger {
                 new ArrayList<>();
         expectedScheduledVertices.add(Arrays.asList(source.get(0), map.get(0)));
         expectedScheduledVertices.add(Arrays.asList(source.get(1), map.get(1)));
-        assertLatestScheduledVerticesAreEqualTo(expectedScheduledVertices);
+        assertLatestScheduledVerticesAreEqualTo(
+                expectedScheduledVertices, testingSchedulerOperation);
     }
 
     @Test
@@ -154,7 +156,8 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger {
                 new ArrayList<>();
         expectedScheduledVertices.add(Arrays.asList(sink.get(0)));
         expectedScheduledVertices.add(Arrays.asList(sink.get(1)));
-        assertLatestScheduledVerticesAreEqualTo(expectedScheduledVertices);
+        assertLatestScheduledVerticesAreEqualTo(
+                expectedScheduledVertices, testingSchedulerOperation);
     }
 
     @Test
@@ -176,7 +179,8 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger {
         final List<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices =
                 new ArrayList<>();
         expectedScheduledVertices.add(Arrays.asList(v1.get(0)));
-        assertLatestScheduledVerticesAreEqualTo(expectedScheduledVertices);
+        assertLatestScheduledVerticesAreEqualTo(
+                expectedScheduledVertices, testingSchedulerOperation);
     }
 
     @Test
@@ -355,33 +359,4 @@ public class PipelinedRegionSchedulingStrategyTest extends TestLogger {
         schedulingStrategy.startScheduling();
         return schedulingStrategy;
     }
-
-    private void assertLatestScheduledVerticesAreEqualTo(
-            final List<List<TestingSchedulingExecutionVertex>> expected) {
-        final List<List<ExecutionVertexDeploymentOption>> deploymentOptions =
-                testingSchedulerOperation.getScheduledVertices();
-        final int expectedScheduledBulks = expected.size();
-        assertThat(expectedScheduledBulks, lessThanOrEqualTo(deploymentOptions.size()));
-        for (int i = 0; i < expectedScheduledBulks; i++) {
-            assertEquals(
-                    idsFromVertices(expected.get(expectedScheduledBulks - i - 1)),
-                    idsFromDeploymentOptions(
-                            deploymentOptions.get(deploymentOptions.size() - i - 1)));
-        }
-    }
-
-    private static List<ExecutionVertexID> idsFromVertices(
-            final List<TestingSchedulingExecutionVertex> vertices) {
-        return vertices.stream()
-                .map(TestingSchedulingExecutionVertex::getId)
-                .collect(Collectors.toList());
-    }
-
-    private static List<ExecutionVertexID> idsFromDeploymentOptions(
-            final List<ExecutionVertexDeploymentOption> deploymentOptions) {
-
-        return deploymentOptions.stream()
-                .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
-                .collect(Collectors.toList());
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java
index 30722e5..15e0318 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java
@@ -23,6 +23,10 @@ import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+
 /** Strategy test utilities. */
 public class StrategyTestUtil {
 
@@ -33,4 +37,34 @@ public class StrategyTestUtil {
                 .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
                 .collect(Collectors.toList());
     }
+
+    /** Asserts that the bulks scheduled last equal {@code expected}, bulk by bulk. */
+    static void assertLatestScheduledVerticesAreEqualTo(
+            final List<List<TestingSchedulingExecutionVertex>> expected,
+            TestingSchedulerOperations testingSchedulerOperation) {
+        final List<List<ExecutionVertexDeploymentOption>> scheduled =
+                testingSchedulerOperation.getScheduledVertices();
+        assertThat(expected.size(), lessThanOrEqualTo(scheduled.size()));
+        final int offset = scheduled.size() - expected.size();
+        for (int i = expected.size() - 1; i >= 0; i--) {
+            assertEquals(
+                    idsFromVertices(expected.get(i)),
+                    idsFromDeploymentOptions(scheduled.get(offset + i)));
+        }
+    }
+
+    /** Returns the IDs of the given vertices, in list order. */
+    static List<ExecutionVertexID> idsFromVertices(
+            final List<TestingSchedulingExecutionVertex> vertices) {
+
+        return vertices.stream().map(vertex -> vertex.getId()).collect(Collectors.toList());
+    }
+
+    /** Returns the execution vertex IDs of the given deployment options, in list order. */
+    static List<ExecutionVertexID> idsFromDeploymentOptions(
+            final List<ExecutionVertexDeploymentOption> deploymentOptions) {
+        return deploymentOptions.stream()
+                .map(option -> option.getExecutionVertexId())
+                .collect(Collectors.toList());
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java
new file mode 100644
index 0000000..70a254c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+
+/** Tests for the scheduling behavior of {@link VertexwiseSchedulingStrategy}. */
+public class VertexwiseSchedulingStrategyTest {
+
+    private static final int PARALLELISM = 2;
+
+    private TestingSchedulerOperations testingSchedulerOperation;
+
+    private TestingSchedulingTopology testingSchedulingTopology;
+
+    private List<TestingSchedulingExecutionVertex> source;
+
+    private List<TestingSchedulingExecutionVertex> map;
+
+    private List<TestingSchedulingExecutionVertex> sink;
+
+    @Before
+    public void setUp() {
+        testingSchedulerOperation = new TestingSchedulerOperations();
+
+        buildTopology();
+    }
+
+    /**
+     * Builds the topology {@code source -> map -> sink}: source and map are connected
+     * pointwise, map and sink all-to-all. All result partitions are BLOCKING and CREATED.
+     */
+    private void buildTopology() {
+        testingSchedulingTopology = new TestingSchedulingTopology();
+
+        source = addVertices(testingSchedulingTopology);
+        map = addVertices(testingSchedulingTopology);
+        sink = addVertices(testingSchedulingTopology);
+
+        testingSchedulingTopology
+                .connectPointwise(source, map)
+                .withResultPartitionState(ResultPartitionState.CREATED)
+                .withResultPartitionType(ResultPartitionType.BLOCKING)
+                .finish();
+        testingSchedulingTopology
+                .connectAllToAll(map, sink)
+                .withResultPartitionState(ResultPartitionState.CREATED)
+                .withResultPartitionType(ResultPartitionType.BLOCKING)
+                .finish();
+    }
+
+    /** Adds execution vertices with parallelism {@link #PARALLELISM} to the given topology. */
+    private static List<TestingSchedulingExecutionVertex> addVertices(
+            TestingSchedulingTopology topology) {
+        return topology.addExecutionVertices().withParallelism(PARALLELISM).finish();
+    }
+
+    @Test
+    public void testStartScheduling() {
+        startScheduling(testingSchedulingTopology);
+
+        // the sources have no inputs, so both are scheduled, each in a bulk of its own
+        assertLatestScheduledVerticesAreEqualTo(
+                oneVertexPerBulk(source.get(0), source.get(1)), testingSchedulerOperation);
+    }
+
+    @Test
+    public void testRestartTasks() {
+        final VertexwiseSchedulingStrategy schedulingStrategy =
+                startScheduling(testingSchedulingTopology);
+
+        final Set<ExecutionVertexID> allVertexIds =
+                Stream.of(source, map, sink)
+                        .flatMap(List::stream)
+                        .map(TestingSchedulingExecutionVertex::getId)
+                        .collect(Collectors.toSet());
+        schedulingStrategy.restartTasks(allVertexIds);
+
+        // all result partitions are still CREATED, so only the sources can be scheduled
+        assertLatestScheduledVerticesAreEqualTo(
+                oneVertexPerBulk(source.get(0), source.get(1)), testingSchedulerOperation);
+    }
+
+    @Test
+    public void testOnExecutionStateChangeToFinished() {
+        // starting schedules source1 and source2
+        final VertexwiseSchedulingStrategy schedulingStrategy =
+                startScheduling(testingSchedulingTopology);
+        assertThat(testingSchedulerOperation.getScheduledVertices(), hasSize(2));
+
+        // map1 is scheduled once source1 has finished
+        finishVertex(schedulingStrategy, source.get(0));
+        assertThat(testingSchedulerOperation.getScheduledVertices(), hasSize(3));
+
+        // map2 is scheduled once source2 has finished
+        finishVertex(schedulingStrategy, source.get(1));
+        assertThat(testingSchedulerOperation.getScheduledVertices(), hasSize(4));
+
+        // with only map1 finished, the inputs of the sinks are not all consumable yet
+        finishVertex(schedulingStrategy, map.get(0));
+        assertThat(testingSchedulerOperation.getScheduledVertices(), hasSize(4));
+
+        // sink1 and sink2 are scheduled once map2 has finished as well
+        finishVertex(schedulingStrategy, map.get(1));
+        assertThat(testingSchedulerOperation.getScheduledVertices(), hasSize(6));
+
+        assertLatestScheduledVerticesAreEqualTo(
+                oneVertexPerBulk(
+                        source.get(0),
+                        source.get(1),
+                        map.get(0),
+                        map.get(1),
+                        sink.get(0),
+                        sink.get(1)),
+                testingSchedulerOperation);
+    }
+
+    @Test
+    public void testUpdateStrategyWithAllToAll() {
+        testUpdateStrategyOnTopologyUpdate(true);
+    }
+
+    @Test
+    public void testUpdateStrategyWithPointWise() {
+        testUpdateStrategyOnTopologyUpdate(false);
+    }
+
+    /**
+     * Adds consumers to the topology after scheduling has started and verifies that they get
+     * scheduled after the strategy was notified about them and the second producer finished.
+     *
+     * @param allToAll whether producers and consumers are connected all-to-all or pointwise
+     */
+    public void testUpdateStrategyOnTopologyUpdate(boolean allToAll) {
+        final TestingSchedulingTopology topology = new TestingSchedulingTopology();
+        final List<TestingSchedulingExecutionVertex> producers = addVertices(topology);
+
+        final VertexwiseSchedulingStrategy schedulingStrategy = startScheduling(topology);
+
+        final List<TestingSchedulingExecutionVertex> consumers = addVertices(topology);
+
+        // producer_0 finishes before the strategy has been told about the consumers
+        schedulingStrategy.onExecutionStateChange(
+                producers.get(0).getId(), ExecutionState.FINISHED);
+
+        // connect the consumers to the producers with results that are already CONSUMABLE
+        if (allToAll) {
+            topology.connectAllToAll(producers, consumers)
+                    .withResultPartitionState(ResultPartitionState.CONSUMABLE)
+                    .withResultPartitionType(ResultPartitionType.BLOCKING)
+                    .finish();
+        } else {
+            topology.connectPointwise(producers, consumers)
+                    .withResultPartitionState(ResultPartitionState.CONSUMABLE)
+                    .withResultPartitionType(ResultPartitionType.BLOCKING)
+                    .finish();
+        }
+
+        // announce consumer_0 and consumer_1 to the strategy, then let producer_1 finish
+        final List<ExecutionVertexID> consumerIds =
+                consumers.stream()
+                        .map(TestingSchedulingExecutionVertex::getId)
+                        .collect(Collectors.toList());
+        schedulingStrategy.notifySchedulingTopologyUpdated(topology, consumerIds);
+        schedulingStrategy.onExecutionStateChange(
+                producers.get(1).getId(), ExecutionState.FINISHED);
+
+        // the consumers being scheduled shows that the strategy picked up the vertices and the
+        // consuming relationships that were added to the topology after scheduling had started
+        assertLatestScheduledVerticesAreEqualTo(
+                oneVertexPerBulk(
+                        producers.get(0), producers.get(1), consumers.get(0), consumers.get(1)),
+                testingSchedulerOperation);
+    }
+
+    /** Creates a strategy for the given topology and immediately starts scheduling. */
+    VertexwiseSchedulingStrategy startScheduling(SchedulingTopology schedulingTopology) {
+        final VertexwiseSchedulingStrategy strategy =
+                new VertexwiseSchedulingStrategy(testingSchedulerOperation, schedulingTopology);
+        strategy.startScheduling();
+        return strategy;
+    }
+
+    /** Marks the first produced result CONSUMABLE and reports the vertex as FINISHED. */
+    private static void finishVertex(
+            VertexwiseSchedulingStrategy schedulingStrategy,
+            TestingSchedulingExecutionVertex vertex) {
+        vertex.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
+        schedulingStrategy.onExecutionStateChange(vertex.getId(), ExecutionState.FINISHED);
+    }
+
+    /** Wraps every vertex in a list of its own: one expected scheduling bulk per vertex. */
+    private static List<List<TestingSchedulingExecutionVertex>> oneVertexPerBulk(
+            TestingSchedulingExecutionVertex... vertices) {
+        final List<List<TestingSchedulingExecutionVertex>> bulks = new ArrayList<>();
+        for (TestingSchedulingExecutionVertex vertex : vertices) {
+            bulks.add(Arrays.asList(vertex));
+        }
+        return bulks;
+    }
+}