You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by wo...@apache.org on 2018/06/29 04:25:42 UTC

[incubator-nemo] branch master updated: [NEMO-73, 75] SchedulingPolicy as Vertex-level Execution Property (#57)

This is an automated email from the ASF dual-hosted git repository.

wonook pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 209369c  [NEMO-73,75] SchedulingPolicy as Vertex-level Execution Property (#57)
209369c is described below

commit 209369ce9bf4980ef4abccaa845fde0b33304af0
Author: Jangho Seo <ja...@jangho.io>
AuthorDate: Fri Jun 29 13:25:40 2018 +0900

    [NEMO-73,75] SchedulingPolicy as Vertex-level Execution Property (#57)
    
    JIRA: [NEMO-73: SchedulingPolicy as Vertex-level Execution Property](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-73)
    JIRA: [NEMO-75: Rename SchedulingPolicy](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-75)
    
    **Major changes:**
    - Separated SchedulingConstraint from SchedulingPolicy.
    - Added SchedulingConstraintRegistry.
    - Modified SchedulerRunner to find SchedulingConstraints to apply on the task to schedule.
    - Removed CompositeSchedulingPolicy
    
    **Minor changes to note:**
    - Added ExecutorSlotCompliance{Property,Pass} and SourceLocationAwareScheduling{Property,Pass}
    - Renamed ScheduleGroupIndex to ScheduleGroup
    - MinOccupancySchedulingPolicy is now default SchedulingPolicy.
    
    **Tests for the changes:**
    - Added SchedulingConstraintRegistryTest
    
    **Other comments:**
    - FaultToleranceTest is expected to be revamped in changes for NEMO-50.
    
    resolves [NEMO-73](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-73)
    resolves [NEMO-75](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-75)
---
 .../ir/executionproperty/AssociatedProperty.java   |  28 ++
 .../ExecutorSlotComplianceProperty.java            |  46 +++
 ...dexProperty.java => ScheduleGroupProperty.java} |  10 +-
 .../SourceLocationAwareSchedulingProperty.java     |  47 +++
 .../annotating/DefaultScheduleGroupPass.java       |  40 +--
 .../annotating/ExecutorSlotCompliancePass.java     |  40 +++
 .../SourceLocationAwareSchedulingPass.java         |  40 +++
 .../composite/PrimitiveCompositePass.java          |   4 +-
 .../runtime/common/plan/PhysicalPlanGenerator.java |  50 +--
 .../edu/snu/nemo/runtime/common/plan/Stage.java    |  12 +-
 .../executor/datatransfer/DataTransferTest.java    |   9 +-
 .../master/scheduler/BatchSingleJobScheduler.java  |  30 +-
 .../scheduler/CompositeSchedulingPolicy.java       |  56 ---
 ...=> ContainerTypeAwareSchedulingConstraint.java} |  30 +-
 ...licy.java => FreeSlotSchedulingConstraint.java} |  27 +-
 .../MinOccupancyFirstSchedulingPolicy.java         |  25 +-
 .../runtime/master/scheduler/SchedulerRunner.java  |  30 +-
 ...dulingPolicy.java => SchedulingConstraint.java} |  10 +-
 .../scheduler/SchedulingConstraintRegistry.java    |  74 ++++
 .../runtime/master/scheduler/SchedulingPolicy.java |  19 +-
 ...> SourceLocationAwareSchedulingConstraint.java} |  32 +-
 .../scheduler/BatchSingleJobSchedulerTest.java     |  16 +-
 ...ontainerTypeAwareSchedulingConstraintTest.java} |  19 +-
 .../master/scheduler/FaultToleranceTest.java       | 379 ---------------------
 ....java => FreeSlotSchedulingConstraintTest.java} |  16 +-
 .../MinOccupancyFirstSchedulingPolicyTest.java     |  10 +-
 .../master/scheduler/SchedulerTestUtil.java        |  22 --
 ...urceLocationAwareSchedulingConstraintTest.java} |  24 +-
 .../common/plan/PhysicalPlanGeneratorTest.java     |   8 +-
 .../runtime/common/plan/StagePartitionerTest.java  |  14 +-
 .../SchedulingConstraintnRegistryTest.java         |  50 +++
 .../annotating/DefaultScheduleGroupPassTest.java   |  64 ++--
 .../optimizer/policy/PolicyBuilderTest.java        |   6 +-
 33 files changed, 568 insertions(+), 719 deletions(-)

