Posted to commits@flink.apache.org by xt...@apache.org on 2022/12/15 02:16:34 UTC

[flink] 02/04: [FLINK-29767] Introduce InputConsumableDecider and use it to decide whether all inputs are consumable for VertexwiseSchedulingStrategy.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e739b3853a60b8c7584510b0becd87265306140e
Author: Weijie Guo <re...@163.com>
AuthorDate: Mon Oct 31 15:14:13 2022 +0800

    [FLINK-29767] Introduce InputConsumableDecider and use it to decide whether all inputs are consumable for VertexwiseSchedulingStrategy.
---
 .../strategy/DefaultInputConsumableDecider.java    | 107 ++++++++++++
 .../scheduler/strategy/InputConsumableDecider.java |  50 ++++++
 .../strategy/VertexwiseSchedulingStrategy.java     |  45 ++---
 .../DefaultInputConsumableDeciderTest.java         | 189 +++++++++++++++++++++
 .../strategy/TestingInputConsumableDecider.java    |  56 ++++++
 .../strategy/VertexwiseSchedulingStrategyTest.java |  55 ++++--
 6 files changed, 457 insertions(+), 45 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java
new file mode 100644
index 00000000000..1308568db7f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Default implementation of {@link InputConsumableDecider}. It determines whether an execution
+ * vertex's inputs are consumable as follows:
+ *
+ * <p>For a blocking consumed partition group: whether all result partitions in the group are
+ * finished.
+ *
+ * <p>For a canBePipelined consumed partition group: whether the producers of all result partitions
+ * in the group are scheduled, or already decided to be scheduled.
+ */
+class DefaultInputConsumableDecider implements InputConsumableDecider {
+    private final Function<IntermediateResultPartitionID, SchedulingResultPartition>
+            resultPartitionRetriever;
+
+    private final Function<ExecutionVertexID, Boolean> scheduledVertexRetriever;
+
+    DefaultInputConsumableDecider(
+            Function<ExecutionVertexID, Boolean> scheduledVertexRetriever,
+            Function<IntermediateResultPartitionID, SchedulingResultPartition>
+                    resultPartitionRetriever) {
+        this.scheduledVertexRetriever = scheduledVertexRetriever;
+        this.resultPartitionRetriever = resultPartitionRetriever;
+    }
+
+    @Override
+    public boolean isInputConsumable(
+            SchedulingExecutionVertex executionVertex,
+            Set<ExecutionVertexID> verticesToSchedule,
+            Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
+        for (ConsumedPartitionGroup consumedPartitionGroup :
+                executionVertex.getConsumedPartitionGroups()) {
+
+            if (!consumableStatusCache.computeIfAbsent(
+                    consumedPartitionGroup,
+                    (group) -> isConsumedPartitionGroupConsumable(group, verticesToSchedule))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean isConsumedPartitionGroupConsumable(
+            final ConsumedPartitionGroup consumedPartitionGroup,
+            final Set<ExecutionVertexID> verticesToSchedule) {
+        if (consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) {
+            for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
+                ExecutionVertexID producerVertex =
+                        resultPartitionRetriever.apply(partitionId).getProducer().getId();
+                if (!verticesToSchedule.contains(producerVertex)
+                        && !scheduledVertexRetriever.apply(producerVertex)) {
+                    return false;
+                }
+            }
+        } else {
+            for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
+                if (resultPartitionRetriever.apply(partitionId).getState()
+                        != ResultPartitionState.CONSUMABLE) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    /** Factory for {@link DefaultInputConsumableDecider}. */
+    public static class Factory implements InputConsumableDecider.Factory {
+
+        public static final InputConsumableDecider.Factory INSTANCE = new Factory();
+
+        // disable public instantiation.
+        private Factory() {}
+
+        @Override
+        public InputConsumableDecider createInstance(
+                SchedulingTopology schedulingTopology,
+                Function<ExecutionVertexID, Boolean> scheduledVertexRetriever) {
+            return new DefaultInputConsumableDecider(
+                    scheduledVertexRetriever, schedulingTopology::getResultPartition);
+        }
+    }
+}
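
For reference, a minimal usage sketch of the new decider (a sketch, not part of this commit; the class is package-private, so this only compiles inside org.apache.flink.runtime.scheduler.strategy, as the tests further below do, and the schedulingTopology and vertex variables are assumed to exist in the caller's scope):

    // A minimal sketch, assuming a SchedulingTopology schedulingTopology and a
    // SchedulingExecutionVertex vertex in scope.
    Set<ExecutionVertexID> scheduledVertices = new HashSet<>();
    DefaultInputConsumableDecider decider =
            new DefaultInputConsumableDecider(
                    scheduledVertices::contains, schedulingTopology::getResultPartition);

    // Blocking input groups become consumable once every partition in the group is in
    // ResultPartitionState.CONSUMABLE; pipelined-capable groups become consumable once
    // every producer is already scheduled or contained in the verticesToSchedule argument.
    boolean consumable =
            decider.isInputConsumable(vertex, Collections.emptySet(), new HashMap<>());
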
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java
new file mode 100644
index 00000000000..e34cb06bc4e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * {@link InputConsumableDecider} is responsible for determining whether the input of an
+ * execution vertex is consumable.
+ */
+public interface InputConsumableDecider {
+    /**
+     * Determines whether the input of an execution vertex is consumable.
+     *
+     * @param executionVertex the vertex whose input consumability is to be determined.
+     * @param verticesToSchedule vertices that are not yet scheduled but have already been decided
+     *     to be scheduled.
+     * @param consumableStatusCache a cache of the consumable status of each {@link
+     *     ConsumedPartitionGroup}, to avoid repeated computation.
+     */
+    boolean isInputConsumable(
+            SchedulingExecutionVertex executionVertex,
+            Set<ExecutionVertexID> verticesToSchedule,
+            Map<ConsumedPartitionGroup, Boolean> consumableStatusCache);
+
+    /** Factory for {@link InputConsumableDecider}. */
+    interface Factory {
+        InputConsumableDecider createInstance(
+                SchedulingTopology schedulingTopology,
+                Function<ExecutionVertexID, Boolean> scheduledVertexRetriever);
+    }
+}
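
To illustrate the extension point, a hedged sketch of an alternative implementation (the class name TreatAllInputsConsumableDecider is hypothetical and not part of this commit):

    package org.apache.flink.runtime.scheduler.strategy;

    import java.util.Map;
    import java.util.Set;
    import java.util.function.Function;

    /** Hypothetical decider that treats every input as consumable; illustration only. */
    class TreatAllInputsConsumableDecider implements InputConsumableDecider {

        @Override
        public boolean isInputConsumable(
                SchedulingExecutionVertex executionVertex,
                Set<ExecutionVertexID> verticesToSchedule,
                Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
            // Never consults the cache or the scheduled set: every input is consumable.
            return true;
        }

        /** Factory mirroring DefaultInputConsumableDecider.Factory. */
        static class Factory implements InputConsumableDecider.Factory {
            @Override
            public InputConsumableDecider createInstance(
                    SchedulingTopology schedulingTopology,
                    Function<ExecutionVertexID, Boolean> scheduledVertexRetriever) {
                return new TreatAllInputsConsumableDecider();
            }
        }
    }
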
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
index 4d217a9eb97..74c47234a3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
@@ -51,12 +51,20 @@ public class VertexwiseSchedulingStrategy
 
     private final Set<ExecutionVertexID> newVertices = new HashSet<>();
 
+    private final Set<ExecutionVertexID> scheduledVertices = new HashSet<>();
+
+    private final InputConsumableDecider inputConsumableDecider;
+
     public VertexwiseSchedulingStrategy(
             final SchedulerOperations schedulerOperations,
-            final SchedulingTopology schedulingTopology) {
+            final SchedulingTopology schedulingTopology,
+            final InputConsumableDecider.Factory inputConsumableDeciderFactory) {
 
         this.schedulerOperations = checkNotNull(schedulerOperations);
         this.schedulingTopology = checkNotNull(schedulingTopology);
+        this.inputConsumableDecider =
+                inputConsumableDeciderFactory.createInstance(
+                        schedulingTopology, scheduledVertices::contains);
         schedulingTopology.registerSchedulingTopologyListener(this);
     }
 
@@ -73,6 +81,7 @@ public class VertexwiseSchedulingStrategy
 
     @Override
     public void restartTasks(Set<ExecutionVertexID> verticesToRestart) {
+        scheduledVertices.removeAll(verticesToRestart);
         maybeScheduleVertices(verticesToRestart);
     }
 
@@ -123,12 +132,13 @@ public class VertexwiseSchedulingStrategy
                                     SchedulingExecutionVertex vertex =
                                             schedulingTopology.getVertex(vertexId);
                                     checkState(vertex.getState() == ExecutionState.CREATED);
-                                    return areVertexInputsAllConsumable(
-                                            vertex, consumableStatusCache);
+                                    return inputConsumableDecider.isInputConsumable(
+                                            vertex, Collections.emptySet(), consumableStatusCache);
                                 })
                         .collect(Collectors.toSet());
 
         scheduleVerticesOneByOne(verticesToDeploy);
+        scheduledVertices.addAll(verticesToDeploy);
     }
 
     private void scheduleVerticesOneByOne(final Set<ExecutionVertexID> verticesToDeploy) {
@@ -143,37 +153,16 @@ public class VertexwiseSchedulingStrategy
                 id -> schedulerOperations.allocateSlotsAndDeploy(Collections.singletonList(id)));
     }
 
-    private boolean areVertexInputsAllConsumable(
-            SchedulingExecutionVertex vertex,
-            Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
-        for (ConsumedPartitionGroup consumedPartitionGroup : vertex.getConsumedPartitionGroups()) {
-
-            if (!consumableStatusCache.computeIfAbsent(
-                    consumedPartitionGroup, this::isConsumedPartitionGroupConsumable)) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private boolean isConsumedPartitionGroupConsumable(
-            final ConsumedPartitionGroup consumedPartitionGroup) {
-        for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
-            if (schedulingTopology.getResultPartition(partitionId).getState()
-                    != ResultPartitionState.CONSUMABLE) {
-                return false;
-            }
-        }
-        return true;
-    }
-
     /** The factory for creating {@link VertexwiseSchedulingStrategy}. */
     public static class Factory implements SchedulingStrategyFactory {
         @Override
         public SchedulingStrategy createInstance(
                 final SchedulerOperations schedulerOperations,
                 final SchedulingTopology schedulingTopology) {
-            return new VertexwiseSchedulingStrategy(schedulerOperations, schedulingTopology);
+            return new VertexwiseSchedulingStrategy(
+                    schedulerOperations,
+                    schedulingTopology,
+                    DefaultInputConsumableDecider.Factory.INSTANCE);
         }
     }
 }
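
For clarity, the two ways the new constructor parameter is wired in this commit (both mirror code elsewhere in the patch; schedulerOperations, schedulingTopology, testingSchedulerOperation and inputConsumableDecider are assumed to be in scope):

    // Production wiring, as done by VertexwiseSchedulingStrategy.Factory above:
    SchedulingStrategy strategy =
            new VertexwiseSchedulingStrategy(
                    schedulerOperations,
                    schedulingTopology,
                    DefaultInputConsumableDecider.Factory.INSTANCE);

    // Test wiring, as done by VertexwiseSchedulingStrategyTest below: any
    // InputConsumableDecider.Factory lambda can stub out the consumability decision.
    SchedulingStrategy testStrategy =
            new VertexwiseSchedulingStrategy(
                    testingSchedulerOperation,
                    schedulingTopology,
                    (ignore1, ignore2) -> inputConsumableDecider);
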
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java
new file mode 100644
index 00000000000..6b373e3c09d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DefaultInputConsumableDecider}. */
+class DefaultInputConsumableDeciderTest {
+    @Test
+    void testNotFinishedBlockingInput() {
+        final TestingSchedulingTopology topology = new TestingSchedulingTopology();
+
+        final List<TestingSchedulingExecutionVertex> producers =
+                topology.addExecutionVertices().withParallelism(2).finish();
+
+        final List<TestingSchedulingExecutionVertex> consumer =
+                topology.addExecutionVertices().withParallelism(2).finish();
+
+        topology.connectAllToAll(producers, consumer)
+                .withResultPartitionState(ResultPartitionState.CREATED)
+                .withResultPartitionType(ResultPartitionType.BLOCKING)
+                .finish();
+
+        DefaultInputConsumableDecider inputConsumableDecider =
+                createDefaultInputConsumableDecider(Collections.emptySet(), topology);
+
+        assertThat(
+                        inputConsumableDecider.isInputConsumable(
+                                consumer.get(0), Collections.emptySet(), new HashMap<>()))
+                .isFalse();
+        assertThat(
+                        inputConsumableDecider.isInputConsumable(
+                                consumer.get(1), Collections.emptySet(), new HashMap<>()))
+                .isFalse();
+    }
+
+    @Test
+    void testAllFinishedBlockingInput() {
+        final TestingSchedulingTopology topology = new TestingSchedulingTopology();
+
+        final List<TestingSchedulingExecutionVertex> producers =
+                topology.addExecutionVertices().withParallelism(2).finish();
+
+        final List<TestingSchedulingExecutionVertex> consumer =
+                topology.addExecutionVertices().withParallelism(2).finish();
+
+        topology.connectAllToAll(producers, consumer)
+                .withResultPartitionState(ResultPartitionState.CONSUMABLE)
+                .withResultPartitionType(ResultPartitionType.BLOCKING)
+                .finish();
+
+        DefaultInputConsumableDecider inputConsumableDecider =
+                createDefaultInputConsumableDecider(Collections.emptySet(), topology);
+
+        assertThat(
+                        inputConsumableDecider.isInputConsumable(
+                                consumer.get(0), Collections.emptySet(), new HashMap<>()))
+                .isTrue();
+        assertThat(
+                        inputConsumableDecider.isInputConsumable(
+                                consumer.get(1), Collections.emptySet(), new HashMap<>()))
+                .isTrue();
+    }
+
+    @Test
+    void testUpstreamNotScheduledHybridInput() {
+        final TestingSchedulingTopology topology = new TestingSchedulingTopology();
+
+        final List<TestingSchedulingExecutionVertex> producers =
+                topology.addExecutionVertices().withParallelism(2).finish();
+
+        final List<TestingSchedulingExecutionVertex> consumer =
+                topology.addExecutionVertices().withParallelism(2).finish();
+
+        topology.connectAllToAll(producers, consumer)
+                .withResultPartitionState(ResultPartitionState.CREATED)
+                .withResultPartitionType(ResultPartitionType.HYBRID_FULL)
+                .finish();
+
+        DefaultInputConsumableDecider inputConsumableDecider =
+                createDefaultInputConsumableDecider(Collections.emptySet(), topology);
+
+        assertThat(
+                        inputConsumableDecider.isInputConsumable(
+                                consumer.get(0), Collections.emptySet(), new HashMap<>()))
+                .isFalse();
+        assertThat(
+                        inputConsumableDecider.isInputConsumable(
+                                consumer.get(1), Collections.emptySet(), new HashMap<>()))
+                .isFalse();
+    }
+
+    @Test
+    void testUpstreamAllScheduledHybridInput() {
+        final TestingSchedulingTopology topology = new TestingSchedulingTopology();
+
+        final List<TestingSchedulingExecutionVertex> producers =
+                topology.addExecutionVertices().withParallelism(2).finish();
+
+        final List<TestingSchedulingExecutionVertex> consumer =
+                topology.addExecutionVertices().withParallelism(2).finish();
+
+        topology.connectAllToAll(producers, consumer)
+                .withResultPartitionState(ResultPartitionState.CREATED)
+                .withResultPartitionType(ResultPartitionType.HYBRID_FULL)
+                .finish();
+
+        HashSet<ExecutionVertexID> scheduledVertices = new HashSet<>();
+        DefaultInputConsumableDecider inputConsumableDecider =
+                createDefaultInputConsumableDecider(scheduledVertices, topology);
+        scheduledVertices.add(producers.get(0).getId());
+        HashSet<ExecutionVertexID> vertexToDeploy = new HashSet<>();
+        vertexToDeploy.add(producers.get(1).getId());
+
+        assertThat(
+                        inputConsumableDecider.isInputConsumable(
+                                consumer.get(0), vertexToDeploy, new HashMap<>()))
+                .isTrue();
+        assertThat(
+                        inputConsumableDecider.isInputConsumable(
+                                consumer.get(1), vertexToDeploy, new HashMap<>()))
+                .isTrue();
+    }
+
+    @Test
+    void testHybridAndBlockingInputButBlockingInputNotFinished() {
+        final TestingSchedulingTopology topology = new TestingSchedulingTopology();
+
+        final List<TestingSchedulingExecutionVertex> producers1 =
+                topology.addExecutionVertices().withParallelism(1).finish();
+
+        final List<TestingSchedulingExecutionVertex> producers2 =
+                topology.addExecutionVertices().withParallelism(1).finish();
+
+        final List<TestingSchedulingExecutionVertex> consumer =
+                topology.addExecutionVertices().withParallelism(1).finish();
+
+        topology.connectAllToAll(producers1, consumer)
+                .withResultPartitionState(ResultPartitionState.CREATED)
+                .withResultPartitionType(ResultPartitionType.BLOCKING)
+                .finish();
+
+        topology.connectAllToAll(producers2, consumer)
+                .withResultPartitionState(ResultPartitionState.CREATED)
+                .withResultPartitionType(ResultPartitionType.HYBRID_FULL)
+                .finish();
+
+        DefaultInputConsumableDecider inputConsumableDecider =
+                createDefaultInputConsumableDecider(
+                        Collections.singleton(producers2.get(0).getId()), topology);
+
+        assertThat(
+                        inputConsumableDecider.isInputConsumable(
+                                consumer.get(0), Collections.emptySet(), new HashMap<>()))
+                .isFalse();
+    }
+
+    private DefaultInputConsumableDecider createDefaultInputConsumableDecider(
+            Set<ExecutionVertexID> scheduledVertices, SchedulingTopology schedulingTopology) {
+        return new DefaultInputConsumableDecider(
+                scheduledVertices::contains, schedulingTopology::getResultPartition);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingInputConsumableDecider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingInputConsumableDecider.java
new file mode 100644
index 00000000000..da2aa8241ac
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingInputConsumableDecider.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/** Mock {@link InputConsumableDecider} for testing. */
+public class TestingInputConsumableDecider implements InputConsumableDecider {
+
+    private final Set<SchedulingExecutionVertex> inputConsumableExecutionVertices = new HashSet<>();
+
+    private final Set<SchedulingExecutionVertex> sourceVertices = new HashSet<>();
+
+    private SchedulingExecutionVertex lastExecutionToDecideInputConsumable;
+
+    @Override
+    public boolean isInputConsumable(
+            SchedulingExecutionVertex executionVertex,
+            Set<ExecutionVertexID> verticesToDeploy,
+            Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
+        lastExecutionToDecideInputConsumable = executionVertex;
+        return sourceVertices.contains(executionVertex)
+                || inputConsumableExecutionVertices.contains(executionVertex);
+    }
+
+    public void setInputConsumable(SchedulingExecutionVertex executionVertex) {
+        inputConsumableExecutionVertices.add(executionVertex);
+    }
+
+    public void addSourceVertices(Collection<SchedulingExecutionVertex> sourceVertices) {
+        this.sourceVertices.addAll(sourceVertices);
+    }
+
+    public SchedulingExecutionVertex getLastExecutionToDecideInputConsumable() {
+        return lastExecutionToDecideInputConsumable;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java
index 86cdd6a4f21..6b679b73e29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -44,6 +45,8 @@ class VertexwiseSchedulingStrategyTest {
 
     private TestingSchedulingTopology testingSchedulingTopology;
 
+    private TestingInputConsumableDecider inputConsumableDecider;
+
     private List<TestingSchedulingExecutionVertex> source;
 
     private List<TestingSchedulingExecutionVertex> map;
@@ -53,7 +56,7 @@ class VertexwiseSchedulingStrategyTest {
     @BeforeEach
     void setUp() {
         testingSchedulerOperation = new TestingSchedulerOperations();
-
+        inputConsumableDecider = new TestingInputConsumableDecider();
         buildTopology();
     }
 
@@ -90,13 +93,17 @@ class VertexwiseSchedulingStrategyTest {
 
     @Test
     void testStartScheduling() {
-        startScheduling(testingSchedulingTopology);
+        VertexwiseSchedulingStrategy schedulingStrategy =
+                createSchedulingStrategy(testingSchedulingTopology);
 
         final List<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices =
                 new ArrayList<>();
         expectedScheduledVertices.add(Collections.singletonList(source.get(0)));
         expectedScheduledVertices.add(Collections.singletonList(source.get(1)));
 
+        inputConsumableDecider.addSourceVertices(new HashSet<>(source));
+
+        schedulingStrategy.startScheduling();
         assertLatestScheduledVerticesAreEqualTo(
                 expectedScheduledVertices, testingSchedulerOperation);
     }
@@ -104,7 +111,9 @@ class VertexwiseSchedulingStrategyTest {
     @Test
     void testRestartTasks() {
         final VertexwiseSchedulingStrategy schedulingStrategy =
-                startScheduling(testingSchedulingTopology);
+                createSchedulingStrategy(testingSchedulingTopology);
+
+        inputConsumableDecider.addSourceVertices(new HashSet<>(source));
 
         final Set<ExecutionVertexID> verticesToRestart =
                 Stream.of(source, map, sink)
@@ -126,30 +135,40 @@ class VertexwiseSchedulingStrategyTest {
     void testOnExecutionStateChangeToFinished() {
         // trigger source1, source2 scheduled.
         final VertexwiseSchedulingStrategy schedulingStrategy =
-                startScheduling(testingSchedulingTopology);
+                createSchedulingStrategy(testingSchedulingTopology);
+
+        inputConsumableDecider.addSourceVertices(new HashSet<>(source));
+
+        schedulingStrategy.startScheduling();
         assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2);
 
         // trigger map1 scheduled
         final TestingSchedulingExecutionVertex source1 = source.get(0);
-        source1.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
+        inputConsumableDecider.setInputConsumable(map.get(0));
         schedulingStrategy.onExecutionStateChange(source1.getId(), ExecutionState.FINISHED);
         assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(3);
 
         // trigger map2 scheduled
         final TestingSchedulingExecutionVertex source2 = source.get(1);
-        source2.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
+        inputConsumableDecider.setInputConsumable(map.get(1));
         schedulingStrategy.onExecutionStateChange(source2.getId(), ExecutionState.FINISHED);
         assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(4);
 
-        // sinks' inputs are not all consumable yet so they are not scheduled
+        // sinks' inputs are not consumable yet so they are not scheduled
         final TestingSchedulingExecutionVertex map1 = map.get(0);
-        map1.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
         schedulingStrategy.onExecutionStateChange(map1.getId(), ExecutionState.FINISHED);
+        assertThat(
+                        inputConsumableDecider
+                                .getLastExecutionToDecideInputConsumable()
+                                .getId()
+                                .getJobVertexId())
+                .isEqualTo(sink.get(0).getId().getJobVertexId());
         assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(4);
 
         // trigger sink1, sink2 scheduled
         final TestingSchedulingExecutionVertex map2 = map.get(1);
-        map2.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
+        inputConsumableDecider.setInputConsumable(sink.get(0));
+        inputConsumableDecider.setInputConsumable(sink.get(1));
         schedulingStrategy.onExecutionStateChange(map2.getId(), ExecutionState.FINISHED);
         assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(6);
 
@@ -181,7 +200,9 @@ class VertexwiseSchedulingStrategyTest {
         final List<TestingSchedulingExecutionVertex> producers =
                 topology.addExecutionVertices().withParallelism(2).finish();
 
-        final VertexwiseSchedulingStrategy schedulingStrategy = startScheduling(topology);
+        final VertexwiseSchedulingStrategy schedulingStrategy = createSchedulingStrategy(topology);
+        inputConsumableDecider.addSourceVertices(new HashSet<>(producers));
+        schedulingStrategy.startScheduling();
 
         final List<TestingSchedulingExecutionVertex> consumers =
                 topology.addExecutionVertices().withParallelism(2).finish();
@@ -193,12 +214,10 @@ class VertexwiseSchedulingStrategyTest {
         // add consumers to scheduling strategy.
         if (allToAll) {
             topology.connectAllToAll(producers, consumers)
-                    .withResultPartitionState(ResultPartitionState.CONSUMABLE)
                     .withResultPartitionType(ResultPartitionType.BLOCKING)
                     .finish();
         } else {
             topology.connectPointwise(producers, consumers)
-                    .withResultPartitionState(ResultPartitionState.CONSUMABLE)
                     .withResultPartitionType(ResultPartitionType.BLOCKING)
                     .finish();
         }
@@ -209,6 +228,8 @@ class VertexwiseSchedulingStrategyTest {
                 consumers.stream()
                         .map(TestingSchedulingExecutionVertex::getId)
                         .collect(Collectors.toList()));
+        inputConsumableDecider.setInputConsumable(consumers.get(0));
+        inputConsumableDecider.setInputConsumable(consumers.get(1));
         schedulingStrategy.onExecutionStateChange(
                 producers.get(1).getId(), ExecutionState.FINISHED);
 
@@ -223,10 +244,10 @@ class VertexwiseSchedulingStrategyTest {
                 testingSchedulerOperation);
     }
 
-    VertexwiseSchedulingStrategy startScheduling(SchedulingTopology schedulingTopology) {
-        final VertexwiseSchedulingStrategy schedulingStrategy =
-                new VertexwiseSchedulingStrategy(testingSchedulerOperation, schedulingTopology);
-        schedulingStrategy.startScheduling();
-        return schedulingStrategy;
+    VertexwiseSchedulingStrategy createSchedulingStrategy(SchedulingTopology schedulingTopology) {
+        return new VertexwiseSchedulingStrategy(
+                testingSchedulerOperation,
+                schedulingTopology,
+                (ignore1, ignore2) -> inputConsumableDecider);
     }
 }