You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/12/15 02:16:34 UTC
[flink] 02/04: [FLINK-29767] Introduce InputConsumableDecider and using it to decide are input all consumable for VertexwiseSchedulingStrategy.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e739b3853a60b8c7584510b0becd87265306140e
Author: Weijie Guo <re...@163.com>
AuthorDate: Mon Oct 31 15:14:13 2022 +0800
[FLINK-29767] Introduce InputConsumableDecider and using it to decide are input all consumable for VertexwiseSchedulingStrategy.
---
.../strategy/DefaultInputConsumableDecider.java | 107 ++++++++++++
.../scheduler/strategy/InputConsumableDecider.java | 50 ++++++
.../strategy/VertexwiseSchedulingStrategy.java | 45 ++---
.../DefaultInputConsumableDeciderTest.java | 189 +++++++++++++++++++++
.../strategy/TestingInputConsumableDecider.java | 56 ++++++
.../strategy/VertexwiseSchedulingStrategyTest.java | 55 ++++--
6 files changed, 457 insertions(+), 45 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java
new file mode 100644
index 00000000000..1308568db7f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Default implementation of {@link InputConsumableDecider}. This decider will judge whether the
+ * executionVertex's inputs are consumable as follows:
+ *
+ * <p>For blocking consumed partition group: Whether all result partitions in the group are
+ * finished.
+ *
+ * <p>For canBePipelined consumed partition group: whether all result partitions in the group are
+ * scheduled.
+ */
+class DefaultInputConsumableDecider implements InputConsumableDecider {
+ private final Function<IntermediateResultPartitionID, SchedulingResultPartition>
+ resultPartitionRetriever;
+
+ private final Function<ExecutionVertexID, Boolean> scheduledVertexRetriever;
+
+ DefaultInputConsumableDecider(
+ Function<ExecutionVertexID, Boolean> scheduledVertexRetriever,
+ Function<IntermediateResultPartitionID, SchedulingResultPartition>
+ resultPartitionRetriever) {
+ this.scheduledVertexRetriever = scheduledVertexRetriever;
+ this.resultPartitionRetriever = resultPartitionRetriever;
+ }
+
+ @Override
+ public boolean isInputConsumable(
+ SchedulingExecutionVertex executionVertex,
+ Set<ExecutionVertexID> verticesToSchedule,
+ Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ executionVertex.getConsumedPartitionGroups()) {
+
+ if (!consumableStatusCache.computeIfAbsent(
+ consumedPartitionGroup,
+ (group) -> isConsumedPartitionGroupConsumable(group, verticesToSchedule))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isConsumedPartitionGroupConsumable(
+ final ConsumedPartitionGroup consumedPartitionGroup,
+ final Set<ExecutionVertexID> verticesToSchedule) {
+ if (consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) {
+ for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
+ ExecutionVertexID producerVertex =
+ resultPartitionRetriever.apply(partitionId).getProducer().getId();
+ if (!verticesToSchedule.contains(producerVertex)
+ && !scheduledVertexRetriever.apply(producerVertex)) {
+ return false;
+ }
+ }
+ } else {
+ for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
+ if (resultPartitionRetriever.apply(partitionId).getState()
+ != ResultPartitionState.CONSUMABLE) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /** Factory for {@link DefaultInputConsumableDecider}. */
+ public static class Factory implements InputConsumableDecider.Factory {
+
+ public static final InputConsumableDecider.Factory INSTANCE = new Factory();
+
+ // disable public instantiation.
+ private Factory() {}
+
+ @Override
+ public InputConsumableDecider createInstance(
+ SchedulingTopology schedulingTopology,
+ Function<ExecutionVertexID, Boolean> scheduledVertexRetriever) {
+ return new DefaultInputConsumableDecider(
+ scheduledVertexRetriever, schedulingTopology::getResultPartition);
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java
new file mode 100644
index 00000000000..e34cb06bc4e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * {@link InputConsumableDecider} is responsible for determining whether the input of an
+ * executionVertex is consumable.
+ */
+public interface InputConsumableDecider {
+ /**
+ * Determining whether the input of an execution vertex is consumable.
+ *
+ * @param executionVertex to be determined whether it's input is consumable.
+ * @param verticesToSchedule vertices that are not yet scheduled but already decided to be
+ * scheduled.
+ * @param consumableStatusCache a cache for {@link ConsumedPartitionGroup} consumable status.
+ * This is to avoid repetitive computation.
+ */
+ boolean isInputConsumable(
+ SchedulingExecutionVertex executionVertex,
+ Set<ExecutionVertexID> verticesToSchedule,
+ Map<ConsumedPartitionGroup, Boolean> consumableStatusCache);
+
+ /** Factory for {@link InputConsumableDecider}. */
+ interface Factory {
+ InputConsumableDecider createInstance(
+ SchedulingTopology schedulingTopology,
+ Function<ExecutionVertexID, Boolean> scheduledVertexRetriever);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
index 4d217a9eb97..74c47234a3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
@@ -51,12 +51,20 @@ public class VertexwiseSchedulingStrategy
private final Set<ExecutionVertexID> newVertices = new HashSet<>();
+ private final Set<ExecutionVertexID> scheduledVertices = new HashSet<>();
+
+ private final InputConsumableDecider inputConsumableDecider;
+
public VertexwiseSchedulingStrategy(
final SchedulerOperations schedulerOperations,
- final SchedulingTopology schedulingTopology) {
+ final SchedulingTopology schedulingTopology,
+ final InputConsumableDecider.Factory inputConsumableDeciderFactory) {
this.schedulerOperations = checkNotNull(schedulerOperations);
this.schedulingTopology = checkNotNull(schedulingTopology);
+ this.inputConsumableDecider =
+ inputConsumableDeciderFactory.createInstance(
+ schedulingTopology, scheduledVertices::contains);
schedulingTopology.registerSchedulingTopologyListener(this);
}
@@ -73,6 +81,7 @@ public class VertexwiseSchedulingStrategy
@Override
public void restartTasks(Set<ExecutionVertexID> verticesToRestart) {
+ scheduledVertices.removeAll(verticesToRestart);
maybeScheduleVertices(verticesToRestart);
}
@@ -123,12 +132,13 @@ public class VertexwiseSchedulingStrategy
SchedulingExecutionVertex vertex =
schedulingTopology.getVertex(vertexId);
checkState(vertex.getState() == ExecutionState.CREATED);
- return areVertexInputsAllConsumable(
- vertex, consumableStatusCache);
+ return inputConsumableDecider.isInputConsumable(
+ vertex, Collections.emptySet(), consumableStatusCache);
})
.collect(Collectors.toSet());
scheduleVerticesOneByOne(verticesToDeploy);
+ scheduledVertices.addAll(verticesToDeploy);
}
private void scheduleVerticesOneByOne(final Set<ExecutionVertexID> verticesToDeploy) {
@@ -143,37 +153,16 @@ public class VertexwiseSchedulingStrategy
id -> schedulerOperations.allocateSlotsAndDeploy(Collections.singletonList(id)));
}
- private boolean areVertexInputsAllConsumable(
- SchedulingExecutionVertex vertex,
- Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
- for (ConsumedPartitionGroup consumedPartitionGroup : vertex.getConsumedPartitionGroups()) {
-
- if (!consumableStatusCache.computeIfAbsent(
- consumedPartitionGroup, this::isConsumedPartitionGroupConsumable)) {
- return false;
- }
- }
- return true;
- }
-
- private boolean isConsumedPartitionGroupConsumable(
- final ConsumedPartitionGroup consumedPartitionGroup) {
- for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
- if (schedulingTopology.getResultPartition(partitionId).getState()
- != ResultPartitionState.CONSUMABLE) {
- return false;
- }
- }
- return true;
- }
-
/** The factory for creating {@link VertexwiseSchedulingStrategy}. */
public static class Factory implements SchedulingStrategyFactory {
@Override
public SchedulingStrategy createInstance(
final SchedulerOperations schedulerOperations,
final SchedulingTopology schedulingTopology) {
- return new VertexwiseSchedulingStrategy(schedulerOperations, schedulingTopology);
+ return new VertexwiseSchedulingStrategy(
+ schedulerOperations,
+ schedulingTopology,
+ DefaultInputConsumableDecider.Factory.INSTANCE);
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java
new file mode 100644
index 00000000000..6b373e3c09d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DefaultInputConsumableDecider}. */
+class DefaultInputConsumableDeciderTest {
+ @Test
+ void testNotFinishedBlockingInput() {
+ final TestingSchedulingTopology topology = new TestingSchedulingTopology();
+
+ final List<TestingSchedulingExecutionVertex> producers =
+ topology.addExecutionVertices().withParallelism(2).finish();
+
+ final List<TestingSchedulingExecutionVertex> consumer =
+ topology.addExecutionVertices().withParallelism(2).finish();
+
+ topology.connectAllToAll(producers, consumer)
+ .withResultPartitionState(ResultPartitionState.CREATED)
+ .withResultPartitionType(ResultPartitionType.BLOCKING)
+ .finish();
+
+ DefaultInputConsumableDecider inputConsumableDecider =
+ createDefaultInputConsumableDecider(Collections.emptySet(), topology);
+
+ assertThat(
+ inputConsumableDecider.isInputConsumable(
+ consumer.get(0), Collections.emptySet(), new HashMap<>()))
+ .isFalse();
+ assertThat(
+ inputConsumableDecider.isInputConsumable(
+ consumer.get(1), Collections.emptySet(), new HashMap<>()))
+ .isFalse();
+ }
+
+ @Test
+ void testAllFinishedBlockingInput() {
+ final TestingSchedulingTopology topology = new TestingSchedulingTopology();
+
+ final List<TestingSchedulingExecutionVertex> producers =
+ topology.addExecutionVertices().withParallelism(2).finish();
+
+ final List<TestingSchedulingExecutionVertex> consumer =
+ topology.addExecutionVertices().withParallelism(2).finish();
+
+ topology.connectAllToAll(producers, consumer)
+ .withResultPartitionState(ResultPartitionState.CONSUMABLE)
+ .withResultPartitionType(ResultPartitionType.BLOCKING)
+ .finish();
+
+ DefaultInputConsumableDecider inputConsumableDecider =
+ createDefaultInputConsumableDecider(Collections.emptySet(), topology);
+
+ assertThat(
+ inputConsumableDecider.isInputConsumable(
+ consumer.get(0), Collections.emptySet(), new HashMap<>()))
+ .isTrue();
+ assertThat(
+ inputConsumableDecider.isInputConsumable(
+ consumer.get(1), Collections.emptySet(), new HashMap<>()))
+ .isTrue();
+ }
+
+ @Test
+ void testUpstreamNotScheduledHybridInput() {
+ final TestingSchedulingTopology topology = new TestingSchedulingTopology();
+
+ final List<TestingSchedulingExecutionVertex> producers =
+ topology.addExecutionVertices().withParallelism(2).finish();
+
+ final List<TestingSchedulingExecutionVertex> consumer =
+ topology.addExecutionVertices().withParallelism(2).finish();
+
+ topology.connectAllToAll(producers, consumer)
+ .withResultPartitionState(ResultPartitionState.CREATED)
+ .withResultPartitionType(ResultPartitionType.HYBRID_FULL)
+ .finish();
+
+ DefaultInputConsumableDecider inputConsumableDecider =
+ createDefaultInputConsumableDecider(Collections.emptySet(), topology);
+
+ assertThat(
+ inputConsumableDecider.isInputConsumable(
+ consumer.get(0), Collections.emptySet(), new HashMap<>()))
+ .isFalse();
+ assertThat(
+ inputConsumableDecider.isInputConsumable(
+ consumer.get(1), Collections.emptySet(), new HashMap<>()))
+ .isFalse();
+ }
+
+ @Test
+ void testUpstreamAllScheduledHybridInput() {
+ final TestingSchedulingTopology topology = new TestingSchedulingTopology();
+
+ final List<TestingSchedulingExecutionVertex> producers =
+ topology.addExecutionVertices().withParallelism(2).finish();
+
+ final List<TestingSchedulingExecutionVertex> consumer =
+ topology.addExecutionVertices().withParallelism(2).finish();
+
+ topology.connectAllToAll(producers, consumer)
+ .withResultPartitionState(ResultPartitionState.CREATED)
+ .withResultPartitionType(ResultPartitionType.HYBRID_FULL)
+ .finish();
+
+ HashSet<ExecutionVertexID> scheduledVertices = new HashSet<>();
+ DefaultInputConsumableDecider inputConsumableDecider =
+ createDefaultInputConsumableDecider(scheduledVertices, topology);
+ scheduledVertices.add(producers.get(0).getId());
+ HashSet<ExecutionVertexID> vertexToDeploy = new HashSet<>();
+ vertexToDeploy.add(producers.get(1).getId());
+
+ assertThat(
+ inputConsumableDecider.isInputConsumable(
+ consumer.get(0), vertexToDeploy, new HashMap<>()))
+ .isTrue();
+ assertThat(
+ inputConsumableDecider.isInputConsumable(
+ consumer.get(1), vertexToDeploy, new HashMap<>()))
+ .isTrue();
+ }
+
+ @Test
+ void testHybridAndBlockingInputButBlockingInputNotFinished() {
+ final TestingSchedulingTopology topology = new TestingSchedulingTopology();
+
+ final List<TestingSchedulingExecutionVertex> producers1 =
+ topology.addExecutionVertices().withParallelism(1).finish();
+
+ final List<TestingSchedulingExecutionVertex> producers2 =
+ topology.addExecutionVertices().withParallelism(1).finish();
+
+ final List<TestingSchedulingExecutionVertex> consumer =
+ topology.addExecutionVertices().withParallelism(1).finish();
+
+ topology.connectAllToAll(producers1, consumer)
+ .withResultPartitionState(ResultPartitionState.CREATED)
+ .withResultPartitionType(ResultPartitionType.BLOCKING)
+ .finish();
+
+ topology.connectAllToAll(producers2, consumer)
+ .withResultPartitionState(ResultPartitionState.CREATED)
+ .withResultPartitionType(ResultPartitionType.HYBRID_FULL)
+ .finish();
+
+ DefaultInputConsumableDecider inputConsumableDecider =
+ createDefaultInputConsumableDecider(
+ Collections.singleton(producers2.get(0).getId()), topology);
+
+ assertThat(
+ inputConsumableDecider.isInputConsumable(
+ consumer.get(0), Collections.emptySet(), new HashMap<>()))
+ .isFalse();
+ }
+
+ private DefaultInputConsumableDecider createDefaultInputConsumableDecider(
+ Set<ExecutionVertexID> scheduledVertices, SchedulingTopology schedulingTopology) {
+ return new DefaultInputConsumableDecider(
+ scheduledVertices::contains, schedulingTopology::getResultPartition);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingInputConsumableDecider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingInputConsumableDecider.java
new file mode 100644
index 00000000000..da2aa8241ac
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingInputConsumableDecider.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/** Mock {@link InputConsumableDecider} for testing. */
+public class TestingInputConsumableDecider implements InputConsumableDecider {
+
+ private final Set<SchedulingExecutionVertex> inputConsumableExecutionVertices = new HashSet<>();
+
+ private final Set<SchedulingExecutionVertex> sourceVertices = new HashSet<>();
+
+ private SchedulingExecutionVertex lastExecutionToDecideInputConsumable;
+
+ @Override
+ public boolean isInputConsumable(
+ SchedulingExecutionVertex executionVertex,
+ Set<ExecutionVertexID> verticesToDeploy,
+ Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
+ lastExecutionToDecideInputConsumable = executionVertex;
+ return sourceVertices.contains(executionVertex)
+ || inputConsumableExecutionVertices.contains(executionVertex);
+ }
+
+ public void setInputConsumable(SchedulingExecutionVertex executionVertex) {
+ inputConsumableExecutionVertices.add(executionVertex);
+ }
+
+ public void addSourceVertices(Collection<SchedulingExecutionVertex> sourceVertices) {
+ this.sourceVertices.addAll(sourceVertices);
+ }
+
+ public SchedulingExecutionVertex getLastExecutionToDecideInputConsumable() {
+ return lastExecutionToDecideInputConsumable;
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java
index 86cdd6a4f21..6b679b73e29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -44,6 +45,8 @@ class VertexwiseSchedulingStrategyTest {
private TestingSchedulingTopology testingSchedulingTopology;
+ private TestingInputConsumableDecider inputConsumableDecider;
+
private List<TestingSchedulingExecutionVertex> source;
private List<TestingSchedulingExecutionVertex> map;
@@ -53,7 +56,7 @@ class VertexwiseSchedulingStrategyTest {
@BeforeEach
void setUp() {
testingSchedulerOperation = new TestingSchedulerOperations();
-
+ inputConsumableDecider = new TestingInputConsumableDecider();
buildTopology();
}
@@ -90,13 +93,17 @@ class VertexwiseSchedulingStrategyTest {
@Test
void testStartScheduling() {
- startScheduling(testingSchedulingTopology);
+ VertexwiseSchedulingStrategy schedulingStrategy =
+ createSchedulingStrategy(testingSchedulingTopology);
final List<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices =
new ArrayList<>();
expectedScheduledVertices.add(Collections.singletonList(source.get(0)));
expectedScheduledVertices.add(Collections.singletonList(source.get(1)));
+ inputConsumableDecider.addSourceVertices(new HashSet<>(source));
+
+ schedulingStrategy.startScheduling();
assertLatestScheduledVerticesAreEqualTo(
expectedScheduledVertices, testingSchedulerOperation);
}
@@ -104,7 +111,9 @@ class VertexwiseSchedulingStrategyTest {
@Test
void testRestartTasks() {
final VertexwiseSchedulingStrategy schedulingStrategy =
- startScheduling(testingSchedulingTopology);
+ createSchedulingStrategy(testingSchedulingTopology);
+
+ inputConsumableDecider.addSourceVertices(new HashSet<>(source));
final Set<ExecutionVertexID> verticesToRestart =
Stream.of(source, map, sink)
@@ -126,30 +135,40 @@ class VertexwiseSchedulingStrategyTest {
void testOnExecutionStateChangeToFinished() {
// trigger source1, source2 scheduled.
final VertexwiseSchedulingStrategy schedulingStrategy =
- startScheduling(testingSchedulingTopology);
+ createSchedulingStrategy(testingSchedulingTopology);
+
+ inputConsumableDecider.addSourceVertices(new HashSet<>(source));
+
+ schedulingStrategy.startScheduling();
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2);
// trigger map1 scheduled
final TestingSchedulingExecutionVertex source1 = source.get(0);
- source1.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
+ inputConsumableDecider.setInputConsumable(map.get(0));
schedulingStrategy.onExecutionStateChange(source1.getId(), ExecutionState.FINISHED);
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(3);
// trigger map2 scheduled
final TestingSchedulingExecutionVertex source2 = source.get(1);
- source2.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
+ inputConsumableDecider.setInputConsumable(map.get(1));
schedulingStrategy.onExecutionStateChange(source2.getId(), ExecutionState.FINISHED);
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(4);
- // sinks' inputs are not all consumable yet so they are not scheduled
+ // sinks' inputs are not consumable yet so they are not scheduled
final TestingSchedulingExecutionVertex map1 = map.get(0);
- map1.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
schedulingStrategy.onExecutionStateChange(map1.getId(), ExecutionState.FINISHED);
+ assertThat(
+ inputConsumableDecider
+ .getLastExecutionToDecideInputConsumable()
+ .getId()
+ .getJobVertexId())
+ .isEqualTo(sink.get(0).getId().getJobVertexId());
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(4);
// trigger sink1, sink2 scheduled
final TestingSchedulingExecutionVertex map2 = map.get(1);
- map2.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
+ inputConsumableDecider.setInputConsumable(sink.get(0));
+ inputConsumableDecider.setInputConsumable(sink.get(1));
schedulingStrategy.onExecutionStateChange(map2.getId(), ExecutionState.FINISHED);
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(6);
@@ -181,7 +200,9 @@ class VertexwiseSchedulingStrategyTest {
final List<TestingSchedulingExecutionVertex> producers =
topology.addExecutionVertices().withParallelism(2).finish();
- final VertexwiseSchedulingStrategy schedulingStrategy = startScheduling(topology);
+ final VertexwiseSchedulingStrategy schedulingStrategy = createSchedulingStrategy(topology);
+ inputConsumableDecider.addSourceVertices(new HashSet<>(producers));
+ schedulingStrategy.startScheduling();
final List<TestingSchedulingExecutionVertex> consumers =
topology.addExecutionVertices().withParallelism(2).finish();
@@ -193,12 +214,10 @@ class VertexwiseSchedulingStrategyTest {
// add consumers to scheduling strategy.
if (allToAll) {
topology.connectAllToAll(producers, consumers)
- .withResultPartitionState(ResultPartitionState.CONSUMABLE)
.withResultPartitionType(ResultPartitionType.BLOCKING)
.finish();
} else {
topology.connectPointwise(producers, consumers)
- .withResultPartitionState(ResultPartitionState.CONSUMABLE)
.withResultPartitionType(ResultPartitionType.BLOCKING)
.finish();
}
@@ -209,6 +228,8 @@ class VertexwiseSchedulingStrategyTest {
consumers.stream()
.map(TestingSchedulingExecutionVertex::getId)
.collect(Collectors.toList()));
+ inputConsumableDecider.setInputConsumable(consumers.get(0));
+ inputConsumableDecider.setInputConsumable(consumers.get(1));
schedulingStrategy.onExecutionStateChange(
producers.get(1).getId(), ExecutionState.FINISHED);
@@ -223,10 +244,10 @@ class VertexwiseSchedulingStrategyTest {
testingSchedulerOperation);
}
- VertexwiseSchedulingStrategy startScheduling(SchedulingTopology schedulingTopology) {
- final VertexwiseSchedulingStrategy schedulingStrategy =
- new VertexwiseSchedulingStrategy(testingSchedulerOperation, schedulingTopology);
- schedulingStrategy.startScheduling();
- return schedulingStrategy;
+ VertexwiseSchedulingStrategy createSchedulingStrategy(SchedulingTopology schedulingTopology) {
+ return new VertexwiseSchedulingStrategy(
+ testingSchedulerOperation,
+ schedulingTopology,
+ (ignore1, ignore2) -> inputConsumableDecider);
}
}