diff --git a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/AssociatedProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/AssociatedProperty.java
new file mode 100644
index 0000000..ceebd86
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/AssociatedProperty.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.executionproperty;
+
+import java.lang.annotation.*;
+
+/**
+ * Declares associated {@link ExecutionProperty} for implementations.
+ */
+@Target({ElementType.TYPE})
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+public @interface AssociatedProperty {
+  Class<? extends ExecutionProperty> value();
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ExecutorSlotComplianceProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ExecutorSlotComplianceProperty.java
new file mode 100644
index 0000000..357be21
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ExecutorSlotComplianceProperty.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.executionproperty;
+
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+
+/**
+ * This property decides whether or not to comply with slot restrictions when scheduling this vertex.
+ */
+public final class ExecutorSlotComplianceProperty extends VertexExecutionProperty<Boolean> {
+  private static final ExecutorSlotComplianceProperty COMPLIANCE_TRUE = new ExecutorSlotComplianceProperty(true);
+  private static final ExecutorSlotComplianceProperty COMPLIANCE_FALSE
+      = new ExecutorSlotComplianceProperty(false);
+
+  /**
+   * Default constructor.
+   *
+   * @param value value of the ExecutionProperty
+   */
+  private ExecutorSlotComplianceProperty(final boolean value) {
+    super(value);
+  }
+
+  /**
+   * Static method getting execution property.
+   *
+   * @param value value of the new execution property
+   * @return the execution property
+   */
+  public static ExecutorSlotComplianceProperty of(final boolean value) {
+    return value ? COMPLIANCE_TRUE : COMPLIANCE_FALSE;
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupIndexProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupProperty.java
similarity index 76%
rename from common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupIndexProperty.java
rename to common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupProperty.java
index 08518ff..3b521cb 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupIndexProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupProperty.java
@@ -18,14 +18,14 @@ package edu.snu.nemo.common.ir.vertex.executionproperty;
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
 /**
- * ScheduleGroupIndex ExecutionProperty.
+ * ScheduleGroup ExecutionProperty.
  */
-public final class ScheduleGroupIndexProperty extends VertexExecutionProperty<Integer> {
+public final class ScheduleGroupProperty extends VertexExecutionProperty<Integer> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
-  private ScheduleGroupIndexProperty(final Integer value) {
+  private ScheduleGroupProperty(final Integer value) {
     super(value);
   }
 
@@ -34,7 +34,7 @@ public final class ScheduleGroupIndexProperty extends VertexExecutionProperty<In
    * @param value value of the new execution property.
    * @return the newly created execution property.
    */
-  public static ScheduleGroupIndexProperty of(final Integer value) {
-    return new ScheduleGroupIndexProperty(value);
+  public static ScheduleGroupProperty of(final Integer value) {
+    return new ScheduleGroupProperty(value);
   }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SourceLocationAwareSchedulingProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SourceLocationAwareSchedulingProperty.java
new file mode 100644
index 0000000..cad987d
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SourceLocationAwareSchedulingProperty.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.executionproperty;
+
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+
+/**
+ * This property decides whether or not to schedule this vertex only on executors where source data reside.
+ */
+public final class SourceLocationAwareSchedulingProperty extends VertexExecutionProperty<Boolean> {
+  private static final SourceLocationAwareSchedulingProperty SOURCE_TRUE
+      = new SourceLocationAwareSchedulingProperty(true);
+  private static final SourceLocationAwareSchedulingProperty SOURCE_FALSE
+      = new SourceLocationAwareSchedulingProperty(false);
+
+  /**
+   * Default constructor.
+   *
+   * @param value value of the ExecutionProperty
+   */
+  private SourceLocationAwareSchedulingProperty(final boolean value) {
+    super(value);
+  }
+
+  /**
+   * Static method getting execution property.
+   *
+   * @param value value of the new execution property
+   * @return the execution property
+   */
+  public static SourceLocationAwareSchedulingProperty of(final boolean value) {
+    return value ? SOURCE_TRUE : SOURCE_FALSE;
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
index 38f964c..ca788bd 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
@@ -23,7 +23,7 @@ import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
 import org.apache.commons.lang3.mutable.MutableInt;
 
 import java.util.*;
@@ -71,7 +71,7 @@ public final class DefaultScheduleGroupPass extends AnnotatingPass {
   public DefaultScheduleGroupPass(final boolean allowBroadcastWithinScheduleGroup,
                                   final boolean allowShuffleWithinScheduleGroup,
                                   final boolean allowMultipleInEdgesWithinScheduleGroup) {
-    super(ScheduleGroupIndexProperty.class, Stream.of(
+    super(ScheduleGroupProperty.class, Stream.of(
         DataCommunicationPatternProperty.class,
         DataFlowModelProperty.class
     ).collect(Collectors.toSet()));
@@ -93,7 +93,7 @@ public final class DefaultScheduleGroupPass extends AnnotatingPass {
         newScheduleGroup.vertices.add(irVertex);
         irVertexToScheduleGroupMap.put(irVertex, newScheduleGroup);
       }
-      // Get scheduleGroupIndex
+      // Get scheduleGroup
       final ScheduleGroup scheduleGroup = irVertexToScheduleGroupMap.get(irVertex);
       if (scheduleGroup == null) {
         throw new RuntimeException(String.format("ScheduleGroup must be set for %s", irVertex));
@@ -203,42 +203,42 @@ public final class DefaultScheduleGroupPass extends AnnotatingPass {
       }
     });
 
-    // Assign ScheduleGroupIndex property based on topology of ScheduleGroups
-    final MutableInt currentScheduleGroupIndex = new MutableInt(getNextScheudleGroupIndex(dag.getVertices()));
+    // Assign ScheduleGroup property based on topology of ScheduleGroups
+    final MutableInt currentScheduleGroup = new MutableInt(getNextScheudleGroup(dag.getVertices()));
     final DAGBuilder<ScheduleGroup, ScheduleGroupEdge> scheduleGroupDAGBuilder = new DAGBuilder<>();
     scheduleGroups.forEach(scheduleGroupDAGBuilder::addVertex);
     scheduleGroups.forEach(src -> src.scheduleGroupsTo
         .forEach(dst -> scheduleGroupDAGBuilder.connectVertices(new ScheduleGroupEdge(src, dst))));
     scheduleGroupDAGBuilder.build().topologicalDo(scheduleGroup -> {
-      boolean usedCurrentIndex = false;
+      boolean usedCurrentScheduleGroup = false;
       for (final IRVertex irVertex : scheduleGroup.vertices) {
-        if (!irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).isPresent()) {
-          irVertex.getExecutionProperties().put(ScheduleGroupIndexProperty.of(currentScheduleGroupIndex.getValue()));
-          usedCurrentIndex = true;
+        if (!irVertex.getPropertyValue(ScheduleGroupProperty.class).isPresent()) {
+          irVertex.getExecutionProperties().put(ScheduleGroupProperty.of(currentScheduleGroup.getValue()));
+          usedCurrentScheduleGroup = true;
         }
       }
-      if (usedCurrentIndex) {
-        currentScheduleGroupIndex.increment();
+      if (usedCurrentScheduleGroup) {
+        currentScheduleGroup.increment();
       }
     });
     return dag;
   }
 
   /**
-   * Determines the range of {@link ScheduleGroupIndexProperty} value that will prevent collision
-   * with the existing {@link ScheduleGroupIndexProperty}.
+   * Determines the range of {@link ScheduleGroupProperty} value that will prevent collision
+   * with the existing {@link ScheduleGroupProperty}.
    * @param irVertexCollection collection of {@link IRVertex}
-   * @return the minimum value for the {@link ScheduleGroupIndexProperty} that won't collide with the existing values
+   * @return the minimum value for the {@link ScheduleGroupProperty} that won't collide with the existing values
    */
-  private int getNextScheudleGroupIndex(final Collection<IRVertex> irVertexCollection) {
-    int nextScheduleGroupIndex = 0;
+  private int getNextScheudleGroup(final Collection<IRVertex> irVertexCollection) {
+    int nextScheduleGroup = 0;
     for (final IRVertex irVertex : irVertexCollection) {
-      final Optional<Integer> scheduleGroupIndex = irVertex.getPropertyValue(ScheduleGroupIndexProperty.class);
-      if (scheduleGroupIndex.isPresent()) {
-        nextScheduleGroupIndex = Math.max(scheduleGroupIndex.get() + 1, nextScheduleGroupIndex);
+      final Optional<Integer> scheduleGroup = irVertex.getPropertyValue(ScheduleGroupProperty.class);
+      if (scheduleGroup.isPresent()) {
+        nextScheduleGroup = Math.max(scheduleGroup.get() + 1, nextScheduleGroup);
       }
     }
-    return nextScheduleGroupIndex;
+    return nextScheduleGroup;
   }
 
   /**
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ExecutorSlotCompliancePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ExecutorSlotCompliancePass.java
new file mode 100644
index 0000000..e1b2123
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ExecutorSlotCompliancePass.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
+
+/**
+ * Sets {@link ExecutorSlotComplianceProperty}.
+ */
+public final class ExecutorSlotCompliancePass extends AnnotatingPass {
+
+  public ExecutorSlotCompliancePass() {
+    super(ExecutorSlotComplianceProperty.class);
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    // On every vertex, if ExecutorSlotComplianceProperty is not set, put it as true.
+    dag.getVertices().stream()
+        .filter(v -> !v.getExecutionProperties().containsKey(ExecutorSlotComplianceProperty.class))
+        .forEach(v -> v.getExecutionProperties().put(ExecutorSlotComplianceProperty.of(true)));
+    return dag;
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SourceLocationAwareSchedulingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SourceLocationAwareSchedulingPass.java
new file mode 100644
index 0000000..8c30d95
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SourceLocationAwareSchedulingPass.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.SourceLocationAwareSchedulingProperty;
+
+/**
+ * Sets {@link SourceLocationAwareSchedulingProperty}.
+ */
+public final class SourceLocationAwareSchedulingPass extends AnnotatingPass {
+
+  public SourceLocationAwareSchedulingPass() {
+    super(SourceLocationAwareSchedulingProperty.class);
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    // On every vertex, if SourceLocationAwareSchedulingProperty is not set, put it as true.
+    dag.getVertices().stream()
+        .filter(v -> !v.getExecutionProperties().containsKey(SourceLocationAwareSchedulingProperty.class))
+        .forEach(v -> v.getExecutionProperties().put(SourceLocationAwareSchedulingProperty.of(true)));
+    return dag;
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
index 3a5c80c..a30abec 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
@@ -37,7 +37,9 @@ public final class PrimitiveCompositePass extends CompositePass {
         new DefaultEdgeUsedDataHandlingPass(),
         new DefaultScheduleGroupPass(),
         new CompressionPass(),
-        new DecompressionPass()
+        new DecompressionPass(),
+        new SourceLocationAwareSchedulingPass(),
+        new ExecutorSlotCompliancePass()
     ));
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index 107a623..b9b9795 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -24,7 +24,7 @@ import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.*;
 import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
@@ -226,8 +226,8 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
   private void integrityCheck(final Stage stage) {
     stage.getPropertyValue(ParallelismProperty.class)
         .orElseThrow(() -> new RuntimeException("Parallelism property must be set for Stage"));
-    stage.getPropertyValue(ScheduleGroupIndexProperty.class)
-        .orElseThrow(() -> new RuntimeException("ScheduleGroupIndex property must be set for Stage"));
+    stage.getPropertyValue(ScheduleGroupProperty.class)
+        .orElseThrow(() -> new RuntimeException("ScheduleGroup property must be set for Stage"));
 
     stage.getIRDAG().getVertices().forEach(irVertex -> {
       // Check vertex type.
@@ -241,24 +241,24 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
 
   /**
    * Split ScheduleGroups by Pull {@link StageEdge}s, and ensure topological ordering of
-   * {@link ScheduleGroupIndexProperty}.
+   * {@link ScheduleGroupProperty}.
    *
    * @param dag {@link DAG} of {@link Stage}s to manipulate
    */
   private void splitScheduleGroupByPullStageEdges(final DAG<Stage, StageEdge> dag) {
-    final MutableInt nextScheduleGroupIndex = new MutableInt(0);
-    final Map<Stage, Integer> stageToScheduleGroupIndexMap = new HashMap<>();
+    final MutableInt nextScheduleGroup = new MutableInt(0);
+    final Map<Stage, Integer> stageToScheduleGroupMap = new HashMap<>();
 
     dag.topologicalDo(currentStage -> {
-      // Base case: assign New ScheduleGroupIndex of the Stage
-      stageToScheduleGroupIndexMap.computeIfAbsent(currentStage, s -> getAndIncrement(nextScheduleGroupIndex));
+      // Base case: assign New ScheduleGroup of the Stage
+      stageToScheduleGroupMap.computeIfAbsent(currentStage, s -> getAndIncrement(nextScheduleGroup));
 
       for (final StageEdge stageEdgeFromCurrentStage : dag.getOutgoingEdgesOf(currentStage)) {
         final Stage destination = stageEdgeFromCurrentStage.getDst();
-        // Skip if some Stages that destination depends on do not have assigned new ScheduleGroupIndex
+        // Skip if some Stages that destination depends on do not have assigned new ScheduleGroup
         boolean skip = false;
         for (final StageEdge stageEdgeToDestination : dag.getIncomingEdgesOf(destination)) {
-          if (!stageToScheduleGroupIndexMap.containsKey(stageEdgeToDestination.getSrc())) {
+          if (!stageToScheduleGroupMap.containsKey(stageEdgeToDestination.getSrc())) {
             skip = true;
             break;
           }
@@ -266,42 +266,42 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
         if (skip) {
           continue;
         }
-        if (stageToScheduleGroupIndexMap.containsKey(destination)) {
+        if (stageToScheduleGroupMap.containsKey(destination)) {
           continue;
         }
 
         // Find any non-pull inEdge
-        Integer scheduleGroupIndex = null;
-        Integer newScheduleGroupIndex = null;
+        Integer scheduleGroup = null;
+        Integer newScheduleGroup = null;
         for (final StageEdge stageEdge : dag.getIncomingEdgesOf(destination)) {
           final Stage source = stageEdge.getSrc();
           if (stageEdge.getDataFlowModel() != DataFlowModelProperty.Value.Pull) {
-            if (scheduleGroupIndex != null && source.getScheduleGroupIndex() != scheduleGroupIndex) {
+            if (scheduleGroup != null && source.getScheduleGroup() != scheduleGroup) {
               throw new RuntimeException(String.format("Multiple Push inEdges from different ScheduleGroup: %d, %d",
-                  scheduleGroupIndex, source.getScheduleGroupIndex()));
+                  scheduleGroup, source.getScheduleGroup()));
             }
-            if (source.getScheduleGroupIndex() != destination.getScheduleGroupIndex()) {
+            if (source.getScheduleGroup() != destination.getScheduleGroup()) {
               throw new RuntimeException(String.format("Split ScheduleGroup by push StageEdge: %d, %d",
-                  source.getScheduleGroupIndex(), destination.getScheduleGroupIndex()));
+                  source.getScheduleGroup(), destination.getScheduleGroup()));
             }
-            scheduleGroupIndex = source.getScheduleGroupIndex();
-            newScheduleGroupIndex = stageToScheduleGroupIndexMap.get(source);
+            scheduleGroup = source.getScheduleGroup();
+            newScheduleGroup = stageToScheduleGroupMap.get(source);
           }
         }
 
-        if (newScheduleGroupIndex == null) {
-          stageToScheduleGroupIndexMap.put(destination, getAndIncrement(nextScheduleGroupIndex));
+        if (newScheduleGroup == null) {
+          stageToScheduleGroupMap.put(destination, getAndIncrement(nextScheduleGroup));
         } else {
-          stageToScheduleGroupIndexMap.put(destination, newScheduleGroupIndex);
+          stageToScheduleGroupMap.put(destination, newScheduleGroup);
         }
       }
     });
 
     dag.topologicalDo(stage -> {
-      final int scheduleGroupIndex = stageToScheduleGroupIndexMap.get(stage);
-      stage.getExecutionProperties().put(ScheduleGroupIndexProperty.of(scheduleGroupIndex));
+      final int scheduleGroup = stageToScheduleGroupMap.get(stage);
+      stage.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup));
       stage.getIRDAG().topologicalDo(vertex -> vertex.getExecutionProperties()
-          .put(ScheduleGroupIndexProperty.of(scheduleGroupIndex)));
+          .put(ScheduleGroupProperty.of(scheduleGroup)));
     });
   }
 
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
index 9e5da38..c2abcc4 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
@@ -22,7 +22,7 @@ import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import org.apache.commons.lang3.SerializationUtils;
 
@@ -94,11 +94,11 @@ public final class Stage extends Vertex {
   }
 
   /**
-   * @return the schedule group index.
+   * @return the schedule group.
    */
-  public int getScheduleGroupIndex() {
-    return executionProperties.get(ScheduleGroupIndexProperty.class)
-        .orElseThrow(() -> new RuntimeException("ScheduleGroupIndex property must be set for Stage"));
+  public int getScheduleGroup() {
+    return executionProperties.get(ScheduleGroupProperty.class)
+        .orElseThrow(() -> new RuntimeException("ScheduleGroup property must be set for Stage"));
   }
 
   /**
@@ -130,7 +130,7 @@ public final class Stage extends Vertex {
   @Override
   public String propertiesToJSON() {
     final StringBuilder sb = new StringBuilder();
-    sb.append("{\"scheduleGroupIndex\": ").append(getScheduleGroupIndex());
+    sb.append("{\"scheduleGroup\": ").append(getScheduleGroup());
     sb.append(", \"irDag\": ").append(irDag);
     sb.append(", \"parallelism\": ").append(getParallelism());
     sb.append(", \"executionProperties\": ").append(executionProperties);
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 846bcca..c2919a3 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -23,7 +23,7 @@ import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.SourceVertex;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
 import edu.snu.nemo.common.test.EmptyComponents;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.Pair;
@@ -131,9 +131,10 @@ public final class DataTransferTest {
     final MetricMessageHandler metricMessageHandler = mock(MetricMessageHandler.class);
     final PubSubEventHandlerWrapper pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class);
-    final SchedulingPolicy schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
+    final SchedulingConstraintRegistry schedulingConstraint = injector.getInstance(SchedulingConstraintRegistry.class);
+    final SchedulingPolicy schedulingPolicy = injector.getInstance(SchedulingPolicy.class);
     final PendingTaskCollectionPointer taskQueue = new PendingTaskCollectionPointer();
-    final SchedulerRunner schedulerRunner = new SchedulerRunner(schedulingPolicy, taskQueue, executorRegistry);
+    final SchedulerRunner schedulerRunner = new SchedulerRunner(schedulingConstraint, schedulingPolicy, taskQueue, executorRegistry);
     final Scheduler scheduler = new BatchSingleJobScheduler(
         schedulerRunner, taskQueue, master, pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
     final AtomicInteger executorCount = new AtomicInteger(0);
@@ -549,7 +550,7 @@ public final class DataTransferTest {
 
     final ExecutionPropertyMap<VertexExecutionProperty> stageExecutionProperty = new ExecutionPropertyMap<>(stageId);
     stageExecutionProperty.put(ParallelismProperty.of(PARALLELISM_TEN));
-    stageExecutionProperty.put(ScheduleGroupIndexProperty.of(0));
+    stageExecutionProperty.put(ScheduleGroupProperty.of(0));
     return new Stage(stageId, emptyDag, stageExecutionProperty, Collections.emptyList());
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index 4bd8d4f..4009424 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -114,7 +114,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
     LOG.info("Job to schedule: {}", physicalPlanOfJob.getId());
 
     this.initialScheduleGroup = physicalPlanOfJob.getStageDAG().getVertices().stream()
-        .mapToInt(stage -> stage.getScheduleGroupIndex())
+        .mapToInt(stage -> stage.getScheduleGroup())
         .min().getAsInt();
 
     scheduleNextScheduleGroup(initialScheduleGroup);
@@ -213,7 +213,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
       // Schedule a stage after marking the necessary tasks to failed_recoverable.
       // The stage for one of the tasks that failed is a starting point to look
       // for the next stage to be scheduled.
-      scheduleNextScheduleGroup(getSchedulingIndexOfStage(
+      scheduleNextScheduleGroup(getScheduleGroupOfStage(
           RuntimeIdGenerator.getStageIdFromTaskId(tasksToReExecute.iterator().next())));
     }
   }
@@ -245,7 +245,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
 
   /**
    * Selects the next stage to schedule.
-   * It takes the referenceScheduleGroupIndex as a reference point to begin looking for the stages to execute:
+   * It takes the referenceScheduleGroup as a reference point to begin looking for the stages to execute:
    *
    * a) returns the failed_recoverable stage(s) of the earliest schedule group, if it(they) exists.
    * b) returns an empty optional if there are no schedulable stages at the moment.
@@ -253,15 +253,15 @@ public final class BatchSingleJobScheduler implements Scheduler {
    *    - if an ancestor schedule group is still executing
    * c) returns the next set of schedulable stages (if the current schedule group has completed execution)
    *
-   * @param referenceScheduleGroupIndex
+   * @param referenceScheduleGroup
    *      the index of the schedule group that is executing/has executed when this method is called.
    * @return an optional of the (possibly empty) next schedulable stage
    */
-  private Optional<List<Stage>> selectNextScheduleGroupToSchedule(final int referenceScheduleGroupIndex) {
+  private Optional<List<Stage>> selectNextScheduleGroupToSchedule(final int referenceScheduleGroup) {
     // Recursively check the previous schedule group.
-    if (referenceScheduleGroupIndex > initialScheduleGroup) {
+    if (referenceScheduleGroup > initialScheduleGroup) {
       final Optional<List<Stage>> ancestorStagesFromAScheduleGroup =
-          selectNextScheduleGroupToSchedule(referenceScheduleGroupIndex - 1);
+          selectNextScheduleGroupToSchedule(referenceScheduleGroup - 1);
       if (ancestorStagesFromAScheduleGroup.isPresent()) {
         // Nothing to schedule from the previous schedule group.
         return ancestorStagesFromAScheduleGroup;
@@ -277,7 +277,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
     // All previous schedule groups are complete, we need to check for the current schedule group.
     final List<Stage> currentScheduleGroup = reverseTopoStages
         .stream()
-        .filter(stage -> stage.getScheduleGroupIndex() == referenceScheduleGroupIndex)
+        .filter(stage -> stage.getScheduleGroup() == referenceScheduleGroup)
         .collect(Collectors.toList());
     final boolean allStagesOfThisGroupComplete = currentScheduleGroup
         .stream()
@@ -286,7 +286,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
         .allMatch(state -> state.equals(StageState.State.COMPLETE));
 
     if (!allStagesOfThisGroupComplete) {
-      LOG.info("There are remaining stages in the current schedule group, {}", referenceScheduleGroupIndex);
+      LOG.info("There are remaining stages in the current schedule group, {}", referenceScheduleGroup);
       final List<Stage> stagesToSchedule = currentScheduleGroup
           .stream()
           .filter(stage -> {
@@ -304,7 +304,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
       final List<Stage> stagesToSchedule = reverseTopoStages
           .stream()
           .filter(stage -> {
-            if (stage.getScheduleGroupIndex() == referenceScheduleGroupIndex + 1) {
+            if (stage.getScheduleGroup() == referenceScheduleGroup + 1) {
               final String stageId = stage.getId();
               return jobStateManager.getStageState(stageId) != StageState.State.EXECUTING
                   && jobStateManager.getStageState(stageId) != StageState.State.COMPLETE;
@@ -314,7 +314,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
           .collect(Collectors.toList());
 
       if (stagesToSchedule.isEmpty()) {
-        LOG.debug("ScheduleGroup {}: already executing/complete!, so we skip this", referenceScheduleGroupIndex + 1);
+        LOG.debug("ScheduleGroup {}: already executing/complete!, so we skip this", referenceScheduleGroup + 1);
         return Optional.empty();
       }
 
@@ -433,7 +433,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
     if (jobStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE)) {
       // if the stage this task belongs to is complete,
       if (!jobStateManager.isJobDone()) {
-        scheduleNextScheduleGroup(getSchedulingIndexOfStage(stageIdForTaskUponCompletion));
+        scheduleNextScheduleGroup(getScheduleGroupOfStage(stageIdForTaskUponCompletion));
       }
     }
     schedulerRunner.onAnExecutorAvailable();
@@ -502,7 +502,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
         // TODO #50: Carefully retry tasks in the scheduler
       case OUTPUT_WRITE_FAILURE:
         blockManagerMaster.onProducerTaskFailed(taskId);
-        scheduleNextScheduleGroup(getSchedulingIndexOfStage(stageId));
+        scheduleNextScheduleGroup(getScheduleGroupOfStage(stageId));
         break;
       case CONTAINER_FAILURE:
         LOG.info("Only the failed task will be retried.");
@@ -513,7 +513,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
     schedulerRunner.onAnExecutorAvailable();
   }
 
-  private int getSchedulingIndexOfStage(final String stageId) {
-    return physicalPlan.getStageDAG().getVertexById(stageId).getScheduleGroupIndex();
+  private int getScheduleGroupOfStage(final String stageId) {
+    return physicalPlan.getStageDAG().getVertexById(stageId).getScheduleGroup();
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
deleted file mode 100644
index 213f5f1..0000000
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.master.scheduler;
-
-import edu.snu.nemo.runtime.common.plan.Task;
-import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-
-import javax.inject.Inject;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Temporary class to implement stacked scheduling policy.
- * At now, policies are injected through Tang, but have to be configurable by users
- * when Nemo supports job-wide execution property.
- * TODO #69: Support job-wide execution property.
- */
-public final class CompositeSchedulingPolicy implements SchedulingPolicy {
-  private final List<SchedulingPolicy> schedulingPolicies;
-
-  @Inject
-  private CompositeSchedulingPolicy(final SourceLocationAwareSchedulingPolicy sourceLocationAwareSchedulingPolicy,
-                                    final MinOccupancyFirstSchedulingPolicy minOccupancyFirstSchedulingPolicy,
-                                    final FreeSlotSchedulingPolicy freeSlotSchedulingPolicy,
-                                    final ContainerTypeAwareSchedulingPolicy containerTypeAwareSchedulingPolicy) {
-    schedulingPolicies = Arrays.asList(
-        freeSlotSchedulingPolicy,
-        containerTypeAwareSchedulingPolicy,
-        sourceLocationAwareSchedulingPolicy,
-        minOccupancyFirstSchedulingPolicy);
-  }
-
-  @Override
-  public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final Task task) {
-    Set<ExecutorRepresenter> candidates = executorRepresenterSet;
-    for (final SchedulingPolicy schedulingPolicy : schedulingPolicies) {
-      candidates = schedulingPolicy.filterExecutorRepresenters(candidates, task);
-    }
-    return candidates;
-  }
-}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraint.java
similarity index 52%
rename from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraint.java
index 5f64e9c..a91e996 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraint.java
@@ -16,45 +16,29 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * This policy find executors which has corresponding container type.
  */
-public final class ContainerTypeAwareSchedulingPolicy implements SchedulingPolicy {
+@AssociatedProperty(ExecutorPlacementProperty.class)
+public final class ContainerTypeAwareSchedulingConstraint implements SchedulingConstraint {
 
   @VisibleForTesting
   @Inject
-  public ContainerTypeAwareSchedulingPolicy() {
+  public ContainerTypeAwareSchedulingConstraint() {
   }
 
-  /**
-   * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the container type.
-   *                               If the container type of target Task is NONE, it will return the original set.
-   * @param task {@link Task} to be scheduled.
-   * @return filtered Set of {@link ExecutorRepresenter}.
-   */
   @Override
-  public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final Task task) {
-
+  public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) {
     final String executorPlacementPropertyValue = task.getPropertyValue(ExecutorPlacementProperty.class)
         .orElse(ExecutorPlacementProperty.NONE);
-    if (executorPlacementPropertyValue.equals(ExecutorPlacementProperty.NONE)) {
-      return executorRepresenterSet;
-    }
-
-    final Set<ExecutorRepresenter> candidateExecutors =
-        executorRepresenterSet.stream()
-            .filter(executor -> executor.getContainerType().equals(executorPlacementPropertyValue))
-            .collect(Collectors.toSet());
-
-    return candidateExecutors;
+    return executorPlacementPropertyValue.equals(ExecutorPlacementProperty.NONE) ? true
+        : executor.getContainerType().equals(executorPlacementPropertyValue);
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
similarity index 50%
rename from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
index d92d9d4..1fc1f6e 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
@@ -16,36 +16,29 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * This policy finds executor that has free slot for a Task.
  */
-public final class FreeSlotSchedulingPolicy implements SchedulingPolicy {
+@AssociatedProperty(ExecutorSlotComplianceProperty.class)
+public final class FreeSlotSchedulingConstraint implements SchedulingConstraint {
   @VisibleForTesting
   @Inject
-  public FreeSlotSchedulingPolicy() {
+  public FreeSlotSchedulingConstraint() {
   }
 
-  /**
-   * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the free slot of executors.
-   *                               Executors that do not have any free slots will be filtered by this policy.
-   * @param task {@link Task} to be scheduled.
-   * @return filtered Set of {@link ExecutorRepresenter}.
-   */
   @Override
-  public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final Task task) {
-    final Set<ExecutorRepresenter> candidateExecutors =
-        executorRepresenterSet.stream()
-            .filter(executor -> executor.getRunningTasks().size() < executor.getExecutorCapacity())
-            .collect(Collectors.toSet());
+  public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) {
+    if (!task.getPropertyValue(ExecutorSlotComplianceProperty.class).orElse(false)) {
+      return true;
+    }
 
-    return candidateExecutors;
+    return executor.getRunningTasks().size() < executor.getExecutorCapacity();
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
index 120e1fb..e53f659 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
@@ -26,12 +26,7 @@ import java.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.stream.Collectors;
-
 /**
- * {@inheritDoc}
- * A scheduling policy used by {@link BatchSingleJobScheduler}.
- *
  * This policy chooses a set of Executors, on which have minimum running Tasks.
  */
 @ThreadSafe
@@ -44,28 +39,20 @@ public final class MinOccupancyFirstSchedulingPolicy implements SchedulingPolicy
   public MinOccupancyFirstSchedulingPolicy() {
   }
 
-  /**
-   * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the occupancy of the Executors.
-   * @param task {@link Task} to be scheduled.
-   * @return filtered Set of {@link ExecutorRepresenter}.
-   */
   @Override
-  public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final Task task) {
+  public ExecutorRepresenter selectExecutor(final Collection<ExecutorRepresenter> executors, final Task task) {
     final OptionalInt minOccupancy =
-        executorRepresenterSet.stream()
+        executors.stream()
         .map(executor -> executor.getRunningTasks().size())
         .mapToInt(i -> i).min();
 
     if (!minOccupancy.isPresent()) {
-      return Collections.emptySet();
+      throw new RuntimeException("Cannot find min occupancy");
     }
 
-    final Set<ExecutorRepresenter> candidateExecutors =
-        executorRepresenterSet.stream()
+    return executors.stream()
         .filter(executor -> executor.getRunningTasks().size() == minOccupancy.getAsInt())
-        .collect(Collectors.toSet());
-
-    return candidateExecutors;
+        .findFirst()
+        .orElseThrow(() -> new RuntimeException("No such executor"));
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index 62af040..42e9a2c 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -20,6 +20,7 @@ import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.reef.annotations.audience.DriverSide;
 
 import java.util.*;
@@ -28,6 +29,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,12 +54,14 @@ public final class SchedulerRunner {
   private boolean isTerminated;
 
   private final DelayedSignalingCondition schedulingIteration = new DelayedSignalingCondition();
-  private ExecutorRegistry executorRegistry;
-  private SchedulingPolicy schedulingPolicy;
+  private final ExecutorRegistry executorRegistry;
+  private final SchedulingConstraintRegistry schedulingConstraintRegistry;
+  private final SchedulingPolicy schedulingPolicy;
 
   @VisibleForTesting
   @Inject
-  public SchedulerRunner(final SchedulingPolicy schedulingPolicy,
+  public SchedulerRunner(final SchedulingConstraintRegistry schedulingConstraintRegistry,
+                         final SchedulingPolicy schedulingPolicy,
                          final PendingTaskCollectionPointer pendingTaskCollectionPointer,
                          final ExecutorRegistry executorRegistry) {
     this.jobStateManagers = new HashMap<>();
@@ -67,6 +71,7 @@ public final class SchedulerRunner {
     this.isTerminated = false;
     this.executorRegistry = executorRegistry;
     this.schedulingPolicy = schedulingPolicy;
+    this.schedulingConstraintRegistry = schedulingConstraintRegistry;
   }
 
   /**
@@ -111,16 +116,23 @@ public final class SchedulerRunner {
 
       LOG.debug("Trying to schedule {}...", task.getTaskId());
       executorRegistry.viewExecutors(executors -> {
-        final Set<ExecutorRepresenter> candidateExecutors =
-            schedulingPolicy.filterExecutorRepresenters(executors, task);
-        final Optional<ExecutorRepresenter> firstCandidate = candidateExecutors.stream().findFirst();
-
-        if (firstCandidate.isPresent()) {
+        final MutableObject<Set<ExecutorRepresenter>> candidateExecutors = new MutableObject<>(executors);
+        task.getExecutionProperties().forEachProperties(property -> {
+          final Optional<SchedulingConstraint> constraint = schedulingConstraintRegistry.get(property.getClass());
+          if (constraint.isPresent() && !candidateExecutors.getValue().isEmpty()) {
+            candidateExecutors.setValue(candidateExecutors.getValue().stream()
+                .filter(e -> constraint.get().testSchedulability(e, task))
+                .collect(Collectors.toSet()));
+          }
+        });
+        if (!candidateExecutors.getValue().isEmpty()) {
+          // Select executor
+          final ExecutorRepresenter selectedExecutor
+              = schedulingPolicy.selectExecutor(candidateExecutors.getValue(), task);
           // update metadata first
           jobStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING);
 
           // send the task
-          final ExecutorRepresenter selectedExecutor = firstCandidate.get();
           selectedExecutor.onTaskScheduled(task);
         } else {
           couldNotSchedule.add(task);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraint.java
similarity index 68%
copy from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
copy to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraint.java
index 0064310..7e71344 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraint.java
@@ -18,19 +18,15 @@ package edu.snu.nemo.runtime.master.scheduler;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.tang.annotations.DefaultImplementation;
 
 import javax.annotation.concurrent.ThreadSafe;
-import java.util.Set;
 
 /**
- * (WARNING) Implementations of this interface must be thread-safe.
+ * Functions to test schedulability with a pair of an executor and a task.
  */
 @DriverSide
 @ThreadSafe
 @FunctionalInterface
-@DefaultImplementation(CompositeSchedulingPolicy.class)
-public interface SchedulingPolicy {
-  Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                      final Task task);
+public interface SchedulingConstraint {
+  boolean testSchedulability(final ExecutorRepresenter executor, final Task task);
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
new file mode 100644
index 0000000..97ab554
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import org.apache.reef.annotations.audience.DriverSide;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.lang.reflect.Type;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Registry for {@link SchedulingConstraint}.
+ */
+@DriverSide
+@ThreadSafe
+public final class SchedulingConstraintRegistry {
+  private final Map<Type, SchedulingConstraint> typeToSchedulingConstraintMap = new ConcurrentHashMap<>();
+
+  @Inject
+  private SchedulingConstraintRegistry(
+      final ContainerTypeAwareSchedulingConstraint containerTypeAwareSchedulingConstraint,
+      final FreeSlotSchedulingConstraint freeSlotSchedulingConstraint,
+      final SourceLocationAwareSchedulingConstraint sourceLocationAwareSchedulingConstraint) {
+    registerSchedulingConstraint(containerTypeAwareSchedulingConstraint);
+    registerSchedulingConstraint(freeSlotSchedulingConstraint);
+    registerSchedulingConstraint(sourceLocationAwareSchedulingConstraint);
+  }
+
+  /**
+   * Registers a {@link SchedulingConstraint}.
+   * @param policy the policy to register
+   */
+  public void registerSchedulingConstraint(final SchedulingConstraint policy) {
+    final AssociatedProperty associatedProperty = policy.getClass().getAnnotation(AssociatedProperty.class);
+    if (associatedProperty == null || associatedProperty.value() == null) {
+      throw new RuntimeException(String.format("SchedulingConstraint %s has no associated VertexExecutionProperty",
+          policy.getClass()));
+    }
+    final Class<? extends ExecutionProperty> property = associatedProperty.value();
+    if (typeToSchedulingConstraintMap.putIfAbsent(property, policy) != null) {
+      throw new RuntimeException(String.format("Multiple SchedulingConstraint for VertexExecutionProperty %s:"
+          + "%s, %s", property, typeToSchedulingConstraintMap.get(property), policy));
+    }
+  }
+
+  /**
+   * Returns {@link SchedulingConstraint} for the given {@link VertexExecutionProperty}.
+   * @param propertyClass {@link VertexExecutionProperty} class
+   * @return the corresponding {@link SchedulingConstraint} object,
+   *         or {@link Optional#empty()} if no such constraint was found
+   */
+  public Optional<SchedulingConstraint> get(final Class<? extends VertexExecutionProperty> propertyClass) {
+    return Optional.ofNullable(typeToSchedulingConstraintMap.get(propertyClass));
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index 0064310..95288e7 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@ -17,20 +17,27 @@ package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
 
-import javax.annotation.concurrent.ThreadSafe;
-import java.util.Set;
+import java.util.Collection;
 
 /**
- * (WARNING) Implementations of this interface must be thread-safe.
+ * A function to select an executor from a collection of available executors.
  */
 @DriverSide
 @ThreadSafe
 @FunctionalInterface
-@DefaultImplementation(CompositeSchedulingPolicy.class)
+@DefaultImplementation(MinOccupancyFirstSchedulingPolicy.class)
 public interface SchedulingPolicy {
-  Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                      final Task task);
+  /**
+   * A function to select an executor from the specified collection of available executors.
+   *
+   * @param executors The collection of available executors.
+   *                  Implementations can assume that the collection is not empty.
+   * @param task The task to schedule
+   * @return The selected executor. It must be a member of {@code executors}.
+   */
+  ExecutorRepresenter selectExecutor(final Collection<ExecutorRepresenter> executors, final Task task);
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
similarity index 67%
rename from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
index f84f8a8..f18c900 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
@@ -17,6 +17,8 @@ package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.SourceLocationAwareSchedulingProperty;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
@@ -26,21 +28,21 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
-import java.util.stream.Collectors;
 
 /**
  * This policy is same as {@link MinOccupancyFirstSchedulingPolicy}, however for Tasks
  * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick one of the executors
- * where the corresponding data resides.
+ * where the corresponding data reside.
  */
 @ThreadSafe
 @DriverSide
-public final class SourceLocationAwareSchedulingPolicy implements SchedulingPolicy {
-  private static final Logger LOG = LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class);
+@AssociatedProperty(SourceLocationAwareSchedulingProperty.class)
+public final class SourceLocationAwareSchedulingConstraint implements SchedulingConstraint {
+  private static final Logger LOG = LoggerFactory.getLogger(SourceLocationAwareSchedulingConstraint.class);
 
   @VisibleForTesting
   @Inject
-  public SourceLocationAwareSchedulingPolicy() {
+  public SourceLocationAwareSchedulingConstraint() {
   }
 
   /**
@@ -56,33 +58,21 @@ public final class SourceLocationAwareSchedulingPolicy implements SchedulingPoli
     return new HashSet<>(sourceLocations);
   }
 
-  /**
-   * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by source location.
-   *                               If there is no source locations, will return original set.
-   * @param task {@link Task} to be scheduled.
-   * @return filtered Set of {@link ExecutorRepresenter}.
-   */
   @Override
-  public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final Task task) {
+  public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) {
     final Set<String> sourceLocations;
     try {
       sourceLocations = getSourceLocations(task.getIrVertexIdToReadable().values());
     } catch (final UnsupportedOperationException e) {
-      return executorRepresenterSet;
+      return true;
     } catch (final Exception e) {
       throw new RuntimeException(e);
     }
 
     if (sourceLocations.size() == 0) {
-      return executorRepresenterSet;
+      return true;
     }
 
-    final Set<ExecutorRepresenter> candidateExecutors =
-            executorRepresenterSet.stream()
-            .filter(executor -> sourceLocations.contains(executor.getNodeName()))
-            .collect(Collectors.toSet());
-
-    return candidateExecutors;
+    return sourceLocations.contains(executor.getNodeName());
   }
 }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
index ba8a2f5..8bfe43d 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
@@ -63,6 +63,7 @@ import static org.mockito.Mockito.mock;
 public final class BatchSingleJobSchedulerTest {
   private static final Logger LOG = LoggerFactory.getLogger(BatchSingleJobSchedulerTest.class.getName());
   private Scheduler scheduler;
+  private SchedulingConstraintRegistry schedulingConstraint;
   private SchedulingPolicy schedulingPolicy;
   private SchedulerRunner schedulerRunner;
   private ExecutorRegistry executorRegistry;
@@ -86,8 +87,9 @@ public final class BatchSingleJobSchedulerTest {
     executorRegistry = injector.getInstance(ExecutorRegistry.class);
     metricMessageHandler = mock(MetricMessageHandler.class);
     pendingTaskCollectionPointer = new PendingTaskCollectionPointer();
-    schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
-    schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskCollectionPointer, executorRegistry);
+    schedulingConstraint = injector.getInstance(SchedulingConstraintRegistry.class);
+    schedulingPolicy = injector.getInstance(SchedulingPolicy.class);
+    schedulerRunner = new SchedulerRunner(schedulingConstraint, schedulingPolicy, pendingTaskCollectionPointer, executorRegistry);
     pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class);
     scheduler =
@@ -154,7 +156,7 @@ public final class BatchSingleJobSchedulerTest {
     // b) the stages of the next ScheduleGroup are scheduled after the stages of each ScheduleGroup are made "complete".
     for (int i = 0; i < getNumScheduleGroups(plan.getStageDAG()); i++) {
       final int scheduleGroupIdx = i;
-      final List<Stage> stages = filterStagesWithAScheduleGroupIndex(plan.getStageDAG(), scheduleGroupIdx);
+      final List<Stage> stages = filterStagesWithAScheduleGroup(plan.getStageDAG(), scheduleGroupIdx);
 
       LOG.debug("Checking that all stages of ScheduleGroup {} enter the executing state", scheduleGroupIdx);
       stages.forEach(stage -> {
@@ -175,10 +177,10 @@ public final class BatchSingleJobSchedulerTest {
     assertTrue(jobStateManager.isJobDone());
   }
 
-  private List<Stage> filterStagesWithAScheduleGroupIndex(
-      final DAG<Stage, StageEdge> physicalDAG, final int scheduleGroupIndex) {
+  private List<Stage> filterStagesWithAScheduleGroup(
+      final DAG<Stage, StageEdge> physicalDAG, final int scheduleGroup) {
     final Set<Stage> stageSet = new HashSet<>(physicalDAG.filterVertices(
-        stage -> stage.getScheduleGroupIndex() == scheduleGroupIndex));
+        stage -> stage.getScheduleGroup() == scheduleGroup));
 
     // Return the filtered vertices as a sorted list
     final List<Stage> sortedStages = new ArrayList<>(stageSet.size());
@@ -192,7 +194,7 @@ public final class BatchSingleJobSchedulerTest {
 
   private int getNumScheduleGroups(final DAG<Stage, StageEdge> physicalDAG) {
     final Set<Integer> scheduleGroupSet = new HashSet<>();
-    physicalDAG.getVertices().forEach(stage -> scheduleGroupSet.add(stage.getScheduleGroupIndex()));
+    physicalDAG.getVertices().forEach(stage -> scheduleGroupSet.add(stage.getScheduleGroup()));
     return scheduleGroupSet.size();
   }
 }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraintTest.java
similarity index 78%
rename from runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraintTest.java
index 7a2d149..4aa2ecc 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraintTest.java
@@ -24,16 +24,17 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.*;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.*;
 
 /**
- * Tests {@link ContainerTypeAwareSchedulingPolicy}.
+ * Tests {@link ContainerTypeAwareSchedulingConstraint}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ExecutorRepresenter.class, Task.class})
-public final class ContainerTypeAwareSchedulingPolicyTest {
+public final class ContainerTypeAwareSchedulingConstraintTest {
 
   private static ExecutorRepresenter mockExecutorRepresenter(final String containerType) {
     final ExecutorRepresenter executorRepresenter = mock(ExecutorRepresenter.class);
@@ -43,7 +44,7 @@ public final class ContainerTypeAwareSchedulingPolicyTest {
 
   @Test
   public void testContainerTypeAware() {
-    final SchedulingPolicy schedulingPolicy = new ContainerTypeAwareSchedulingPolicy();
+    final SchedulingConstraint schedulingConstraint = new ContainerTypeAwareSchedulingConstraint();
     final ExecutorRepresenter a0 = mockExecutorRepresenter(ExecutorPlacementProperty.TRANSIENT);
     final ExecutorRepresenter a1 = mockExecutorRepresenter(ExecutorPlacementProperty.RESERVED);
     final ExecutorRepresenter a2 = mockExecutorRepresenter(ExecutorPlacementProperty.NONE);
@@ -54,10 +55,11 @@ public final class ContainerTypeAwareSchedulingPolicyTest {
 
     final Set<ExecutorRepresenter> executorRepresenterList1 = new HashSet<>(Arrays.asList(a0, a1, a2));
 
-    final Set<ExecutorRepresenter> candidateExecutors1 =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, task1);
+    final Set<ExecutorRepresenter> candidateExecutors1 = executorRepresenterList1.stream()
+        .filter(e -> schedulingConstraint.testSchedulability(e, task1))
+        .collect(Collectors.toSet());
 
-    final Set<ExecutorRepresenter> expectedExecutors1 = new HashSet<>(Arrays.asList(a1));
+    final Set<ExecutorRepresenter> expectedExecutors1 = Collections.singleton(a1);
     assertEquals(expectedExecutors1, candidateExecutors1);
 
     final Task task2 = mock(Task.class);
@@ -66,8 +68,9 @@ public final class ContainerTypeAwareSchedulingPolicyTest {
 
     final Set<ExecutorRepresenter> executorRepresenterList2 = new HashSet<>(Arrays.asList(a0, a1, a2));
 
-    final Set<ExecutorRepresenter> candidateExecutors2 =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, task2);
+    final Set<ExecutorRepresenter> candidateExecutors2 = executorRepresenterList2.stream()
+        .filter(e -> schedulingConstraint.testSchedulability(e, task2))
+        .collect(Collectors.toSet());
 
     final Set<ExecutorRepresenter> expectedExecutors2 = new HashSet<>(Arrays.asList(a0, a1, a2));
     assertEquals(expectedExecutors2, candidateExecutors2);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
deleted file mode 100644
index 62b5181..0000000
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.master.scheduler;
-
-import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.comm.ControlMessage;
-import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.state.TaskState;
-import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.MetricMessageHandler;
-import edu.snu.nemo.runtime.master.BlockManagerMaster;
-import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
-import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
-import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.function.Function;
-
-import static edu.snu.nemo.runtime.common.state.StageState.State.COMPLETE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests fault tolerance.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({BlockManagerMaster.class, SchedulerRunner.class,
-    PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class})
-public final class FaultToleranceTest {
-  private static final Logger LOG = LoggerFactory.getLogger(FaultToleranceTest.class.getName());
-
-  private SchedulingPolicy schedulingPolicy;
-  private SchedulerRunner schedulerRunner;
-  private ExecutorRegistry executorRegistry;
-
-  private MetricMessageHandler metricMessageHandler;
-  private PendingTaskCollectionPointer pendingTaskCollectionPointer;
-  private PubSubEventHandlerWrapper pubSubEventHandler;
-  private UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler;
-  private BlockManagerMaster blockManagerMaster = mock(BlockManagerMaster.class);
-  private final MessageSender<ControlMessage.Message> mockMsgSender = mock(MessageSender.class);
-  private final ExecutorService serExecutorService = Executors.newSingleThreadExecutor();
-
-  private static final int MAX_SCHEDULE_ATTEMPT = Integer.MAX_VALUE;
-
-  @Before
-  public void setUp() throws Exception {
-    metricMessageHandler = mock(MetricMessageHandler.class);
-    pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
-    updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class);
-
-  }
-
-  private Scheduler setUpScheduler(final boolean useMockSchedulerRunner) throws InjectionException {
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    executorRegistry = injector.getInstance(ExecutorRegistry.class);
-
-    pendingTaskCollectionPointer = new PendingTaskCollectionPointer();
-    schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
-
-    if (useMockSchedulerRunner) {
-      schedulerRunner = mock(SchedulerRunner.class);
-    } else {
-      schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskCollectionPointer, executorRegistry);
-    }
-    return new BatchSingleJobScheduler(schedulerRunner, pendingTaskCollectionPointer, blockManagerMaster,
-        pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
-  }
-
-  /**
-   * Tests fault tolerance after a container removal.
-   */
-  @Test(timeout=50000)
-  public void testContainerRemoval() throws Exception {
-    final ActiveContext activeContext = mock(ActiveContext.class);
-    Mockito.doThrow(new RuntimeException()).when(activeContext).close();
-
-    final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
-    final Function<String, ExecutorRepresenter> executorRepresenterGenerator = executorId ->
-        new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serExecutorService, executorId);
-    final ExecutorRepresenter a3 = executorRepresenterGenerator.apply("a3");
-    final ExecutorRepresenter a2 = executorRepresenterGenerator.apply("a2");
-    final ExecutorRepresenter a1 = executorRepresenterGenerator.apply("a1");
-
-    final List<ExecutorRepresenter> executors = new ArrayList<>();
-    executors.add(a1);
-    executors.add(a2);
-    executors.add(a3);
-
-    final Scheduler scheduler = setUpScheduler(true);
-    for (final ExecutorRepresenter executor : executors) {
-      scheduler.onExecutorAdded(executor);
-    }
-
-    final PhysicalPlan plan =
-        TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
-
-    final JobStateManager jobStateManager =
-        new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
-    scheduler.scheduleJob(plan, jobStateManager);
-
-    final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
-
-    for (final Stage stage : dagOf4Stages) {
-      if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {
-
-        // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1.
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-        stage.getTaskIds().forEach(taskId ->
-            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
-                taskId, TaskState.State.COMPLETE, 1));
-      } else if (stage.getScheduleGroupIndex() == 2) {
-        scheduler.onExecutorRemoved("a3");
-        // There are 2 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-
-        // Due to round robin scheduling, "a2" is assured to have a running Task.
-        scheduler.onExecutorRemoved("a2");
-
-        // Re-schedule
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-
-        final Optional<Integer> maxTaskAttempt = stage.getTaskIds().stream()
-            .map(jobStateManager::getTaskAttempt).max(Integer::compareTo);
-        assertTrue(maxTaskAttempt.isPresent());
-        assertEquals(2, (int) maxTaskAttempt.get());
-
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-        stage.getTaskIds().forEach(taskId ->
-            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
-                taskId, TaskState.State.COMPLETE, 1));
-      } else if (stage.getScheduleGroupIndex() == 3) {
-        // There are 1 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 3.
-        // Schedule only the first Task
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
-            executorRegistry, true);
-      } else {
-        throw new RuntimeException(String.format("Unexpected ScheduleGroupIndex: %d",
-            stage.getScheduleGroupIndex()));
-      }
-    }
-  }
-
-  /**
-   * Tests fault tolerance after an output write failure.
-   */
-  @Test(timeout=50000)
-  public void testOutputFailure() throws Exception {
-    final ActiveContext activeContext = mock(ActiveContext.class);
-    Mockito.doThrow(new RuntimeException()).when(activeContext).close();
-
-    final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
-    final Function<String, ExecutorRepresenter> executorRepresenterGenerator = executorId ->
-        new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serExecutorService, executorId);
-    final ExecutorRepresenter a3 = executorRepresenterGenerator.apply("a3");
-    final ExecutorRepresenter a2 = executorRepresenterGenerator.apply("a2");
-    final ExecutorRepresenter a1 = executorRepresenterGenerator.apply("a1");
-
-    final List<ExecutorRepresenter> executors = new ArrayList<>();
-    executors.add(a1);
-    executors.add(a2);
-    executors.add(a3);
-    final Scheduler scheduler = setUpScheduler(true);
-    for (final ExecutorRepresenter executor : executors) {
-      scheduler.onExecutorAdded(executor);
-    }
-
-    final PhysicalPlan plan =
-        TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
-    final JobStateManager jobStateManager =
-        new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
-    scheduler.scheduleJob(plan, jobStateManager);
-
-    final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
-
-    for (final Stage stage : dagOf4Stages) {
-      if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {
-
-        // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1.
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-        stage.getTaskIds().forEach(taskId ->
-            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
-                taskId, TaskState.State.COMPLETE, 1));
-      } else if (stage.getScheduleGroupIndex() == 2) {
-        // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-        stage.getTaskIds().forEach(taskId ->
-            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
-                taskId, TaskState.State.FAILED_RECOVERABLE, 1,
-                TaskState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
-
-        // Re-schedule
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-
-        final Optional<Integer> maxTaskAttempt = stage.getTaskIds().stream()
-            .map(jobStateManager::getTaskAttempt).max(Integer::compareTo);
-        assertTrue(maxTaskAttempt.isPresent());
-        assertEquals(2, (int) maxTaskAttempt.get());
-
-        stage.getTaskIds().forEach(taskId ->
-            assertEquals(TaskState.State.EXECUTING, jobStateManager.getTaskState(taskId)));
-      }
-    }
-  }
-
-  /**
-   * Tests fault tolerance after an input read failure.
-   */
-  @Test(timeout=50000)
-  public void testInputReadFailure() throws Exception {
-    final ActiveContext activeContext = mock(ActiveContext.class);
-    Mockito.doThrow(new RuntimeException()).when(activeContext).close();
-
-    final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
-    final Function<String, ExecutorRepresenter> executorRepresenterGenerator = executorId ->
-        new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serExecutorService, executorId);
-    final ExecutorRepresenter a3 = executorRepresenterGenerator.apply("a3");
-    final ExecutorRepresenter a2 = executorRepresenterGenerator.apply("a2");
-    final ExecutorRepresenter a1 = executorRepresenterGenerator.apply("a1");
-
-    final List<ExecutorRepresenter> executors = new ArrayList<>();
-    executors.add(a1);
-    executors.add(a2);
-    executors.add(a3);
-    final Scheduler scheduler = setUpScheduler(true);
-    for (final ExecutorRepresenter executor : executors) {
-      scheduler.onExecutorAdded(executor);
-    }
-
-    final PhysicalPlan plan =
-        TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
-    final JobStateManager jobStateManager =
-        new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
-    scheduler.scheduleJob(plan, jobStateManager);
-
-    final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
-
-    for (final Stage stage : dagOf4Stages) {
-      if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {
-
-        // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1.
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-        stage.getTaskIds().forEach(taskId ->
-            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
-                taskId, TaskState.State.COMPLETE, 1));
-      } else if (stage.getScheduleGroupIndex() == 2) {
-        // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-
-        stage.getTaskIds().forEach(taskId ->
-            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
-                taskId, TaskState.State.FAILED_RECOVERABLE, 1,
-                TaskState.RecoverableFailureCause.INPUT_READ_FAILURE));
-
-        // Re-schedule
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-
-        final Optional<Integer> maxTaskAttempt = stage.getTaskIds().stream()
-            .map(jobStateManager::getTaskAttempt).max(Integer::compareTo);
-        assertTrue(maxTaskAttempt.isPresent());
-        assertEquals(2, (int) maxTaskAttempt.get());
-
-        stage.getTaskIds().forEach(taskId ->
-            assertEquals(TaskState.State.EXECUTING, jobStateManager.getTaskState(taskId)));
-      }
-    }
-  }
-
-  /**
-   * Tests the rescheduling of Tasks upon a failure.
-   */
-  @Test(timeout=200000)
-  public void testTaskReexecutionForFailure() throws Exception {
-    final ActiveContext activeContext = mock(ActiveContext.class);
-    Mockito.doThrow(new RuntimeException()).when(activeContext).close();
-
-    final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
-    final Function<String, ExecutorRepresenter> executorRepresenterGenerator = executorId ->
-        new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serExecutorService, executorId);
-
-    final Scheduler scheduler = setUpScheduler(false);
-    final PhysicalPlan plan =
-        TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
-    final JobStateManager jobStateManager =
-        new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
-    scheduler.scheduleJob(plan, jobStateManager);
-
-    final List<ExecutorRepresenter> executors = new ArrayList<>();
-    final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
-
-    int executorIdIndex = 1;
-    float removalChance = 0.5f; // Out of 1.0
-    final Random random = new Random(0); // Deterministic seed.
-
-    for (final Stage stage : dagOf4Stages) {
-
-      while (jobStateManager.getStageState(stage.getId()) != COMPLETE) {
-        // By chance, remove or add executor
-        if (isTrueByChance(random, removalChance)) {
-          // REMOVE EXECUTOR
-          if (!executors.isEmpty()) {
-            scheduler.onExecutorRemoved(executors.remove(random.nextInt(executors.size())).getExecutorId());
-          } else {
-            // Skip, since no executor is running.
-          }
-        } else {
-          if (executors.size() < 3) {
-            // ADD EXECUTOR
-            final ExecutorRepresenter newExecutor = executorRepresenterGenerator.apply("a" + executorIdIndex);
-            executorIdIndex += 1;
-            executors.add(newExecutor);
-            scheduler.onExecutorAdded(newExecutor);
-          } else {
-            // Skip, in order to keep the total number of running executors below or equal to 3
-          }
-        }
-
-        // Complete the execution of tasks
-        if (!executors.isEmpty()) {
-          final int indexOfCompletedExecutor = random.nextInt(executors.size());
-          // New set for snapshotting
-          final Map<String, Integer> runningTaskSnapshot =
-              new HashMap<>(executors.get(indexOfCompletedExecutor).getRunningTaskToAttempt());
-          runningTaskSnapshot.entrySet().forEach(entry -> {
-            SchedulerTestUtil.sendTaskStateEventToScheduler(
-                scheduler, executorRegistry, entry.getKey(), TaskState.State.COMPLETE, entry.getValue());
-          });
-        }
-      }
-    }
-    assertTrue(jobStateManager.isJobDone());
-  }
-
-  private boolean isTrueByChance(final Random random, final float chance) {
-    return chance > random.nextDouble();
-  }
-}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
similarity index 75%
rename from runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
index c60eeb3..f2dd878 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
@@ -23,17 +24,18 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.*;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.*;
 
 /**
- * Tests {@link FreeSlotSchedulingPolicy}.
+ * Tests {@link FreeSlotSchedulingConstraint}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ExecutorRepresenter.class, Task.class})
-public final class FreeSlotSchedulingPolicyTest {
+public final class FreeSlotSchedulingConstraintTest {
 
   private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks,
                                                              final int capacity) {
@@ -47,18 +49,20 @@ public final class FreeSlotSchedulingPolicyTest {
 
   @Test
   public void testFreeSlot() {
-    final SchedulingPolicy schedulingPolicy = new FreeSlotSchedulingPolicy();
+    final SchedulingConstraint schedulingConstraint = new FreeSlotSchedulingConstraint();
     final ExecutorRepresenter a0 = mockExecutorRepresenter(1, 1);
     final ExecutorRepresenter a1 = mockExecutorRepresenter(2, 3);
 
     final Task task = mock(Task.class);
+    when(task.getPropertyValue(ExecutorSlotComplianceProperty.class)).thenReturn(Optional.of(true));
 
     final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1));
 
-    final Set<ExecutorRepresenter> candidateExecutors =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, task);
+    final Set<ExecutorRepresenter> candidateExecutors = executorRepresenterList.stream()
+        .filter(e -> schedulingConstraint.testSchedulability(e, task))
+        .collect(Collectors.toSet());
 
-    final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a1));
+    final Set<ExecutorRepresenter> expectedExecutors = Collections.singleton(a1);
     assertEquals(expectedExecutors, candidateExecutors);
   }
 }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java
index 60311a6..bf6ebc8 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java
@@ -44,7 +44,7 @@ public final class MinOccupancyFirstSchedulingPolicyTest {
   }
 
   @Test
-  public void testRoundRobin() {
+  public void test() {
     final SchedulingPolicy schedulingPolicy = new MinOccupancyFirstSchedulingPolicy();
     final ExecutorRepresenter a0 = mockExecutorRepresenter(1);
     final ExecutorRepresenter a1 = mockExecutorRepresenter(2);
@@ -52,13 +52,9 @@ public final class MinOccupancyFirstSchedulingPolicyTest {
 
     final Task task = mock(Task.class);
 
-    final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1, a2));
+    final List<ExecutorRepresenter> executorRepresenterList = Arrays.asList(a0, a1, a2);
 
-    final Set<ExecutorRepresenter> candidateExecutors =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, task);
-
-    final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a0));
-    assertEquals(expectedExecutors, candidateExecutors);
+    assertEquals(a0, schedulingPolicy.selectExecutor(executorRepresenterList, task));
   }
 }
 
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
index 41f9642..387c6b9 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
@@ -16,14 +16,11 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
-import java.util.Collection;
-import java.util.List;
 import java.util.Optional;
 
 /**
@@ -94,23 +91,4 @@ final class SchedulerTestUtil {
     scheduler.onTaskStateReportFromExecutor(scheduledExecutor.getExecutorId(), taskId, attemptIdx,
         newState, null, cause);
   }
-
-  static void sendTaskStateEventToScheduler(final Scheduler scheduler,
-                                            final ExecutorRegistry executorRegistry,
-                                            final String taskId,
-                                            final TaskState.State newState,
-                                            final int attemptIdx) {
-    sendTaskStateEventToScheduler(scheduler, executorRegistry, taskId, newState, attemptIdx, null);
-  }
-
-  static void mockSchedulingBySchedulerRunner(final PendingTaskCollectionPointer pendingTaskCollectionPointer,
-                                              final SchedulingPolicy schedulingPolicy,
-                                              final JobStateManager jobStateManager,
-                                              final ExecutorRegistry executorRegistry,
-                                              final boolean scheduleOnlyTheFirstStage) {
-    final SchedulerRunner schedulerRunner =
-        new SchedulerRunner(schedulingPolicy, pendingTaskCollectionPointer, executorRegistry);
-    schedulerRunner.scheduleJob(jobStateManager);
-    schedulerRunner.doScheduleTaskList();
-  }
 }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
similarity index 83%
rename from runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
index 2f538ee..772c587 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
+import edu.snu.nemo.common.ir.vertex.executionproperty.SourceLocationAwareSchedulingProperty;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
@@ -25,16 +26,18 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.*;
 
 /**
- * Test cases for {@link SourceLocationAwareSchedulingPolicy}.
+ * Test cases for {@link SourceLocationAwareSchedulingConstraint}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ExecutorRepresenter.class, Task.class, Readable.class})
-public final class SourceLocationAwareSchedulingPolicyTest {
+public final class SourceLocationAwareSchedulingConstraintTest {
   private static final String SITE_0 = "SEOUL";
   private static final String SITE_1 = "JINJU";
   private static final String SITE_2 = "BUSAN";
@@ -46,12 +49,12 @@ public final class SourceLocationAwareSchedulingPolicyTest {
   }
 
   /**
-   * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule a {@link Task} when
+   * {@link SourceLocationAwareSchedulingConstraint} should fail to schedule a {@link Task} when
    * there are no executors in appropriate location(s).
    */
   @Test
   public void testSourceLocationAwareSchedulingNotAvailable() {
-    final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
+    final SchedulingConstraint schedulingConstraint = new SourceLocationAwareSchedulingConstraint();
 
     // Prepare test scenario
     final Task task = CreateTask.withReadablesWithSourceLocations(
@@ -59,16 +62,17 @@ public final class SourceLocationAwareSchedulingPolicyTest {
     final ExecutorRepresenter e0 = mockExecutorRepresenter(SITE_1);
     final ExecutorRepresenter e1 = mockExecutorRepresenter(SITE_1);
 
-    assertEquals(Collections.emptySet(),
-        schedulingPolicy.filterExecutorRepresenters(new HashSet<>(Arrays.asList(e0, e1)), task));
+    assertEquals(Collections.emptySet(), Arrays.asList(e0, e1).stream()
+        .filter(e -> schedulingConstraint.testSchedulability(e, task))
+        .collect(Collectors.toSet()));
   }
 
   /**
-   * {@link SourceLocationAwareSchedulingPolicy} should properly schedule TGs with multiple source locations.
+   * {@link SourceLocationAwareSchedulingConstraint} should properly schedule Tasks with multiple source locations.
    */
   @Test
   public void testSourceLocationAwareSchedulingWithMultiSource() {
-    final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
+    final SchedulingConstraint schedulingConstraint = new SourceLocationAwareSchedulingConstraint();
     // Prepare test scenario
     final Task task0 = CreateTask.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_1)));
@@ -83,8 +87,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
 
     final ExecutorRepresenter e = mockExecutorRepresenter(SITE_1);
     for (final Task task : new HashSet<>(Arrays.asList(task0, task1, task2, task3))) {
-      assertEquals(new HashSet<>(Collections.singletonList(e)), schedulingPolicy.filterExecutorRepresenters(
-          new HashSet<>(Collections.singletonList(e)), task));
+      assertTrue(schedulingConstraint.testSchedulability(e, task));
     }
   }
 
@@ -103,6 +106,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
           readable));
       when(mockInstance.getTaskId()).thenReturn(String.format("T-%d", taskIndex.getAndIncrement()));
       when(mockInstance.getIrVertexIdToReadable()).thenReturn(readableMap);
+      when(mockInstance.getPropertyValue(SourceLocationAwareSchedulingProperty.class)).thenReturn(Optional.of(true));
       return mockInstance;
     }
 
diff --git a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
index f8c4f90..3c74fec 100644
--- a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
+++ b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
@@ -23,7 +23,7 @@ import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.common.test.EmptyComponents;
 import org.apache.reef.tang.Injector;
@@ -63,12 +63,12 @@ public final class PhysicalPlanGeneratorTest {
     final Stage s0 = stages.next();
     final Stage s1 = stages.next();
 
-    assertNotEquals(s0.getScheduleGroupIndex(), s1.getScheduleGroupIndex());
+    assertNotEquals(s0.getScheduleGroup(), s1.getScheduleGroup());
   }
 
-  private static final IRVertex newIRVertex(final int scheduleGroupIndex, final int parallelism) {
+  private static final IRVertex newIRVertex(final int scheduleGroup, final int parallelism) {
     final IRVertex irVertex = new OperatorVertex(EMPTY_TRANSFORM);
-    irVertex.getExecutionProperties().put(ScheduleGroupIndexProperty.of(scheduleGroupIndex));
+    irVertex.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup));
     irVertex.getExecutionProperties().put(ParallelismProperty.of(parallelism));
     return irVertex;
   }
diff --git a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
index a60cf88..515bd95 100644
--- a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
+++ b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
@@ -24,7 +24,7 @@ import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.common.test.EmptyComponents;
 import org.apache.reef.tang.Tang;
@@ -56,21 +56,21 @@ public final class StagePartitionerTest {
 
   /**
    * @param parallelism {@link ParallelismProperty} value for the new vertex
-   * @param scheduleGroupIndex {@link ScheduleGroupIndexProperty} value for the new vertex
+   * @param scheduleGroup {@link ScheduleGroupProperty} value for the new vertex
    * @param otherProperties other {@link VertexExecutionProperty} for the new vertex
    * @return new {@link IRVertex}
    */
-  private static IRVertex newVertex(final int parallelism, final int scheduleGroupIndex,
+  private static IRVertex newVertex(final int parallelism, final int scheduleGroup,
                                     final List<VertexExecutionProperty> otherProperties) {
     final IRVertex vertex = new OperatorVertex(EMPTY_TRANSFORM);
     vertex.getExecutionProperties().put(ParallelismProperty.of(parallelism));
-    vertex.getExecutionProperties().put(ScheduleGroupIndexProperty.of(scheduleGroupIndex));
+    vertex.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup));
     otherProperties.forEach(property -> vertex.getExecutionProperties().put(property));
     return vertex;
   }
 
   /**
-   * A simple case where two vertices have common parallelism and ScheduleGroupIndex so that get merged into one stage.
+   * A simple case where two vertices have common parallelism and ScheduleGroup so they get merged into one stage.
    */
   @Test
   public void testLinear() {
@@ -101,10 +101,10 @@ public final class StagePartitionerTest {
   }
 
   /**
-   * A simple case where two vertices have different ScheduleGroupIndex.
+   * A simple case where two vertices have different ScheduleGroups.
    */
   @Test
-  public void testSplitByScheduleGroupIndex() {
+  public void testSplitByScheduleGroup() {
     final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
     final IRVertex v0 = newVertex(1, 0, Collections.emptyList());
     final IRVertex v1 = newVertex(1, 1, Collections.emptyList());
diff --git a/tests/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java b/tests/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
new file mode 100644
index 0000000..443ffca
--- /dev/null
+++ b/tests/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.SourceLocationAwareSchedulingProperty;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link SchedulingConstraintRegistry}.
+ */
+public final class SchedulingConstraintnRegistryTest {
+  @Test
+  public void testSchedulingConstraintRegistry() throws InjectionException {
+    final SchedulingConstraintRegistry registry = Tang.Factory.getTang().newInjector()
+        .getInstance(SchedulingConstraintRegistry.class);
+    assertEquals(FreeSlotSchedulingConstraint.class, getConstraintOf(ExecutorSlotComplianceProperty.class, registry));
+    assertEquals(ContainerTypeAwareSchedulingConstraint.class,
+        getConstraintOf(ExecutorPlacementProperty.class, registry));
+    assertEquals(SourceLocationAwareSchedulingConstraint.class,
+        getConstraintOf(SourceLocationAwareSchedulingProperty.class, registry));
+  }
+
+  private static Class<? extends SchedulingConstraint> getConstraintOf(
+      final Class<? extends VertexExecutionProperty> property, final SchedulingConstraintRegistry registry) {
+    return registry.get(property)
+        .orElseThrow(() -> new RuntimeException(String.format(
+            "No SchedulingConstraint found for property %s", property)))
+        .getClass();
+  }
+}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
index 5cf95b5..33e4988 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
@@ -24,7 +24,7 @@ import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternPro
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.common.test.EmptyComponents;
 import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
@@ -54,7 +54,7 @@ public final class DefaultScheduleGroupPassTest {
   @Test
   public void testAnnotatingPass() {
     final AnnotatingPass scheduleGroupPass = new DefaultScheduleGroupPass();
-    assertEquals(ScheduleGroupIndexProperty.class, scheduleGroupPass.getExecutionPropertyToModify());
+    assertEquals(ScheduleGroupProperty.class, scheduleGroupPass.getExecutionPropertyToModify());
   }
 
   /**
@@ -67,11 +67,11 @@ public final class DefaultScheduleGroupPassTest {
         new TestPolicy(), "");
 
     for (final IRVertex irVertex : processedDAG.getTopologicalSort()) {
-      final Integer currentScheduleGroupIndex = irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get();
-      final Integer largestScheduleGroupIndexOfParent = processedDAG.getParents(irVertex.getId()).stream()
-          .mapToInt(v -> v.getPropertyValue(ScheduleGroupIndexProperty.class).get())
+      final Integer currentScheduleGroup = irVertex.getPropertyValue(ScheduleGroupProperty.class).get();
+      final Integer largestScheduleGroupOfParent = processedDAG.getParents(irVertex.getId()).stream()
+          .mapToInt(v -> v.getPropertyValue(ScheduleGroupProperty.class).get())
           .max().orElse(0);
-      assertTrue(currentScheduleGroupIndex >= largestScheduleGroupIndexOfParent);
+      assertTrue(currentScheduleGroup >= largestScheduleGroupOfParent);
     }
   }
 
@@ -155,31 +155,31 @@ public final class DefaultScheduleGroupPassTest {
   }
 
   /**
-   * Asserts that the {@link ScheduleGroupIndexProperty} is equal to {@code expected}.
+   * Asserts that the {@link ScheduleGroupProperty} is equal to {@code expected}.
    * @param expected the expected property value
    * @param vertex the vertex to test
    */
-  private static void assertScheduleGroupIndex(final int expected, final IRVertex vertex) {
-    assertEquals(expected, getScheduleGroupIndex(vertex));
+  private static void assertScheduleGroup(final int expected, final IRVertex vertex) {
+    assertEquals(expected, getScheduleGroup(vertex));
   }
 
   /**
    * @param vertex a vertex
-   * @return {@link ScheduleGroupIndexProperty} of {@code vertex}
+   * @return {@link ScheduleGroupProperty} of {@code vertex}
    */
-  private static int getScheduleGroupIndex(final IRVertex vertex) {
-    return vertex.getPropertyValue(ScheduleGroupIndexProperty.class)
+  private static int getScheduleGroup(final IRVertex vertex) {
+    return vertex.getPropertyValue(ScheduleGroupProperty.class)
         .orElseThrow(() -> new RuntimeException(String.format("ScheduleGroup not set for %s", vertex.getId())));
   }
 
   /**
-   * Ensures that all vertices in {@code vertices} have different {@link ScheduleGroupIndexProperty} value.
+   * Ensures that all vertices in {@code vertices} have different {@link ScheduleGroupProperty} value.
    * @param vertices vertices to test
    */
-  private static void assertDifferentScheduleGroupIndex(final Collection<IRVertex> vertices) {
+  private static void assertDifferentScheduleGroup(final Collection<IRVertex> vertices) {
     final Set<Integer> indices = new HashSet<>();
     vertices.forEach(v -> {
-      final int idx = getScheduleGroupIndex(v);
+      final int idx = getScheduleGroup(v);
       assertFalse(indices.contains(idx));
       indices.add(idx);
     });
@@ -194,7 +194,7 @@ public final class DefaultScheduleGroupPassTest {
     final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
         = generateBranchDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Pull);
     pass.apply(dag.left());
-    dag.right().forEach(v -> assertScheduleGroupIndex(0, v));
+    dag.right().forEach(v -> assertScheduleGroup(0, v));
   }
 
   /**
@@ -206,12 +206,12 @@ public final class DefaultScheduleGroupPassTest {
     final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
         = generateBranchDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Pull);
     pass.apply(dag.left());
-    dag.right().subList(0, 4).forEach(v -> assertScheduleGroupIndex(0, v));
-    dag.right().subList(4, 5).forEach(v -> assertScheduleGroupIndex(1, v));
+    dag.right().subList(0, 4).forEach(v -> assertScheduleGroup(0, v));
+    dag.right().subList(4, 5).forEach(v -> assertScheduleGroup(1, v));
   }
 
   /**
-   * Test scenario to determine whether push edges properly enforces same scheduleGroupIndex or not.
+   * Test scenario to determine whether push edges properly enforce the same scheduleGroup or not.
    */
   @Test
   public void testBranchWithPush() {
@@ -219,7 +219,7 @@ public final class DefaultScheduleGroupPassTest {
     final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
         = generateBranchDAG(DataCommunicationPatternProperty.Value.Shuffle, DataFlowModelProperty.Value.Push);
     pass.apply(dag.left());
-    dag.right().forEach(v -> assertScheduleGroupIndex(0, v));
+    dag.right().forEach(v -> assertScheduleGroup(0, v));
   }
 
   /**
@@ -230,7 +230,7 @@ public final class DefaultScheduleGroupPassTest {
     final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(false, true, true);
     final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
         = generateBranchDAG(DataCommunicationPatternProperty.Value.BroadCast, DataFlowModelProperty.Value.Pull);
-    assertDifferentScheduleGroupIndex(pass.apply(dag.left()).getVertices());
+    assertDifferentScheduleGroup(pass.apply(dag.left()).getVertices());
   }
 
   /**
@@ -241,7 +241,7 @@ public final class DefaultScheduleGroupPassTest {
     final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(true, false, true);
     final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
         = generateBranchDAG(DataCommunicationPatternProperty.Value.Shuffle, DataFlowModelProperty.Value.Pull);
-    assertDifferentScheduleGroupIndex(pass.apply(dag.left()).getVertices());
+    assertDifferentScheduleGroup(pass.apply(dag.left()).getVertices());
   }
 
   /**
@@ -253,11 +253,11 @@ public final class DefaultScheduleGroupPassTest {
     final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
         = generateJoinDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Pull);
     pass.apply(dag.left());
-    final int idxForFirstScheduleGroup = getScheduleGroupIndex(dag.right().get(0));
-    final int idxForSecondScheduleGroup = getScheduleGroupIndex(dag.right().get(2));
-    dag.right().subList(0, 2).forEach(v -> assertScheduleGroupIndex(idxForFirstScheduleGroup, v));
-    dag.right().subList(2, 4).forEach(v -> assertScheduleGroupIndex(idxForSecondScheduleGroup, v));
-    dag.right().subList(4, 6).forEach(v -> assertScheduleGroupIndex(2, v));
+    final int idxForFirstScheduleGroup = getScheduleGroup(dag.right().get(0));
+    final int idxForSecondScheduleGroup = getScheduleGroup(dag.right().get(2));
+    dag.right().subList(0, 2).forEach(v -> assertScheduleGroup(idxForFirstScheduleGroup, v));
+    dag.right().subList(2, 4).forEach(v -> assertScheduleGroup(idxForSecondScheduleGroup, v));
+    dag.right().subList(4, 6).forEach(v -> assertScheduleGroup(2, v));
   }
 
   /**
@@ -269,7 +269,7 @@ public final class DefaultScheduleGroupPassTest {
     final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
         = generateJoinDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Push);
     pass.apply(dag.left());
-    dag.right().forEach(v -> assertScheduleGroupIndex(0, v));
+    dag.right().forEach(v -> assertScheduleGroup(0, v));
   }
 
   /**
@@ -283,9 +283,9 @@ public final class DefaultScheduleGroupPassTest {
     dag.left().getOutgoingEdgesOf(dag.right().get(1)).iterator().next()
         .getExecutionProperties().put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
     pass.apply(dag.left());
-    final int idxForFirstScheduleGroup = getScheduleGroupIndex(dag.right().get(0));
-    final int idxForSecondScheduleGroup = getScheduleGroupIndex(dag.right().get(2));
-    dag.right().subList(0, 2).forEach(v -> assertScheduleGroupIndex(idxForFirstScheduleGroup, v));
-    dag.right().subList(2, 6).forEach(v -> assertScheduleGroupIndex(idxForSecondScheduleGroup, v));
+    final int idxForFirstScheduleGroup = getScheduleGroup(dag.right().get(0));
+    final int idxForSecondScheduleGroup = getScheduleGroup(dag.right().get(2));
+    dag.right().subList(0, 2).forEach(v -> assertScheduleGroup(idxForFirstScheduleGroup, v));
+    dag.right().subList(2, 6).forEach(v -> assertScheduleGroup(idxForSecondScheduleGroup, v));
   }
 }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
index b1ea8f8..41edef3 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -28,21 +28,21 @@ public final class PolicyBuilderTest {
   @Test
   public void testDisaggregationPolicy() {
     final Policy disaggregationPolicy = new DisaggregationPolicy();
-    assertEquals(14, disaggregationPolicy.getCompileTimePasses().size());
+    assertEquals(16, disaggregationPolicy.getCompileTimePasses().size());
     assertEquals(0, disaggregationPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testPadoPolicy() {
     final Policy padoPolicy = new PadoPolicy();
-    assertEquals(16, padoPolicy.getCompileTimePasses().size());
+    assertEquals(18, padoPolicy.getCompileTimePasses().size());
     assertEquals(0, padoPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testDataSkewPolicy() {
     final Policy dataSkewPolicy = new DataSkewPolicy();
-    assertEquals(18, dataSkewPolicy.getCompileTimePasses().size());
+    assertEquals(20, dataSkewPolicy.getCompileTimePasses().size());
     assertEquals(1, dataSkewPolicy.getRuntimePasses().size());
   }