You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by wo...@apache.org on 2018/06/29 04:25:42 UTC
[incubator-nemo] branch master updated: [NEMO-73,75] SchedulingPolicy as Vertex-level Execution Property (#57)
This is an automated email from the ASF dual-hosted git repository.
wonook pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 209369c [NEMO-73,75] SchedulingPolicy as Vertex-level Execution Property (#57)
209369c is described below
commit 209369ce9bf4980ef4abccaa845fde0b33304af0
Author: Jangho Seo <ja...@jangho.io>
AuthorDate: Fri Jun 29 13:25:40 2018 +0900
[NEMO-73,75] SchedulingPolicy as Vertex-level Execution Property (#57)
JIRA: [NEMO-73: SchedulingPolicy as Vertex-level Execution Property](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-73)
JIRA: [NEMO-75: Rename SchedulingPolicy](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-75)
**Major changes:**
- Separated SchedulingConstraint from SchedulingPolicy.
- Added SchedulingConstraintRegistry.
- Modified SchedulerRunner to find SchedulingConstraints to apply on the task to schedule.
- Removed CompositeSchedulingPolicy
**Minor changes to note:**
- Added ExecutorSlotCompliance{Property,Pass} and SourceLocationAwareScheduling{Property,Pass}
- Renamed ScheduleGroupIndex to ScheduleGroup
- MinOccupancyFirstSchedulingPolicy is now the default SchedulingPolicy.
**Tests for the changes:**
- Added SchedulingConstraintRegistryTest
**Other comments:**
- FaultToleranceTest is expected to be revamped in the changes for NEMO-50.
resolves [NEMO-73](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-73)
resolves [NEMO-75](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-75)
---
.../ir/executionproperty/AssociatedProperty.java | 28 ++
.../ExecutorSlotComplianceProperty.java | 46 +++
...dexProperty.java => ScheduleGroupProperty.java} | 10 +-
.../SourceLocationAwareSchedulingProperty.java | 47 +++
.../annotating/DefaultScheduleGroupPass.java | 40 +--
.../annotating/ExecutorSlotCompliancePass.java | 40 +++
.../SourceLocationAwareSchedulingPass.java | 40 +++
.../composite/PrimitiveCompositePass.java | 4 +-
.../runtime/common/plan/PhysicalPlanGenerator.java | 50 +--
.../edu/snu/nemo/runtime/common/plan/Stage.java | 12 +-
.../executor/datatransfer/DataTransferTest.java | 9 +-
.../master/scheduler/BatchSingleJobScheduler.java | 30 +-
.../scheduler/CompositeSchedulingPolicy.java | 56 ---
...=> ContainerTypeAwareSchedulingConstraint.java} | 30 +-
...licy.java => FreeSlotSchedulingConstraint.java} | 27 +-
.../MinOccupancyFirstSchedulingPolicy.java | 25 +-
.../runtime/master/scheduler/SchedulerRunner.java | 30 +-
...dulingPolicy.java => SchedulingConstraint.java} | 10 +-
.../scheduler/SchedulingConstraintRegistry.java | 74 ++++
.../runtime/master/scheduler/SchedulingPolicy.java | 19 +-
...> SourceLocationAwareSchedulingConstraint.java} | 32 +-
.../scheduler/BatchSingleJobSchedulerTest.java | 16 +-
...ontainerTypeAwareSchedulingConstraintTest.java} | 19 +-
.../master/scheduler/FaultToleranceTest.java | 379 ---------------------
....java => FreeSlotSchedulingConstraintTest.java} | 16 +-
.../MinOccupancyFirstSchedulingPolicyTest.java | 10 +-
.../master/scheduler/SchedulerTestUtil.java | 22 --
...urceLocationAwareSchedulingConstraintTest.java} | 24 +-
.../common/plan/PhysicalPlanGeneratorTest.java | 8 +-
.../runtime/common/plan/StagePartitionerTest.java | 14 +-
.../SchedulingConstraintnRegistryTest.java | 50 +++
.../annotating/DefaultScheduleGroupPassTest.java | 64 ++--
.../optimizer/policy/PolicyBuilderTest.java | 6 +-
33 files changed, 568 insertions(+), 719 deletions(-)
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/AssociatedProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/AssociatedProperty.java
new file mode 100644
index 0000000..ceebd86
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/AssociatedProperty.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.executionproperty;
+
+import java.lang.annotation.*;
+
+/**
+ * Declares associated {@link ExecutionProperty} for implementations.
+ */
+@Target({ElementType.TYPE})
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+public @interface AssociatedProperty {
+ Class<? extends ExecutionProperty> value();
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ExecutorSlotComplianceProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ExecutorSlotComplianceProperty.java
new file mode 100644
index 0000000..357be21
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ExecutorSlotComplianceProperty.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.executionproperty;
+
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+
+/**
+ * This property decides whether or not to comply to slot restrictions when scheduling this vertex.
+ */
+public final class ExecutorSlotComplianceProperty extends VertexExecutionProperty<Boolean> {
+ private static final ExecutorSlotComplianceProperty COMPLIANCE_TRUE = new ExecutorSlotComplianceProperty(true);
+ private static final ExecutorSlotComplianceProperty COMPLIANCE_FALSE
+ = new ExecutorSlotComplianceProperty(false);
+
+ /**
+ * Default constructor.
+ *
+ * @param value value of the ExecutionProperty
+ */
+ private ExecutorSlotComplianceProperty(final boolean value) {
+ super(value);
+ }
+
+ /**
+ * Static method getting execution property.
+ *
+ * @param value value of the new execution property
+ * @return the execution property
+ */
+ public static ExecutorSlotComplianceProperty of(final boolean value) {
+ return value ? COMPLIANCE_TRUE : COMPLIANCE_FALSE;
+ }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupIndexProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupProperty.java
similarity index 76%
rename from common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupIndexProperty.java
rename to common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupProperty.java
index 08518ff..3b521cb 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupIndexProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupProperty.java
@@ -18,14 +18,14 @@ package edu.snu.nemo.common.ir.vertex.executionproperty;
import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
/**
- * ScheduleGroupIndex ExecutionProperty.
+ * ScheduleGroup ExecutionProperty.
*/
-public final class ScheduleGroupIndexProperty extends VertexExecutionProperty<Integer> {
+public final class ScheduleGroupProperty extends VertexExecutionProperty<Integer> {
/**
* Constructor.
* @param value value of the execution property.
*/
- private ScheduleGroupIndexProperty(final Integer value) {
+ private ScheduleGroupProperty(final Integer value) {
super(value);
}
@@ -34,7 +34,7 @@ public final class ScheduleGroupIndexProperty extends VertexExecutionProperty<In
* @param value value of the new execution property.
* @return the newly created execution property.
*/
- public static ScheduleGroupIndexProperty of(final Integer value) {
- return new ScheduleGroupIndexProperty(value);
+ public static ScheduleGroupProperty of(final Integer value) {
+ return new ScheduleGroupProperty(value);
}
}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SourceLocationAwareSchedulingProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SourceLocationAwareSchedulingProperty.java
new file mode 100644
index 0000000..cad987d
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SourceLocationAwareSchedulingProperty.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.executionproperty;
+
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+
+/**
+ * This property decides whether or not to schedule this vertex only on executors where source data reside.
+ */
+public final class SourceLocationAwareSchedulingProperty extends VertexExecutionProperty<Boolean> {
+ private static final SourceLocationAwareSchedulingProperty SOURCE_TRUE
+ = new SourceLocationAwareSchedulingProperty(true);
+ private static final SourceLocationAwareSchedulingProperty SOURCE_FALSE
+ = new SourceLocationAwareSchedulingProperty(false);
+
+ /**
+ * Default constructor.
+ *
+ * @param value value of the ExecutionProperty
+ */
+ private SourceLocationAwareSchedulingProperty(final boolean value) {
+ super(value);
+ }
+
+ /**
+ * Static method getting execution property.
+ *
+ * @param value value of the new execution property
+ * @return the execution property
+ */
+ public static SourceLocationAwareSchedulingProperty of(final boolean value) {
+ return value ? SOURCE_TRUE : SOURCE_FALSE;
+ }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
index 38f964c..ca788bd 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
@@ -23,7 +23,7 @@ import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
import org.apache.commons.lang3.mutable.MutableInt;
import java.util.*;
@@ -71,7 +71,7 @@ public final class DefaultScheduleGroupPass extends AnnotatingPass {
public DefaultScheduleGroupPass(final boolean allowBroadcastWithinScheduleGroup,
final boolean allowShuffleWithinScheduleGroup,
final boolean allowMultipleInEdgesWithinScheduleGroup) {
- super(ScheduleGroupIndexProperty.class, Stream.of(
+ super(ScheduleGroupProperty.class, Stream.of(
DataCommunicationPatternProperty.class,
DataFlowModelProperty.class
).collect(Collectors.toSet()));
@@ -93,7 +93,7 @@ public final class DefaultScheduleGroupPass extends AnnotatingPass {
newScheduleGroup.vertices.add(irVertex);
irVertexToScheduleGroupMap.put(irVertex, newScheduleGroup);
}
- // Get scheduleGroupIndex
+ // Get scheduleGroup
final ScheduleGroup scheduleGroup = irVertexToScheduleGroupMap.get(irVertex);
if (scheduleGroup == null) {
throw new RuntimeException(String.format("ScheduleGroup must be set for %s", irVertex));
@@ -203,42 +203,42 @@ public final class DefaultScheduleGroupPass extends AnnotatingPass {
}
});
- // Assign ScheduleGroupIndex property based on topology of ScheduleGroups
- final MutableInt currentScheduleGroupIndex = new MutableInt(getNextScheudleGroupIndex(dag.getVertices()));
+ // Assign ScheduleGroup property based on topology of ScheduleGroups
+ final MutableInt currentScheduleGroup = new MutableInt(getNextScheudleGroup(dag.getVertices()));
final DAGBuilder<ScheduleGroup, ScheduleGroupEdge> scheduleGroupDAGBuilder = new DAGBuilder<>();
scheduleGroups.forEach(scheduleGroupDAGBuilder::addVertex);
scheduleGroups.forEach(src -> src.scheduleGroupsTo
.forEach(dst -> scheduleGroupDAGBuilder.connectVertices(new ScheduleGroupEdge(src, dst))));
scheduleGroupDAGBuilder.build().topologicalDo(scheduleGroup -> {
- boolean usedCurrentIndex = false;
+ boolean usedCurrentScheduleGroup = false;
for (final IRVertex irVertex : scheduleGroup.vertices) {
- if (!irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).isPresent()) {
- irVertex.getExecutionProperties().put(ScheduleGroupIndexProperty.of(currentScheduleGroupIndex.getValue()));
- usedCurrentIndex = true;
+ if (!irVertex.getPropertyValue(ScheduleGroupProperty.class).isPresent()) {
+ irVertex.getExecutionProperties().put(ScheduleGroupProperty.of(currentScheduleGroup.getValue()));
+ usedCurrentScheduleGroup = true;
}
}
- if (usedCurrentIndex) {
- currentScheduleGroupIndex.increment();
+ if (usedCurrentScheduleGroup) {
+ currentScheduleGroup.increment();
}
});
return dag;
}
/**
- * Determines the range of {@link ScheduleGroupIndexProperty} value that will prevent collision
- * with the existing {@link ScheduleGroupIndexProperty}.
+ * Determines the range of {@link ScheduleGroupProperty} value that will prevent collision
+ * with the existing {@link ScheduleGroupProperty}.
* @param irVertexCollection collection of {@link IRVertex}
- * @return the minimum value for the {@link ScheduleGroupIndexProperty} that won't collide with the existing values
+ * @return the minimum value for the {@link ScheduleGroupProperty} that won't collide with the existing values
*/
- private int getNextScheudleGroupIndex(final Collection<IRVertex> irVertexCollection) {
- int nextScheduleGroupIndex = 0;
+ private int getNextScheudleGroup(final Collection<IRVertex> irVertexCollection) {
+ int nextScheduleGroup = 0;
for (final IRVertex irVertex : irVertexCollection) {
- final Optional<Integer> scheduleGroupIndex = irVertex.getPropertyValue(ScheduleGroupIndexProperty.class);
- if (scheduleGroupIndex.isPresent()) {
- nextScheduleGroupIndex = Math.max(scheduleGroupIndex.get() + 1, nextScheduleGroupIndex);
+ final Optional<Integer> scheduleGroup = irVertex.getPropertyValue(ScheduleGroupProperty.class);
+ if (scheduleGroup.isPresent()) {
+ nextScheduleGroup = Math.max(scheduleGroup.get() + 1, nextScheduleGroup);
}
}
- return nextScheduleGroupIndex;
+ return nextScheduleGroup;
}
/**
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ExecutorSlotCompliancePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ExecutorSlotCompliancePass.java
new file mode 100644
index 0000000..e1b2123
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ExecutorSlotCompliancePass.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
+
+/**
+ * Sets {@link ExecutorSlotComplianceProperty}.
+ */
+public final class ExecutorSlotCompliancePass extends AnnotatingPass {
+
+ public ExecutorSlotCompliancePass() {
+ super(ExecutorSlotComplianceProperty.class);
+ }
+
+ @Override
+ public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+ // On every vertex, if ExecutorSlotComplianceProperty is not set, put it as true.
+ dag.getVertices().stream()
+ .filter(v -> !v.getExecutionProperties().containsKey(ExecutorSlotComplianceProperty.class))
+ .forEach(v -> v.getExecutionProperties().put(ExecutorSlotComplianceProperty.of(true)));
+ return dag;
+ }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SourceLocationAwareSchedulingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SourceLocationAwareSchedulingPass.java
new file mode 100644
index 0000000..8c30d95
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SourceLocationAwareSchedulingPass.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.SourceLocationAwareSchedulingProperty;
+
+/**
+ * Sets {@link SourceLocationAwareSchedulingProperty}.
+ */
+public final class SourceLocationAwareSchedulingPass extends AnnotatingPass {
+
+ public SourceLocationAwareSchedulingPass() {
+ super(SourceLocationAwareSchedulingProperty.class);
+ }
+
+ @Override
+ public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+ // On every vertex, if SourceLocationAwareSchedulingProperty is not set, put it as true.
+ dag.getVertices().stream()
+ .filter(v -> !v.getExecutionProperties().containsKey(SourceLocationAwareSchedulingProperty.class))
+ .forEach(v -> v.getExecutionProperties().put(SourceLocationAwareSchedulingProperty.of(true)));
+ return dag;
+ }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
index 3a5c80c..a30abec 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
@@ -37,7 +37,9 @@ public final class PrimitiveCompositePass extends CompositePass {
new DefaultEdgeUsedDataHandlingPass(),
new DefaultScheduleGroupPass(),
new CompressionPass(),
- new DecompressionPass()
+ new DecompressionPass(),
+ new SourceLocationAwareSchedulingPass(),
+ new ExecutorSlotCompliancePass()
));
}
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index 107a623..b9b9795 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -24,7 +24,7 @@ import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
import edu.snu.nemo.common.ir.vertex.*;
import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.dag.DAGBuilder;
@@ -226,8 +226,8 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
private void integrityCheck(final Stage stage) {
stage.getPropertyValue(ParallelismProperty.class)
.orElseThrow(() -> new RuntimeException("Parallelism property must be set for Stage"));
- stage.getPropertyValue(ScheduleGroupIndexProperty.class)
- .orElseThrow(() -> new RuntimeException("ScheduleGroupIndex property must be set for Stage"));
+ stage.getPropertyValue(ScheduleGroupProperty.class)
+ .orElseThrow(() -> new RuntimeException("ScheduleGroup property must be set for Stage"));
stage.getIRDAG().getVertices().forEach(irVertex -> {
// Check vertex type.
@@ -241,24 +241,24 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
/**
* Split ScheduleGroups by Pull {@link StageEdge}s, and ensure topological ordering of
- * {@link ScheduleGroupIndexProperty}.
+ * {@link ScheduleGroupProperty}.
*
* @param dag {@link DAG} of {@link Stage}s to manipulate
*/
private void splitScheduleGroupByPullStageEdges(final DAG<Stage, StageEdge> dag) {
- final MutableInt nextScheduleGroupIndex = new MutableInt(0);
- final Map<Stage, Integer> stageToScheduleGroupIndexMap = new HashMap<>();
+ final MutableInt nextScheduleGroup = new MutableInt(0);
+ final Map<Stage, Integer> stageToScheduleGroupMap = new HashMap<>();
dag.topologicalDo(currentStage -> {
- // Base case: assign New ScheduleGroupIndex of the Stage
- stageToScheduleGroupIndexMap.computeIfAbsent(currentStage, s -> getAndIncrement(nextScheduleGroupIndex));
+ // Base case: assign New ScheduleGroup of the Stage
+ stageToScheduleGroupMap.computeIfAbsent(currentStage, s -> getAndIncrement(nextScheduleGroup));
for (final StageEdge stageEdgeFromCurrentStage : dag.getOutgoingEdgesOf(currentStage)) {
final Stage destination = stageEdgeFromCurrentStage.getDst();
- // Skip if some Stages that destination depends on do not have assigned new ScheduleGroupIndex
+ // Skip if some Stages that destination depends on do not have assigned new ScheduleGroup
boolean skip = false;
for (final StageEdge stageEdgeToDestination : dag.getIncomingEdgesOf(destination)) {
- if (!stageToScheduleGroupIndexMap.containsKey(stageEdgeToDestination.getSrc())) {
+ if (!stageToScheduleGroupMap.containsKey(stageEdgeToDestination.getSrc())) {
skip = true;
break;
}
@@ -266,42 +266,42 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
if (skip) {
continue;
}
- if (stageToScheduleGroupIndexMap.containsKey(destination)) {
+ if (stageToScheduleGroupMap.containsKey(destination)) {
continue;
}
// Find any non-pull inEdge
- Integer scheduleGroupIndex = null;
- Integer newScheduleGroupIndex = null;
+ Integer scheduleGroup = null;
+ Integer newScheduleGroup = null;
for (final StageEdge stageEdge : dag.getIncomingEdgesOf(destination)) {
final Stage source = stageEdge.getSrc();
if (stageEdge.getDataFlowModel() != DataFlowModelProperty.Value.Pull) {
- if (scheduleGroupIndex != null && source.getScheduleGroupIndex() != scheduleGroupIndex) {
+ if (scheduleGroup != null && source.getScheduleGroup() != scheduleGroup) {
throw new RuntimeException(String.format("Multiple Push inEdges from different ScheduleGroup: %d, %d",
- scheduleGroupIndex, source.getScheduleGroupIndex()));
+ scheduleGroup, source.getScheduleGroup()));
}
- if (source.getScheduleGroupIndex() != destination.getScheduleGroupIndex()) {
+ if (source.getScheduleGroup() != destination.getScheduleGroup()) {
throw new RuntimeException(String.format("Split ScheduleGroup by push StageEdge: %d, %d",
- source.getScheduleGroupIndex(), destination.getScheduleGroupIndex()));
+ source.getScheduleGroup(), destination.getScheduleGroup()));
}
- scheduleGroupIndex = source.getScheduleGroupIndex();
- newScheduleGroupIndex = stageToScheduleGroupIndexMap.get(source);
+ scheduleGroup = source.getScheduleGroup();
+ newScheduleGroup = stageToScheduleGroupMap.get(source);
}
}
- if (newScheduleGroupIndex == null) {
- stageToScheduleGroupIndexMap.put(destination, getAndIncrement(nextScheduleGroupIndex));
+ if (newScheduleGroup == null) {
+ stageToScheduleGroupMap.put(destination, getAndIncrement(nextScheduleGroup));
} else {
- stageToScheduleGroupIndexMap.put(destination, newScheduleGroupIndex);
+ stageToScheduleGroupMap.put(destination, newScheduleGroup);
}
}
});
dag.topologicalDo(stage -> {
- final int scheduleGroupIndex = stageToScheduleGroupIndexMap.get(stage);
- stage.getExecutionProperties().put(ScheduleGroupIndexProperty.of(scheduleGroupIndex));
+ final int scheduleGroup = stageToScheduleGroupMap.get(stage);
+ stage.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup));
stage.getIRDAG().topologicalDo(vertex -> vertex.getExecutionProperties()
- .put(ScheduleGroupIndexProperty.of(scheduleGroupIndex)));
+ .put(ScheduleGroupProperty.of(scheduleGroup)));
});
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
index 9e5da38..c2abcc4 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
@@ -22,7 +22,7 @@ import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import org.apache.commons.lang3.SerializationUtils;
@@ -94,11 +94,11 @@ public final class Stage extends Vertex {
}
/**
- * @return the schedule group index.
+ * @return the schedule group.
*/
- public int getScheduleGroupIndex() {
- return executionProperties.get(ScheduleGroupIndexProperty.class)
- .orElseThrow(() -> new RuntimeException("ScheduleGroupIndex property must be set for Stage"));
+ public int getScheduleGroup() {
+ return executionProperties.get(ScheduleGroupProperty.class)
+ .orElseThrow(() -> new RuntimeException("ScheduleGroup property must be set for Stage"));
}
/**
@@ -130,7 +130,7 @@ public final class Stage extends Vertex {
@Override
public String propertiesToJSON() {
final StringBuilder sb = new StringBuilder();
- sb.append("{\"scheduleGroupIndex\": ").append(getScheduleGroupIndex());
+ sb.append("{\"scheduleGroup\": ").append(getScheduleGroup());
sb.append(", \"irDag\": ").append(irDag);
sb.append(", \"parallelism\": ").append(getParallelism());
sb.append(", \"executionProperties\": ").append(executionProperties);
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 846bcca..c2919a3 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -23,7 +23,7 @@ import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
import edu.snu.nemo.common.ir.vertex.SourceVertex;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
import edu.snu.nemo.common.test.EmptyComponents;
import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.common.Pair;
@@ -131,9 +131,10 @@ public final class DataTransferTest {
final MetricMessageHandler metricMessageHandler = mock(MetricMessageHandler.class);
final PubSubEventHandlerWrapper pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class);
- final SchedulingPolicy schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
+ final SchedulingConstraintRegistry schedulingConstraint = injector.getInstance(SchedulingConstraintRegistry.class);
+ final SchedulingPolicy schedulingPolicy = injector.getInstance(SchedulingPolicy.class);
final PendingTaskCollectionPointer taskQueue = new PendingTaskCollectionPointer();
- final SchedulerRunner schedulerRunner = new SchedulerRunner(schedulingPolicy, taskQueue, executorRegistry);
+ final SchedulerRunner schedulerRunner = new SchedulerRunner(schedulingConstraint, schedulingPolicy, taskQueue, executorRegistry);
final Scheduler scheduler = new BatchSingleJobScheduler(
schedulerRunner, taskQueue, master, pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
final AtomicInteger executorCount = new AtomicInteger(0);
@@ -549,7 +550,7 @@ public final class DataTransferTest {
final ExecutionPropertyMap<VertexExecutionProperty> stageExecutionProperty = new ExecutionPropertyMap<>(stageId);
stageExecutionProperty.put(ParallelismProperty.of(PARALLELISM_TEN));
- stageExecutionProperty.put(ScheduleGroupIndexProperty.of(0));
+ stageExecutionProperty.put(ScheduleGroupProperty.of(0));
return new Stage(stageId, emptyDag, stageExecutionProperty, Collections.emptyList());
}
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index 4bd8d4f..4009424 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -114,7 +114,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
LOG.info("Job to schedule: {}", physicalPlanOfJob.getId());
this.initialScheduleGroup = physicalPlanOfJob.getStageDAG().getVertices().stream()
- .mapToInt(stage -> stage.getScheduleGroupIndex())
+ .mapToInt(stage -> stage.getScheduleGroup())
.min().getAsInt();
scheduleNextScheduleGroup(initialScheduleGroup);
@@ -213,7 +213,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
// Schedule a stage after marking the necessary tasks to failed_recoverable.
// The stage for one of the tasks that failed is a starting point to look
// for the next stage to be scheduled.
- scheduleNextScheduleGroup(getSchedulingIndexOfStage(
+ scheduleNextScheduleGroup(getScheduleGroupOfStage(
RuntimeIdGenerator.getStageIdFromTaskId(tasksToReExecute.iterator().next())));
}
}
@@ -245,7 +245,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
/**
* Selects the next stage to schedule.
- * It takes the referenceScheduleGroupIndex as a reference point to begin looking for the stages to execute:
+ * It takes the referenceScheduleGroup as a reference point to begin looking for the stages to execute:
*
* a) returns the failed_recoverable stage(s) of the earliest schedule group, if it(they) exists.
* b) returns an empty optional if there are no schedulable stages at the moment.
@@ -253,15 +253,15 @@ public final class BatchSingleJobScheduler implements Scheduler {
* - if an ancestor schedule group is still executing
* c) returns the next set of schedulable stages (if the current schedule group has completed execution)
*
- * @param referenceScheduleGroupIndex
+ * @param referenceScheduleGroup
* the index of the schedule group that is executing/has executed when this method is called.
* @return an optional of the (possibly empty) next schedulable stage
*/
- private Optional<List<Stage>> selectNextScheduleGroupToSchedule(final int referenceScheduleGroupIndex) {
+ private Optional<List<Stage>> selectNextScheduleGroupToSchedule(final int referenceScheduleGroup) {
// Recursively check the previous schedule group.
- if (referenceScheduleGroupIndex > initialScheduleGroup) {
+ if (referenceScheduleGroup > initialScheduleGroup) {
final Optional<List<Stage>> ancestorStagesFromAScheduleGroup =
- selectNextScheduleGroupToSchedule(referenceScheduleGroupIndex - 1);
+ selectNextScheduleGroupToSchedule(referenceScheduleGroup - 1);
if (ancestorStagesFromAScheduleGroup.isPresent()) {
// Nothing to schedule from the previous schedule group.
return ancestorStagesFromAScheduleGroup;
@@ -277,7 +277,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
// All previous schedule groups are complete, we need to check for the current schedule group.
final List<Stage> currentScheduleGroup = reverseTopoStages
.stream()
- .filter(stage -> stage.getScheduleGroupIndex() == referenceScheduleGroupIndex)
+ .filter(stage -> stage.getScheduleGroup() == referenceScheduleGroup)
.collect(Collectors.toList());
final boolean allStagesOfThisGroupComplete = currentScheduleGroup
.stream()
@@ -286,7 +286,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
.allMatch(state -> state.equals(StageState.State.COMPLETE));
if (!allStagesOfThisGroupComplete) {
- LOG.info("There are remaining stages in the current schedule group, {}", referenceScheduleGroupIndex);
+ LOG.info("There are remaining stages in the current schedule group, {}", referenceScheduleGroup);
final List<Stage> stagesToSchedule = currentScheduleGroup
.stream()
.filter(stage -> {
@@ -304,7 +304,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
final List<Stage> stagesToSchedule = reverseTopoStages
.stream()
.filter(stage -> {
- if (stage.getScheduleGroupIndex() == referenceScheduleGroupIndex + 1) {
+ if (stage.getScheduleGroup() == referenceScheduleGroup + 1) {
final String stageId = stage.getId();
return jobStateManager.getStageState(stageId) != StageState.State.EXECUTING
&& jobStateManager.getStageState(stageId) != StageState.State.COMPLETE;
@@ -314,7 +314,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
.collect(Collectors.toList());
if (stagesToSchedule.isEmpty()) {
- LOG.debug("ScheduleGroup {}: already executing/complete!, so we skip this", referenceScheduleGroupIndex + 1);
+ LOG.debug("ScheduleGroup {}: already executing/complete!, so we skip this", referenceScheduleGroup + 1);
return Optional.empty();
}
@@ -433,7 +433,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
if (jobStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE)) {
// if the stage this task belongs to is complete,
if (!jobStateManager.isJobDone()) {
- scheduleNextScheduleGroup(getSchedulingIndexOfStage(stageIdForTaskUponCompletion));
+ scheduleNextScheduleGroup(getScheduleGroupOfStage(stageIdForTaskUponCompletion));
}
}
schedulerRunner.onAnExecutorAvailable();
@@ -502,7 +502,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
// TODO #50: Carefully retry tasks in the scheduler
case OUTPUT_WRITE_FAILURE:
blockManagerMaster.onProducerTaskFailed(taskId);
- scheduleNextScheduleGroup(getSchedulingIndexOfStage(stageId));
+ scheduleNextScheduleGroup(getScheduleGroupOfStage(stageId));
break;
case CONTAINER_FAILURE:
LOG.info("Only the failed task will be retried.");
@@ -513,7 +513,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
schedulerRunner.onAnExecutorAvailable();
}
- private int getSchedulingIndexOfStage(final String stageId) {
- return physicalPlan.getStageDAG().getVertexById(stageId).getScheduleGroupIndex();
+ private int getScheduleGroupOfStage(final String stageId) {
+ return physicalPlan.getStageDAG().getVertexById(stageId).getScheduleGroup();
}
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
deleted file mode 100644
index 213f5f1..0000000
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.master.scheduler;
-
-import edu.snu.nemo.runtime.common.plan.Task;
-import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-
-import javax.inject.Inject;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Temporary class to implement stacked scheduling policy.
- * At now, policies are injected through Tang, but have to be configurable by users
- * when Nemo supports job-wide execution property.
- * TODO #69: Support job-wide execution property.
- */
-public final class CompositeSchedulingPolicy implements SchedulingPolicy {
- private final List<SchedulingPolicy> schedulingPolicies;
-
- @Inject
- private CompositeSchedulingPolicy(final SourceLocationAwareSchedulingPolicy sourceLocationAwareSchedulingPolicy,
- final MinOccupancyFirstSchedulingPolicy minOccupancyFirstSchedulingPolicy,
- final FreeSlotSchedulingPolicy freeSlotSchedulingPolicy,
- final ContainerTypeAwareSchedulingPolicy containerTypeAwareSchedulingPolicy) {
- schedulingPolicies = Arrays.asList(
- freeSlotSchedulingPolicy,
- containerTypeAwareSchedulingPolicy,
- sourceLocationAwareSchedulingPolicy,
- minOccupancyFirstSchedulingPolicy);
- }
-
- @Override
- public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final Task task) {
- Set<ExecutorRepresenter> candidates = executorRepresenterSet;
- for (final SchedulingPolicy schedulingPolicy : schedulingPolicies) {
- candidates = schedulingPolicy.filterExecutorRepresenters(candidates, task);
- }
- return candidates;
- }
-}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraint.java
similarity index 52%
rename from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraint.java
index 5f64e9c..a91e996 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraint.java
@@ -16,45 +16,29 @@
package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import javax.inject.Inject;
-import java.util.Set;
-import java.util.stream.Collectors;
/**
* This policy find executors which has corresponding container type.
*/
-public final class ContainerTypeAwareSchedulingPolicy implements SchedulingPolicy {
+@AssociatedProperty(ExecutorPlacementProperty.class)
+public final class ContainerTypeAwareSchedulingConstraint implements SchedulingConstraint {
@VisibleForTesting
@Inject
- public ContainerTypeAwareSchedulingPolicy() {
+ public ContainerTypeAwareSchedulingConstraint() {
}
- /**
- * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the container type.
- * If the container type of target Task is NONE, it will return the original set.
- * @param task {@link Task} to be scheduled.
- * @return filtered Set of {@link ExecutorRepresenter}.
- */
@Override
- public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final Task task) {
-
+ public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) {
final String executorPlacementPropertyValue = task.getPropertyValue(ExecutorPlacementProperty.class)
.orElse(ExecutorPlacementProperty.NONE);
- if (executorPlacementPropertyValue.equals(ExecutorPlacementProperty.NONE)) {
- return executorRepresenterSet;
- }
-
- final Set<ExecutorRepresenter> candidateExecutors =
- executorRepresenterSet.stream()
- .filter(executor -> executor.getContainerType().equals(executorPlacementPropertyValue))
- .collect(Collectors.toSet());
-
- return candidateExecutors;
+ return executorPlacementPropertyValue.equals(ExecutorPlacementProperty.NONE) ? true
+ : executor.getContainerType().equals(executorPlacementPropertyValue);
}
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
similarity index 50%
rename from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
index d92d9d4..1fc1f6e 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
@@ -16,36 +16,29 @@
package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import javax.inject.Inject;
-import java.util.Set;
-import java.util.stream.Collectors;
/**
* This policy finds executor that has free slot for a Task.
*/
-public final class FreeSlotSchedulingPolicy implements SchedulingPolicy {
+@AssociatedProperty(ExecutorSlotComplianceProperty.class)
+public final class FreeSlotSchedulingConstraint implements SchedulingConstraint {
@VisibleForTesting
@Inject
- public FreeSlotSchedulingPolicy() {
+ public FreeSlotSchedulingConstraint() {
}
- /**
- * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the free slot of executors.
- * Executors that do not have any free slots will be filtered by this policy.
- * @param task {@link Task} to be scheduled.
- * @return filtered Set of {@link ExecutorRepresenter}.
- */
@Override
- public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final Task task) {
- final Set<ExecutorRepresenter> candidateExecutors =
- executorRepresenterSet.stream()
- .filter(executor -> executor.getRunningTasks().size() < executor.getExecutorCapacity())
- .collect(Collectors.toSet());
+ public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) {
+ if (!task.getPropertyValue(ExecutorSlotComplianceProperty.class).orElse(false)) {
+ return true;
+ }
- return candidateExecutors;
+ return executor.getRunningTasks().size() < executor.getExecutorCapacity();
}
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
index 120e1fb..e53f659 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
@@ -26,12 +26,7 @@ import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.stream.Collectors;
-
/**
- * {@inheritDoc}
- * A scheduling policy used by {@link BatchSingleJobScheduler}.
- *
* This policy chooses a set of Executors, on which have minimum running Tasks.
*/
@ThreadSafe
@@ -44,28 +39,20 @@ public final class MinOccupancyFirstSchedulingPolicy implements SchedulingPolicy
public MinOccupancyFirstSchedulingPolicy() {
}
- /**
- * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the occupancy of the Executors.
- * @param task {@link Task} to be scheduled.
- * @return filtered Set of {@link ExecutorRepresenter}.
- */
@Override
- public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final Task task) {
+ public ExecutorRepresenter selectExecutor(final Collection<ExecutorRepresenter> executors, final Task task) {
final OptionalInt minOccupancy =
- executorRepresenterSet.stream()
+ executors.stream()
.map(executor -> executor.getRunningTasks().size())
.mapToInt(i -> i).min();
if (!minOccupancy.isPresent()) {
- return Collections.emptySet();
+ throw new RuntimeException("Cannot find min occupancy");
}
- final Set<ExecutorRepresenter> candidateExecutors =
- executorRepresenterSet.stream()
+ return executors.stream()
.filter(executor -> executor.getRunningTasks().size() == minOccupancy.getAsInt())
- .collect(Collectors.toSet());
-
- return candidateExecutors;
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("No such executor"));
}
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index 62af040..42e9a2c 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -20,6 +20,7 @@ import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.common.state.TaskState;
import edu.snu.nemo.runtime.master.JobStateManager;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.reef.annotations.audience.DriverSide;
import java.util.*;
@@ -28,6 +29,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,12 +54,14 @@ public final class SchedulerRunner {
private boolean isTerminated;
private final DelayedSignalingCondition schedulingIteration = new DelayedSignalingCondition();
- private ExecutorRegistry executorRegistry;
- private SchedulingPolicy schedulingPolicy;
+ private final ExecutorRegistry executorRegistry;
+ private final SchedulingConstraintRegistry schedulingConstraintRegistry;
+ private final SchedulingPolicy schedulingPolicy;
@VisibleForTesting
@Inject
- public SchedulerRunner(final SchedulingPolicy schedulingPolicy,
+ public SchedulerRunner(final SchedulingConstraintRegistry schedulingConstraintRegistry,
+ final SchedulingPolicy schedulingPolicy,
final PendingTaskCollectionPointer pendingTaskCollectionPointer,
final ExecutorRegistry executorRegistry) {
this.jobStateManagers = new HashMap<>();
@@ -67,6 +71,7 @@ public final class SchedulerRunner {
this.isTerminated = false;
this.executorRegistry = executorRegistry;
this.schedulingPolicy = schedulingPolicy;
+ this.schedulingConstraintRegistry = schedulingConstraintRegistry;
}
/**
@@ -111,16 +116,23 @@ public final class SchedulerRunner {
LOG.debug("Trying to schedule {}...", task.getTaskId());
executorRegistry.viewExecutors(executors -> {
- final Set<ExecutorRepresenter> candidateExecutors =
- schedulingPolicy.filterExecutorRepresenters(executors, task);
- final Optional<ExecutorRepresenter> firstCandidate = candidateExecutors.stream().findFirst();
-
- if (firstCandidate.isPresent()) {
+ final MutableObject<Set<ExecutorRepresenter>> candidateExecutors = new MutableObject<>(executors);
+ task.getExecutionProperties().forEachProperties(property -> {
+ final Optional<SchedulingConstraint> constraint = schedulingConstraintRegistry.get(property.getClass());
+ if (constraint.isPresent() && !candidateExecutors.getValue().isEmpty()) {
+ candidateExecutors.setValue(candidateExecutors.getValue().stream()
+ .filter(e -> constraint.get().testSchedulability(e, task))
+ .collect(Collectors.toSet()));
+ }
+ });
+ if (!candidateExecutors.getValue().isEmpty()) {
+ // Select executor
+ final ExecutorRepresenter selectedExecutor
+ = schedulingPolicy.selectExecutor(candidateExecutors.getValue(), task);
// update metadata first
jobStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING);
// send the task
- final ExecutorRepresenter selectedExecutor = firstCandidate.get();
selectedExecutor.onTaskScheduled(task);
} else {
couldNotSchedule.add(task);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraint.java
similarity index 68%
copy from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
copy to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraint.java
index 0064310..7e71344 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraint.java
@@ -18,19 +18,15 @@ package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.tang.annotations.DefaultImplementation;
import javax.annotation.concurrent.ThreadSafe;
-import java.util.Set;
/**
- * (WARNING) Implementations of this interface must be thread-safe.
+ * A function that tests whether a given task can be scheduled on a given executor.
*/
@DriverSide
@ThreadSafe
@FunctionalInterface
-@DefaultImplementation(CompositeSchedulingPolicy.class)
-public interface SchedulingPolicy {
- Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final Task task);
+public interface SchedulingConstraint {
+ boolean testSchedulability(final ExecutorRepresenter executor, final Task task);
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
new file mode 100644
index 0000000..97ab554
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import org.apache.reef.annotations.audience.DriverSide;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.lang.reflect.Type;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Registry for {@link SchedulingConstraint}.
+ */
+@DriverSide
+@ThreadSafe
+public final class SchedulingConstraintRegistry {
+ private final Map<Type, SchedulingConstraint> typeToSchedulingConstraintMap = new ConcurrentHashMap<>();
+
+ @Inject
+ private SchedulingConstraintRegistry(
+ final ContainerTypeAwareSchedulingConstraint containerTypeAwareSchedulingConstraint,
+ final FreeSlotSchedulingConstraint freeSlotSchedulingConstraint,
+ final SourceLocationAwareSchedulingConstraint sourceLocationAwareSchedulingConstraint) {
+ registerSchedulingConstraint(containerTypeAwareSchedulingConstraint);
+ registerSchedulingConstraint(freeSlotSchedulingConstraint);
+ registerSchedulingConstraint(sourceLocationAwareSchedulingConstraint);
+ }
+
+ /**
+ * Registers a {@link SchedulingConstraint}.
+ * @param policy the scheduling constraint to register
+ */
+ public void registerSchedulingConstraint(final SchedulingConstraint policy) {
+ final AssociatedProperty associatedProperty = policy.getClass().getAnnotation(AssociatedProperty.class);
+ if (associatedProperty == null || associatedProperty.value() == null) {
+ throw new RuntimeException(String.format("SchedulingConstraint %s has no associated VertexExecutionProperty",
+ policy.getClass()));
+ }
+ final Class<? extends ExecutionProperty> property = associatedProperty.value();
+ if (typeToSchedulingConstraintMap.putIfAbsent(property, policy) != null) {
+ throw new RuntimeException(String.format("Multiple SchedulingConstraint for VertexExecutionProperty %s:"
+ + "%s, %s", property, typeToSchedulingConstraintMap.get(property), policy));
+ }
+ }
+
+ /**
+ * Returns {@link SchedulingConstraint} for the given {@link VertexExecutionProperty}.
+ * @param propertyClass {@link VertexExecutionProperty} class
+ * @return the corresponding {@link SchedulingConstraint} object,
+ * or {@link Optional#empty()} if no such constraint was found
+ */
+ public Optional<SchedulingConstraint> get(final Class<? extends VertexExecutionProperty> propertyClass) {
+ return Optional.ofNullable(typeToSchedulingConstraintMap.get(propertyClass));
+ }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index 0064310..95288e7 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@ -17,20 +17,27 @@ package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import net.jcip.annotations.ThreadSafe;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.tang.annotations.DefaultImplementation;
-import javax.annotation.concurrent.ThreadSafe;
-import java.util.Set;
+import java.util.Collection;
/**
- * (WARNING) Implementations of this interface must be thread-safe.
+ * A function to select an executor from a collection of available executors.
*/
@DriverSide
@ThreadSafe
@FunctionalInterface
-@DefaultImplementation(CompositeSchedulingPolicy.class)
+@DefaultImplementation(MinOccupancyFirstSchedulingPolicy.class)
public interface SchedulingPolicy {
- Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final Task task);
+ /**
+ * A function to select an executor from the specified collection of available executors.
+ *
+ * @param executors The collection of available executors.
+ * Implementations can assume that the collection is not empty.
+ * @param task The task to schedule
+ * @return The selected executor. It must be a member of {@code executors}.
+ */
+ ExecutorRepresenter selectExecutor(final Collection<ExecutorRepresenter> executors, final Task task);
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
similarity index 67%
rename from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
index f84f8a8..f18c900 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
@@ -17,6 +17,8 @@ package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.SourceLocationAwareSchedulingProperty;
import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
@@ -26,21 +28,21 @@ import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import java.util.*;
-import java.util.stream.Collectors;
/**
* This policy is same as {@link MinOccupancyFirstSchedulingPolicy}, however for Tasks
* with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick one of the executors
- * where the corresponding data resides.
+ * where the corresponding data reside.
*/
@ThreadSafe
@DriverSide
-public final class SourceLocationAwareSchedulingPolicy implements SchedulingPolicy {
- private static final Logger LOG = LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class);
+@AssociatedProperty(SourceLocationAwareSchedulingProperty.class)
+public final class SourceLocationAwareSchedulingConstraint implements SchedulingConstraint {
+ private static final Logger LOG = LoggerFactory.getLogger(SourceLocationAwareSchedulingConstraint.class);
@VisibleForTesting
@Inject
- public SourceLocationAwareSchedulingPolicy() {
+ public SourceLocationAwareSchedulingConstraint() {
}
/**
@@ -56,33 +58,21 @@ public final class SourceLocationAwareSchedulingPolicy implements SchedulingPoli
return new HashSet<>(sourceLocations);
}
- /**
- * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by source location.
- * If there is no source locations, will return original set.
- * @param task {@link Task} to be scheduled.
- * @return filtered Set of {@link ExecutorRepresenter}.
- */
@Override
- public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final Task task) {
+ public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) {
final Set<String> sourceLocations;
try {
sourceLocations = getSourceLocations(task.getIrVertexIdToReadable().values());
} catch (final UnsupportedOperationException e) {
- return executorRepresenterSet;
+ return true;
} catch (final Exception e) {
throw new RuntimeException(e);
}
if (sourceLocations.size() == 0) {
- return executorRepresenterSet;
+ return true;
}
- final Set<ExecutorRepresenter> candidateExecutors =
- executorRepresenterSet.stream()
- .filter(executor -> sourceLocations.contains(executor.getNodeName()))
- .collect(Collectors.toSet());
-
- return candidateExecutors;
+ return sourceLocations.contains(executor.getNodeName());
}
}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
index ba8a2f5..8bfe43d 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
@@ -63,6 +63,7 @@ import static org.mockito.Mockito.mock;
public final class BatchSingleJobSchedulerTest {
private static final Logger LOG = LoggerFactory.getLogger(BatchSingleJobSchedulerTest.class.getName());
private Scheduler scheduler;
+ private SchedulingConstraintRegistry schedulingConstraint;
private SchedulingPolicy schedulingPolicy;
private SchedulerRunner schedulerRunner;
private ExecutorRegistry executorRegistry;
@@ -86,8 +87,9 @@ public final class BatchSingleJobSchedulerTest {
executorRegistry = injector.getInstance(ExecutorRegistry.class);
metricMessageHandler = mock(MetricMessageHandler.class);
pendingTaskCollectionPointer = new PendingTaskCollectionPointer();
- schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
- schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskCollectionPointer, executorRegistry);
+ schedulingConstraint = injector.getInstance(SchedulingConstraintRegistry.class);
+ schedulingPolicy = injector.getInstance(SchedulingPolicy.class);
+ schedulerRunner = new SchedulerRunner(schedulingConstraint, schedulingPolicy, pendingTaskCollectionPointer, executorRegistry);
pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class);
scheduler =
@@ -154,7 +156,7 @@ public final class BatchSingleJobSchedulerTest {
// b) the stages of the next ScheduleGroup are scheduled after the stages of each ScheduleGroup are made "complete".
for (int i = 0; i < getNumScheduleGroups(plan.getStageDAG()); i++) {
final int scheduleGroupIdx = i;
- final List<Stage> stages = filterStagesWithAScheduleGroupIndex(plan.getStageDAG(), scheduleGroupIdx);
+ final List<Stage> stages = filterStagesWithAScheduleGroup(plan.getStageDAG(), scheduleGroupIdx);
LOG.debug("Checking that all stages of ScheduleGroup {} enter the executing state", scheduleGroupIdx);
stages.forEach(stage -> {
@@ -175,10 +177,10 @@ public final class BatchSingleJobSchedulerTest {
assertTrue(jobStateManager.isJobDone());
}
- private List<Stage> filterStagesWithAScheduleGroupIndex(
- final DAG<Stage, StageEdge> physicalDAG, final int scheduleGroupIndex) {
+ private List<Stage> filterStagesWithAScheduleGroup(
+ final DAG<Stage, StageEdge> physicalDAG, final int scheduleGroup) {
final Set<Stage> stageSet = new HashSet<>(physicalDAG.filterVertices(
- stage -> stage.getScheduleGroupIndex() == scheduleGroupIndex));
+ stage -> stage.getScheduleGroup() == scheduleGroup));
// Return the filtered vertices as a sorted list
final List<Stage> sortedStages = new ArrayList<>(stageSet.size());
@@ -192,7 +194,7 @@ public final class BatchSingleJobSchedulerTest {
private int getNumScheduleGroups(final DAG<Stage, StageEdge> physicalDAG) {
final Set<Integer> scheduleGroupSet = new HashSet<>();
- physicalDAG.getVertices().forEach(stage -> scheduleGroupSet.add(stage.getScheduleGroupIndex()));
+ physicalDAG.getVertices().forEach(stage -> scheduleGroupSet.add(stage.getScheduleGroup()));
return scheduleGroupSet.size();
}
}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraintTest.java
similarity index 78%
rename from runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraintTest.java
index 7a2d149..4aa2ecc 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraintTest.java
@@ -24,16 +24,17 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.*;
+import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
/**
- * Tests {@link ContainerTypeAwareSchedulingPolicy}.
+ * Tests {@link ContainerTypeAwareSchedulingConstraint}.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ExecutorRepresenter.class, Task.class})
-public final class ContainerTypeAwareSchedulingPolicyTest {
+public final class ContainerTypeAwareSchedulingConstraintTest {
private static ExecutorRepresenter mockExecutorRepresenter(final String containerType) {
final ExecutorRepresenter executorRepresenter = mock(ExecutorRepresenter.class);
@@ -43,7 +44,7 @@ public final class ContainerTypeAwareSchedulingPolicyTest {
@Test
public void testContainerTypeAware() {
- final SchedulingPolicy schedulingPolicy = new ContainerTypeAwareSchedulingPolicy();
+ final SchedulingConstraint schedulingConstraint = new ContainerTypeAwareSchedulingConstraint();
final ExecutorRepresenter a0 = mockExecutorRepresenter(ExecutorPlacementProperty.TRANSIENT);
final ExecutorRepresenter a1 = mockExecutorRepresenter(ExecutorPlacementProperty.RESERVED);
final ExecutorRepresenter a2 = mockExecutorRepresenter(ExecutorPlacementProperty.NONE);
@@ -54,10 +55,11 @@ public final class ContainerTypeAwareSchedulingPolicyTest {
final Set<ExecutorRepresenter> executorRepresenterList1 = new HashSet<>(Arrays.asList(a0, a1, a2));
- final Set<ExecutorRepresenter> candidateExecutors1 =
- schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, task1);
+ final Set<ExecutorRepresenter> candidateExecutors1 = executorRepresenterList1.stream()
+ .filter(e -> schedulingConstraint.testSchedulability(e, task1))
+ .collect(Collectors.toSet());
- final Set<ExecutorRepresenter> expectedExecutors1 = new HashSet<>(Arrays.asList(a1));
+ final Set<ExecutorRepresenter> expectedExecutors1 = Collections.singleton(a1);
assertEquals(expectedExecutors1, candidateExecutors1);
final Task task2 = mock(Task.class);
@@ -66,8 +68,9 @@ public final class ContainerTypeAwareSchedulingPolicyTest {
final Set<ExecutorRepresenter> executorRepresenterList2 = new HashSet<>(Arrays.asList(a0, a1, a2));
- final Set<ExecutorRepresenter> candidateExecutors2 =
- schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, task2);
+ final Set<ExecutorRepresenter> candidateExecutors2 = executorRepresenterList2.stream()
+ .filter(e -> schedulingConstraint.testSchedulability(e, task2))
+ .collect(Collectors.toSet());
final Set<ExecutorRepresenter> expectedExecutors2 = new HashSet<>(Arrays.asList(a0, a1, a2));
assertEquals(expectedExecutors2, candidateExecutors2);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
deleted file mode 100644
index 62b5181..0000000
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.master.scheduler;
-
-import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.comm.ControlMessage;
-import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.state.TaskState;
-import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.MetricMessageHandler;
-import edu.snu.nemo.runtime.master.BlockManagerMaster;
-import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
-import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
-import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.function.Function;
-
-import static edu.snu.nemo.runtime.common.state.StageState.State.COMPLETE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests fault tolerance.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({BlockManagerMaster.class, SchedulerRunner.class,
- PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class})
-public final class FaultToleranceTest {
- private static final Logger LOG = LoggerFactory.getLogger(FaultToleranceTest.class.getName());
-
- private SchedulingPolicy schedulingPolicy;
- private SchedulerRunner schedulerRunner;
- private ExecutorRegistry executorRegistry;
-
- private MetricMessageHandler metricMessageHandler;
- private PendingTaskCollectionPointer pendingTaskCollectionPointer;
- private PubSubEventHandlerWrapper pubSubEventHandler;
- private UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler;
- private BlockManagerMaster blockManagerMaster = mock(BlockManagerMaster.class);
- private final MessageSender<ControlMessage.Message> mockMsgSender = mock(MessageSender.class);
- private final ExecutorService serExecutorService = Executors.newSingleThreadExecutor();
-
- private static final int MAX_SCHEDULE_ATTEMPT = Integer.MAX_VALUE;
-
- @Before
- public void setUp() throws Exception {
- metricMessageHandler = mock(MetricMessageHandler.class);
- pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
- updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class);
-
- }
-
- private Scheduler setUpScheduler(final boolean useMockSchedulerRunner) throws InjectionException {
- final Injector injector = Tang.Factory.getTang().newInjector();
- executorRegistry = injector.getInstance(ExecutorRegistry.class);
-
- pendingTaskCollectionPointer = new PendingTaskCollectionPointer();
- schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
-
- if (useMockSchedulerRunner) {
- schedulerRunner = mock(SchedulerRunner.class);
- } else {
- schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskCollectionPointer, executorRegistry);
- }
- return new BatchSingleJobScheduler(schedulerRunner, pendingTaskCollectionPointer, blockManagerMaster,
- pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
- }
-
- /**
- * Tests fault tolerance after a container removal.
- */
- @Test(timeout=50000)
- public void testContainerRemoval() throws Exception {
- final ActiveContext activeContext = mock(ActiveContext.class);
- Mockito.doThrow(new RuntimeException()).when(activeContext).close();
-
- final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
- final Function<String, ExecutorRepresenter> executorRepresenterGenerator = executorId ->
- new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serExecutorService, executorId);
- final ExecutorRepresenter a3 = executorRepresenterGenerator.apply("a3");
- final ExecutorRepresenter a2 = executorRepresenterGenerator.apply("a2");
- final ExecutorRepresenter a1 = executorRepresenterGenerator.apply("a1");
-
- final List<ExecutorRepresenter> executors = new ArrayList<>();
- executors.add(a1);
- executors.add(a2);
- executors.add(a3);
-
- final Scheduler scheduler = setUpScheduler(true);
- for (final ExecutorRepresenter executor : executors) {
- scheduler.onExecutorAdded(executor);
- }
-
- final PhysicalPlan plan =
- TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
-
- final JobStateManager jobStateManager =
- new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
- scheduler.scheduleJob(plan, jobStateManager);
-
- final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
-
- for (final Stage stage : dagOf4Stages) {
- if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {
-
- // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1.
- SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
- executorRegistry, false);
- stage.getTaskIds().forEach(taskId ->
- SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
- taskId, TaskState.State.COMPLETE, 1));
- } else if (stage.getScheduleGroupIndex() == 2) {
- scheduler.onExecutorRemoved("a3");
- // There are 2 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
- SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
- executorRegistry, false);
-
- // Due to round robin scheduling, "a2" is assured to have a running Task.
- scheduler.onExecutorRemoved("a2");
-
- // Re-schedule
- SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
- executorRegistry, false);
-
- final Optional<Integer> maxTaskAttempt = stage.getTaskIds().stream()
- .map(jobStateManager::getTaskAttempt).max(Integer::compareTo);
- assertTrue(maxTaskAttempt.isPresent());
- assertEquals(2, (int) maxTaskAttempt.get());
-
- SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
- executorRegistry, false);
- stage.getTaskIds().forEach(taskId ->
- SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
- taskId, TaskState.State.COMPLETE, 1));
- } else if (stage.getScheduleGroupIndex() == 3) {
- // There are 1 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 3.
- // Schedule only the first Task
- SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
- executorRegistry, true);
- } else {
- throw new RuntimeException(String.format("Unexpected ScheduleGroupIndex: %d",
- stage.getScheduleGroupIndex()));
- }
- }
- }
-
- /**
- * Tests fault tolerance after an output write failure.
- */
- @Test(timeout=50000)
- public void testOutputFailure() throws Exception {
- final ActiveContext activeContext = mock(ActiveContext.class);
- Mockito.doThrow(new RuntimeException()).when(activeContext).close();
-
- final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
- final Function<String, ExecutorRepresenter> executorRepresenterGenerator = executorId ->
- new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serExecutorService, executorId);
- final ExecutorRepresenter a3 = executorRepresenterGenerator.apply("a3");
- final ExecutorRepresenter a2 = executorRepresenterGenerator.apply("a2");
- final ExecutorRepresenter a1 = executorRepresenterGenerator.apply("a1");
-
- final List<ExecutorRepresenter> executors = new ArrayList<>();
- executors.add(a1);
- executors.add(a2);
- executors.add(a3);
- final Scheduler scheduler = setUpScheduler(true);
- for (final ExecutorRepresenter executor : executors) {
- scheduler.onExecutorAdded(executor);
- }
-
- final PhysicalPlan plan =
- TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
- final JobStateManager jobStateManager =
- new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
- scheduler.scheduleJob(plan, jobStateManager);
-
- final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
-
- for (final Stage stage : dagOf4Stages) {
- if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {
-
- // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1.
- SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
- executorRegistry, false);
- stage.getTaskIds().forEach(taskId ->
- SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
- taskId, TaskState.State.COMPLETE, 1));
- } else if (stage.getScheduleGroupIndex() == 2) {
- // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
- SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
- executorRegistry, false);
- stage.getTaskIds().forEach(taskId ->
- SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
- taskId, TaskState.State.FAILED_RECOVERABLE, 1,
- TaskState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
-
- // Re-schedule
- SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
- executorRegistry, false);
-
- final Optional<Integer> maxTaskAttempt = stage.getTaskIds().stream()
- .map(jobStateManager::getTaskAttempt).max(Integer::compareTo);
- assertTrue(maxTaskAttempt.isPresent());
- assertEquals(2, (int) maxTaskAttempt.get());
-
- stage.getTaskIds().forEach(taskId ->
- assertEquals(TaskState.State.EXECUTING, jobStateManager.getTaskState(taskId)));
- }
- }
- }
-
- /**
- * Tests fault tolerance after an input read failure.
- */
- @Test(timeout=50000)
- public void testInputReadFailure() throws Exception {
- final ActiveContext activeContext = mock(ActiveContext.class);
- Mockito.doThrow(new RuntimeException()).when(activeContext).close();
-
- final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
- final Function<String, ExecutorRepresenter> executorRepresenterGenerator = executorId ->
- new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serExecutorService, executorId);
- final ExecutorRepresenter a3 = executorRepresenterGenerator.apply("a3");
- final ExecutorRepresenter a2 = executorRepresenterGenerator.apply("a2");
- final ExecutorRepresenter a1 = executorRepresenterGenerator.apply("a1");
-
- final List<ExecutorRepresenter> executors = new ArrayList<>();
- executors.add(a1);
- executors.add(a2);
- executors.add(a3);
- final Scheduler scheduler = setUpScheduler(true);
- for (final ExecutorRepresenter executor : executors) {
- scheduler.onExecutorAdded(executor);
- }
-
- final PhysicalPlan plan =
- TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
- final JobStateManager jobStateManager =
- new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
- scheduler.scheduleJob(plan, jobStateManager);
-
- final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
-
- for (final Stage stage : dagOf4Stages) {
- if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {
-
- // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1.
- SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
- executorRegistry, false);
- stage.getTaskIds().forEach(taskId ->
- SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
- taskId, TaskState.State.COMPLETE, 1));
- } else if (stage.getScheduleGroupIndex() == 2) {
- // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
- SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
- executorRegistry, false);
-
- stage.getTaskIds().forEach(taskId ->
- SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
- taskId, TaskState.State.FAILED_RECOVERABLE, 1,
- TaskState.RecoverableFailureCause.INPUT_READ_FAILURE));
-
- // Re-schedule
- SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
- executorRegistry, false);
-
- final Optional<Integer> maxTaskAttempt = stage.getTaskIds().stream()
- .map(jobStateManager::getTaskAttempt).max(Integer::compareTo);
- assertTrue(maxTaskAttempt.isPresent());
- assertEquals(2, (int) maxTaskAttempt.get());
-
- stage.getTaskIds().forEach(taskId ->
- assertEquals(TaskState.State.EXECUTING, jobStateManager.getTaskState(taskId)));
- }
- }
- }
-
- /**
- * Tests the rescheduling of Tasks upon a failure.
- */
- @Test(timeout=200000)
- public void testTaskReexecutionForFailure() throws Exception {
- final ActiveContext activeContext = mock(ActiveContext.class);
- Mockito.doThrow(new RuntimeException()).when(activeContext).close();
-
- final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
- final Function<String, ExecutorRepresenter> executorRepresenterGenerator = executorId ->
- new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serExecutorService, executorId);
-
- final Scheduler scheduler = setUpScheduler(false);
- final PhysicalPlan plan =
- TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
- final JobStateManager jobStateManager =
- new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
- scheduler.scheduleJob(plan, jobStateManager);
-
- final List<ExecutorRepresenter> executors = new ArrayList<>();
- final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
-
- int executorIdIndex = 1;
- float removalChance = 0.5f; // Out of 1.0
- final Random random = new Random(0); // Deterministic seed.
-
- for (final Stage stage : dagOf4Stages) {
-
- while (jobStateManager.getStageState(stage.getId()) != COMPLETE) {
- // By chance, remove or add executor
- if (isTrueByChance(random, removalChance)) {
- // REMOVE EXECUTOR
- if (!executors.isEmpty()) {
- scheduler.onExecutorRemoved(executors.remove(random.nextInt(executors.size())).getExecutorId());
- } else {
- // Skip, since no executor is running.
- }
- } else {
- if (executors.size() < 3) {
- // ADD EXECUTOR
- final ExecutorRepresenter newExecutor = executorRepresenterGenerator.apply("a" + executorIdIndex);
- executorIdIndex += 1;
- executors.add(newExecutor);
- scheduler.onExecutorAdded(newExecutor);
- } else {
- // Skip, in order to keep the total number of running executors below or equal to 3
- }
- }
-
- // Complete the execution of tasks
- if (!executors.isEmpty()) {
- final int indexOfCompletedExecutor = random.nextInt(executors.size());
- // New set for snapshotting
- final Map<String, Integer> runningTaskSnapshot =
- new HashMap<>(executors.get(indexOfCompletedExecutor).getRunningTaskToAttempt());
- runningTaskSnapshot.entrySet().forEach(entry -> {
- SchedulerTestUtil.sendTaskStateEventToScheduler(
- scheduler, executorRegistry, entry.getKey(), TaskState.State.COMPLETE, entry.getValue());
- });
- }
- }
- }
- assertTrue(jobStateManager.isJobDone());
- }
-
- private boolean isTrueByChance(final Random random, final float chance) {
- return chance > random.nextDouble();
- }
-}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
similarity index 75%
rename from runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
index c60eeb3..f2dd878 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
@@ -15,6 +15,7 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.junit.Test;
@@ -23,17 +24,18 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.*;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
/**
- * Tests {@link FreeSlotSchedulingPolicy}.
+ * Tests {@link FreeSlotSchedulingConstraint}.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ExecutorRepresenter.class, Task.class})
-public final class FreeSlotSchedulingPolicyTest {
+public final class FreeSlotSchedulingConstraintTest {
private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks,
final int capacity) {
@@ -47,18 +49,20 @@ public final class FreeSlotSchedulingPolicyTest {
@Test
public void testFreeSlot() {
- final SchedulingPolicy schedulingPolicy = new FreeSlotSchedulingPolicy();
+ final SchedulingConstraint schedulingConstraint = new FreeSlotSchedulingConstraint();
final ExecutorRepresenter a0 = mockExecutorRepresenter(1, 1);
final ExecutorRepresenter a1 = mockExecutorRepresenter(2, 3);
final Task task = mock(Task.class);
+ when(task.getPropertyValue(ExecutorSlotComplianceProperty.class)).thenReturn(Optional.of(true));
final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1));
- final Set<ExecutorRepresenter> candidateExecutors =
- schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, task);
+ final Set<ExecutorRepresenter> candidateExecutors = executorRepresenterList.stream()
+ .filter(e -> schedulingConstraint.testSchedulability(e, task))
+ .collect(Collectors.toSet());
- final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a1));
+ final Set<ExecutorRepresenter> expectedExecutors = Collections.singleton(a1);
assertEquals(expectedExecutors, candidateExecutors);
}
}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java
index 60311a6..bf6ebc8 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java
@@ -44,7 +44,7 @@ public final class MinOccupancyFirstSchedulingPolicyTest {
}
@Test
- public void testRoundRobin() {
+ public void test() {
final SchedulingPolicy schedulingPolicy = new MinOccupancyFirstSchedulingPolicy();
final ExecutorRepresenter a0 = mockExecutorRepresenter(1);
final ExecutorRepresenter a1 = mockExecutorRepresenter(2);
@@ -52,13 +52,9 @@ public final class MinOccupancyFirstSchedulingPolicyTest {
final Task task = mock(Task.class);
- final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1, a2));
+ final List<ExecutorRepresenter> executorRepresenterList = Arrays.asList(a0, a1, a2);
- final Set<ExecutorRepresenter> candidateExecutors =
- schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, task);
-
- final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a0));
- assertEquals(expectedExecutors, candidateExecutors);
+ assertEquals(a0, schedulingPolicy.selectExecutor(executorRepresenterList, task));
}
}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
index 41f9642..387c6b9 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
@@ -16,14 +16,11 @@
package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.common.state.StageState;
import edu.snu.nemo.runtime.common.state.TaskState;
import edu.snu.nemo.runtime.master.JobStateManager;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import java.util.Collection;
-import java.util.List;
import java.util.Optional;
/**
@@ -94,23 +91,4 @@ final class SchedulerTestUtil {
scheduler.onTaskStateReportFromExecutor(scheduledExecutor.getExecutorId(), taskId, attemptIdx,
newState, null, cause);
}
-
- static void sendTaskStateEventToScheduler(final Scheduler scheduler,
- final ExecutorRegistry executorRegistry,
- final String taskId,
- final TaskState.State newState,
- final int attemptIdx) {
- sendTaskStateEventToScheduler(scheduler, executorRegistry, taskId, newState, attemptIdx, null);
- }
-
- static void mockSchedulingBySchedulerRunner(final PendingTaskCollectionPointer pendingTaskCollectionPointer,
- final SchedulingPolicy schedulingPolicy,
- final JobStateManager jobStateManager,
- final ExecutorRegistry executorRegistry,
- final boolean scheduleOnlyTheFirstStage) {
- final SchedulerRunner schedulerRunner =
- new SchedulerRunner(schedulingPolicy, pendingTaskCollectionPointer, executorRegistry);
- schedulerRunner.scheduleJob(jobStateManager);
- schedulerRunner.doScheduleTaskList();
- }
}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
similarity index 83%
rename from runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
index 2f538ee..772c587 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
@@ -15,6 +15,7 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
+import edu.snu.nemo.common.ir.vertex.executionproperty.SourceLocationAwareSchedulingProperty;
import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.common.ir.Readable;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
@@ -25,16 +26,18 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;
/**
- * Test cases for {@link SourceLocationAwareSchedulingPolicy}.
+ * Test cases for {@link SourceLocationAwareSchedulingConstraint}.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ExecutorRepresenter.class, Task.class, Readable.class})
-public final class SourceLocationAwareSchedulingPolicyTest {
+public final class SourceLocationAwareSchedulingConstraintTest {
private static final String SITE_0 = "SEOUL";
private static final String SITE_1 = "JINJU";
private static final String SITE_2 = "BUSAN";
@@ -46,12 +49,12 @@ public final class SourceLocationAwareSchedulingPolicyTest {
}
/**
- * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule a {@link Task} when
+ * {@link SourceLocationAwareSchedulingConstraint} should fail to schedule a {@link Task} when
* there are no executors in appropriate location(s).
*/
@Test
public void testSourceLocationAwareSchedulingNotAvailable() {
- final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
+ final SchedulingConstraint schedulingConstraint = new SourceLocationAwareSchedulingConstraint();
// Prepare test scenario
final Task task = CreateTask.withReadablesWithSourceLocations(
@@ -59,16 +62,17 @@ public final class SourceLocationAwareSchedulingPolicyTest {
final ExecutorRepresenter e0 = mockExecutorRepresenter(SITE_1);
final ExecutorRepresenter e1 = mockExecutorRepresenter(SITE_1);
- assertEquals(Collections.emptySet(),
- schedulingPolicy.filterExecutorRepresenters(new HashSet<>(Arrays.asList(e0, e1)), task));
+ assertEquals(Collections.emptySet(), Arrays.asList(e0, e1).stream()
+ .filter(e -> schedulingConstraint.testSchedulability(e, task))
+ .collect(Collectors.toSet()));
}
/**
- * {@link SourceLocationAwareSchedulingPolicy} should properly schedule TGs with multiple source locations.
+ * {@link SourceLocationAwareSchedulingConstraint} should properly schedule {@link Task}s with multiple source locations.
*/
@Test
public void testSourceLocationAwareSchedulingWithMultiSource() {
- final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
+ final SchedulingConstraint schedulingConstraint = new SourceLocationAwareSchedulingConstraint();
// Prepare test scenario
final Task task0 = CreateTask.withReadablesWithSourceLocations(
Collections.singletonList(Collections.singletonList(SITE_1)));
@@ -83,8 +87,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
final ExecutorRepresenter e = mockExecutorRepresenter(SITE_1);
for (final Task task : new HashSet<>(Arrays.asList(task0, task1, task2, task3))) {
- assertEquals(new HashSet<>(Collections.singletonList(e)), schedulingPolicy.filterExecutorRepresenters(
- new HashSet<>(Collections.singletonList(e)), task));
+ assertTrue(schedulingConstraint.testSchedulability(e, task));
}
}
@@ -103,6 +106,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
readable));
when(mockInstance.getTaskId()).thenReturn(String.format("T-%d", taskIndex.getAndIncrement()));
when(mockInstance.getIrVertexIdToReadable()).thenReturn(readableMap);
+ when(mockInstance.getPropertyValue(SourceLocationAwareSchedulingProperty.class)).thenReturn(Optional.of(true));
return mockInstance;
}
diff --git a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
index f8c4f90..3c74fec 100644
--- a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
+++ b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
@@ -23,7 +23,7 @@ import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.OperatorVertex;
import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
import edu.snu.nemo.common.ir.vertex.transform.Transform;
import edu.snu.nemo.common.test.EmptyComponents;
import org.apache.reef.tang.Injector;
@@ -63,12 +63,12 @@ public final class PhysicalPlanGeneratorTest {
final Stage s0 = stages.next();
final Stage s1 = stages.next();
- assertNotEquals(s0.getScheduleGroupIndex(), s1.getScheduleGroupIndex());
+ assertNotEquals(s0.getScheduleGroup(), s1.getScheduleGroup());
}
- private static final IRVertex newIRVertex(final int scheduleGroupIndex, final int parallelism) {
+ private static final IRVertex newIRVertex(final int scheduleGroup, final int parallelism) {
final IRVertex irVertex = new OperatorVertex(EMPTY_TRANSFORM);
- irVertex.getExecutionProperties().put(ScheduleGroupIndexProperty.of(scheduleGroupIndex));
+ irVertex.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup));
irVertex.getExecutionProperties().put(ParallelismProperty.of(parallelism));
return irVertex;
}
diff --git a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
index a60cf88..515bd95 100644
--- a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
+++ b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
@@ -24,7 +24,7 @@ import edu.snu.nemo.common.ir.vertex.OperatorVertex;
import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
import edu.snu.nemo.common.ir.vertex.transform.Transform;
import edu.snu.nemo.common.test.EmptyComponents;
import org.apache.reef.tang.Tang;
@@ -56,21 +56,21 @@ public final class StagePartitionerTest {
/**
* @param parallelism {@link ParallelismProperty} value for the new vertex
- * @param scheduleGroupIndex {@link ScheduleGroupIndexProperty} value for the new vertex
+ * @param scheduleGroup {@link ScheduleGroupProperty} value for the new vertex
* @param otherProperties other {@link VertexExecutionProperty} for the new vertex
* @return new {@link IRVertex}
*/
- private static IRVertex newVertex(final int parallelism, final int scheduleGroupIndex,
+ private static IRVertex newVertex(final int parallelism, final int scheduleGroup,
final List<VertexExecutionProperty> otherProperties) {
final IRVertex vertex = new OperatorVertex(EMPTY_TRANSFORM);
vertex.getExecutionProperties().put(ParallelismProperty.of(parallelism));
- vertex.getExecutionProperties().put(ScheduleGroupIndexProperty.of(scheduleGroupIndex));
+ vertex.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup));
otherProperties.forEach(property -> vertex.getExecutionProperties().put(property));
return vertex;
}
/**
- * A simple case where two vertices have common parallelism and ScheduleGroupIndex so that get merged into one stage.
+ * A simple case where two vertices have common parallelism and ScheduleGroup, so they get merged into one stage.
*/
@Test
public void testLinear() {
@@ -101,10 +101,10 @@ public final class StagePartitionerTest {
}
/**
- * A simple case where two vertices have different ScheduleGroupIndex.
+ * A simple case where two vertices have different ScheduleGroup.
*/
@Test
- public void testSplitByScheduleGroupIndex() {
+ public void testSplitByScheduleGroup() {
final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
final IRVertex v0 = newVertex(1, 0, Collections.emptyList());
final IRVertex v1 = newVertex(1, 1, Collections.emptyList());
diff --git a/tests/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java b/tests/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
new file mode 100644
index 0000000..443ffca
--- /dev/null
+++ b/tests/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.SourceLocationAwareSchedulingProperty;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests that {@link SchedulingConstraintRegistry} resolves each property to its expected constraint.
+ */
+public final class SchedulingConstraintnRegistryTest {
+ @Test
+ public void testSchedulingConstraintRegistry() throws InjectionException {
+ final SchedulingConstraintRegistry registry = Tang.Factory.getTang().newInjector()
+ .getInstance(SchedulingConstraintRegistry.class);
+ assertEquals(FreeSlotSchedulingConstraint.class, getConstraintOf(ExecutorSlotComplianceProperty.class, registry));
+ assertEquals(ContainerTypeAwareSchedulingConstraint.class,
+ getConstraintOf(ExecutorPlacementProperty.class, registry));
+ assertEquals(SourceLocationAwareSchedulingConstraint.class,
+ getConstraintOf(SourceLocationAwareSchedulingProperty.class, registry));
+ }
+
+ private static Class<? extends SchedulingConstraint> getConstraintOf(
+ final Class<? extends VertexExecutionProperty> property, final SchedulingConstraintRegistry registry) {
+ return registry.get(property)
+ .orElseThrow(() -> new RuntimeException(String.format(
+ "No SchedulingConstraint found for property %s", property)))
+ .getClass();
+ }
+}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
index 5cf95b5..33e4988 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
@@ -24,7 +24,7 @@ import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternPro
import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.OperatorVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
import edu.snu.nemo.common.ir.vertex.transform.Transform;
import edu.snu.nemo.common.test.EmptyComponents;
import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
@@ -54,7 +54,7 @@ public final class DefaultScheduleGroupPassTest {
@Test
public void testAnnotatingPass() {
final AnnotatingPass scheduleGroupPass = new DefaultScheduleGroupPass();
- assertEquals(ScheduleGroupIndexProperty.class, scheduleGroupPass.getExecutionPropertyToModify());
+ assertEquals(ScheduleGroupProperty.class, scheduleGroupPass.getExecutionPropertyToModify());
}
/**
@@ -67,11 +67,11 @@ public final class DefaultScheduleGroupPassTest {
new TestPolicy(), "");
for (final IRVertex irVertex : processedDAG.getTopologicalSort()) {
- final Integer currentScheduleGroupIndex = irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get();
- final Integer largestScheduleGroupIndexOfParent = processedDAG.getParents(irVertex.getId()).stream()
- .mapToInt(v -> v.getPropertyValue(ScheduleGroupIndexProperty.class).get())
+ final Integer currentScheduleGroup = irVertex.getPropertyValue(ScheduleGroupProperty.class).get();
+ final Integer largestScheduleGroupOfParent = processedDAG.getParents(irVertex.getId()).stream()
+ .mapToInt(v -> v.getPropertyValue(ScheduleGroupProperty.class).get())
.max().orElse(0);
- assertTrue(currentScheduleGroupIndex >= largestScheduleGroupIndexOfParent);
+ assertTrue(currentScheduleGroup >= largestScheduleGroupOfParent);
}
}
@@ -155,31 +155,31 @@ public final class DefaultScheduleGroupPassTest {
}
/**
- * Asserts that the {@link ScheduleGroupIndexProperty} is equal to {@code expected}.
+ * Asserts that the {@link ScheduleGroupProperty} is equal to {@code expected}.
* @param expected the expected property value
* @param vertex the vertex to test
*/
- private static void assertScheduleGroupIndex(final int expected, final IRVertex vertex) {
- assertEquals(expected, getScheduleGroupIndex(vertex));
+ private static void assertScheduleGroup(final int expected, final IRVertex vertex) {
+ assertEquals(expected, getScheduleGroup(vertex));
}
/**
* @param vertex a vertex
- * @return {@link ScheduleGroupIndexProperty} of {@code vertex}
+ * @return {@link ScheduleGroupProperty} of {@code vertex}
*/
- private static int getScheduleGroupIndex(final IRVertex vertex) {
- return vertex.getPropertyValue(ScheduleGroupIndexProperty.class)
+ private static int getScheduleGroup(final IRVertex vertex) {
+ return vertex.getPropertyValue(ScheduleGroupProperty.class)
.orElseThrow(() -> new RuntimeException(String.format("ScheduleGroup not set for %s", vertex.getId())));
}
/**
- * Ensures that all vertices in {@code vertices} have different {@link ScheduleGroupIndexProperty} value.
+ * Ensures that all vertices in {@code vertices} have different {@link ScheduleGroupProperty} value.
* @param vertices vertices to test
*/
- private static void assertDifferentScheduleGroupIndex(final Collection<IRVertex> vertices) {
+ private static void assertDifferentScheduleGroup(final Collection<IRVertex> vertices) {
final Set<Integer> indices = new HashSet<>();
vertices.forEach(v -> {
- final int idx = getScheduleGroupIndex(v);
+ final int idx = getScheduleGroup(v);
assertFalse(indices.contains(idx));
indices.add(idx);
});
@@ -194,7 +194,7 @@ public final class DefaultScheduleGroupPassTest {
final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
= generateBranchDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Pull);
pass.apply(dag.left());
- dag.right().forEach(v -> assertScheduleGroupIndex(0, v));
+ dag.right().forEach(v -> assertScheduleGroup(0, v));
}
/**
@@ -206,12 +206,12 @@ public final class DefaultScheduleGroupPassTest {
final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
= generateBranchDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Pull);
pass.apply(dag.left());
- dag.right().subList(0, 4).forEach(v -> assertScheduleGroupIndex(0, v));
- dag.right().subList(4, 5).forEach(v -> assertScheduleGroupIndex(1, v));
+ dag.right().subList(0, 4).forEach(v -> assertScheduleGroup(0, v));
+ dag.right().subList(4, 5).forEach(v -> assertScheduleGroup(1, v));
}
/**
- * Test scenario to determine whether push edges properly enforces same scheduleGroupIndex or not.
+ * Test scenario to determine whether push edges properly enforce the same scheduleGroup or not.
*/
@Test
public void testBranchWithPush() {
@@ -219,7 +219,7 @@ public final class DefaultScheduleGroupPassTest {
final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
= generateBranchDAG(DataCommunicationPatternProperty.Value.Shuffle, DataFlowModelProperty.Value.Push);
pass.apply(dag.left());
- dag.right().forEach(v -> assertScheduleGroupIndex(0, v));
+ dag.right().forEach(v -> assertScheduleGroup(0, v));
}
/**
@@ -230,7 +230,7 @@ public final class DefaultScheduleGroupPassTest {
final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(false, true, true);
final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
= generateBranchDAG(DataCommunicationPatternProperty.Value.BroadCast, DataFlowModelProperty.Value.Pull);
- assertDifferentScheduleGroupIndex(pass.apply(dag.left()).getVertices());
+ assertDifferentScheduleGroup(pass.apply(dag.left()).getVertices());
}
/**
@@ -241,7 +241,7 @@ public final class DefaultScheduleGroupPassTest {
final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(true, false, true);
final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
= generateBranchDAG(DataCommunicationPatternProperty.Value.Shuffle, DataFlowModelProperty.Value.Pull);
- assertDifferentScheduleGroupIndex(pass.apply(dag.left()).getVertices());
+ assertDifferentScheduleGroup(pass.apply(dag.left()).getVertices());
}
/**
@@ -253,11 +253,11 @@ public final class DefaultScheduleGroupPassTest {
final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
= generateJoinDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Pull);
pass.apply(dag.left());
- final int idxForFirstScheduleGroup = getScheduleGroupIndex(dag.right().get(0));
- final int idxForSecondScheduleGroup = getScheduleGroupIndex(dag.right().get(2));
- dag.right().subList(0, 2).forEach(v -> assertScheduleGroupIndex(idxForFirstScheduleGroup, v));
- dag.right().subList(2, 4).forEach(v -> assertScheduleGroupIndex(idxForSecondScheduleGroup, v));
- dag.right().subList(4, 6).forEach(v -> assertScheduleGroupIndex(2, v));
+ final int idxForFirstScheduleGroup = getScheduleGroup(dag.right().get(0));
+ final int idxForSecondScheduleGroup = getScheduleGroup(dag.right().get(2));
+ dag.right().subList(0, 2).forEach(v -> assertScheduleGroup(idxForFirstScheduleGroup, v));
+ dag.right().subList(2, 4).forEach(v -> assertScheduleGroup(idxForSecondScheduleGroup, v));
+ dag.right().subList(4, 6).forEach(v -> assertScheduleGroup(2, v));
}
/**
@@ -269,7 +269,7 @@ public final class DefaultScheduleGroupPassTest {
final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
= generateJoinDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Push);
pass.apply(dag.left());
- dag.right().forEach(v -> assertScheduleGroupIndex(0, v));
+ dag.right().forEach(v -> assertScheduleGroup(0, v));
}
/**
@@ -283,9 +283,9 @@ public final class DefaultScheduleGroupPassTest {
dag.left().getOutgoingEdgesOf(dag.right().get(1)).iterator().next()
.getExecutionProperties().put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
pass.apply(dag.left());
- final int idxForFirstScheduleGroup = getScheduleGroupIndex(dag.right().get(0));
- final int idxForSecondScheduleGroup = getScheduleGroupIndex(dag.right().get(2));
- dag.right().subList(0, 2).forEach(v -> assertScheduleGroupIndex(idxForFirstScheduleGroup, v));
- dag.right().subList(2, 6).forEach(v -> assertScheduleGroupIndex(idxForSecondScheduleGroup, v));
+ final int idxForFirstScheduleGroup = getScheduleGroup(dag.right().get(0));
+ final int idxForSecondScheduleGroup = getScheduleGroup(dag.right().get(2));
+ dag.right().subList(0, 2).forEach(v -> assertScheduleGroup(idxForFirstScheduleGroup, v));
+ dag.right().subList(2, 6).forEach(v -> assertScheduleGroup(idxForSecondScheduleGroup, v));
}
}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
index b1ea8f8..41edef3 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -28,21 +28,21 @@ public final class PolicyBuilderTest {
@Test
public void testDisaggregationPolicy() {
final Policy disaggregationPolicy = new DisaggregationPolicy();
- assertEquals(14, disaggregationPolicy.getCompileTimePasses().size());
+ assertEquals(16, disaggregationPolicy.getCompileTimePasses().size());
assertEquals(0, disaggregationPolicy.getRuntimePasses().size());
}
@Test
public void testPadoPolicy() {
final Policy padoPolicy = new PadoPolicy();
- assertEquals(16, padoPolicy.getCompileTimePasses().size());
+ assertEquals(18, padoPolicy.getCompileTimePasses().size());
assertEquals(0, padoPolicy.getRuntimePasses().size());
}
@Test
public void testDataSkewPolicy() {
final Policy dataSkewPolicy = new DataSkewPolicy();
- assertEquals(18, dataSkewPolicy.getCompileTimePasses().size());
+ assertEquals(20, dataSkewPolicy.getCompileTimePasses().size());
assertEquals(1, dataSkewPolicy.getRuntimePasses().size());
}