You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/06/25 03:17:13 UTC

[incubator-nemo] branch master updated: [NEMO-102] Stage Partitioning by PhysicalPlanGenerator (#51)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new add2b55  [NEMO-102] Stage Partitioning by PhysicalPlanGenerator (#51)
add2b55 is described below

commit add2b555a57632761fbc5657c1215fd684b37760
Author: Jangho Seo <ja...@jangho.io>
AuthorDate: Mon Jun 25 12:17:10 2018 +0900

    [NEMO-102] Stage Partitioning by PhysicalPlanGenerator (#51)
    
    JIRA: [NEMO-102: Stage Partitioning by PhysicalPlanGenerator](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-102)
    
    **Major changes:**
    - Removed StageId property
    - Replaced DefaultStagePartitioningPass with StagePartitioner in nemo-runtime-common
    - Modified PhysicalPlanGenerator to use StagePartitioner
    - Added Stage-level property. Common properties which vertices in a stage share become stage-level properties. (Except for the properties ignored by StagePartitioner)
    - Ad-hoc properties in Task and Stage, such as containerType, now can be handled by stage-level properties.
    - Replaced ScheduleGroupPass with DefaultScheduleGroupPass, which does not require StageId property for assigning ScheduleGroupIndex
    
    **Minor changes to note:**
    - Removed StageBuilder and StageEdgeBuilder
    - Add a feature to visualizer to display stage-level ExecutionProperties
    - Modified visualizer to properly display other ExecutionProperties
    - Modified visualizer to properly draw StageEdges so that those edges are not cut by stage boundaries
    - Removed parallelism equality checking by DAGBuilder which requires IR-level StageId property (the feature is replaced by the constructor of Stage and sanity checking by PhysicalPlanGenerator)
    - Renamed DataStoreProperty to InterTaskDataStoreProperty because it only controls data flow which spans through differnet stages
    - Modified ExecutionPropertyMap#toString to emit canonical name of property key
    - Added equality test cases to ExecutionPropertyMapTest
    - Removed `idToIRVertex` parameter from the constructor of PhysicalPlan. (The parameter value can be inferred from the IR dag supplied to the constructor.)
    - Increased the capacity of resources in `beam_sample_executor_resources.json`, because some integration test cases require scheduling a large ScheduleGroup at once. (For example, ScheduleGroup 0 in AlternatingLeastSquareITCase_pado requires 15 slots in Transient resoruce.)
    - Modified StageEdge to use consistent naming for executionProperties when emitting json (edgeProperties &rarr; executionProperties)
    
    **Tests for the changes:**
    - Added StagePartitionerTest to test StagePartitioner under various test scenarios.
    - Renamed ScheduleGroupPassTest to DefaultScheduleGroupPassTest and made it tests DefaultScheduleGroupPass under various test scenarios.
    - Existing tests should cover changes on PhysicalPlanGenerator.
    
    **Other comments:**
    - Legacy ScheduleGroupPass forces stages with SourceVertex within to have ScheduleGroupIndex of zero. Since DefaultScheduleGroupPass does not employ this kind of trick, in FaultToleranceTest ScheduleGroup 0 for `TestPlanGenerator.PlanType.TwoVerticesJoined` is splitted into two ScheduleGroups. That's why I made modification like `if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {` in FaultToleranceTest.
    
    resolves [NEMO-102](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-102)
---
 bin/json2dot.py                                    |  99 +++----
 .../snu/nemo/common/coder/BytesDecoderFactory.java |   2 +-
 .../edu/snu/nemo/common/coder/DecoderFactory.java  |   1 +
 .../src/main/java/edu/snu/nemo/common/dag/DAG.java |   2 +-
 .../java/edu/snu/nemo/common/dag/DAGBuilder.java   |  18 --
 ...operty.java => InterTaskDataStoreProperty.java} |   8 +-
 .../ir/executionproperty/ExecutionPropertyMap.java |  33 +--
 ...tageIdProperty.java => SkipSerDesProperty.java} |  20 +-
 .../ExecutionPropertyMapTest.java                  |  34 ++-
 .../nemo/compiler/backend/nemo/NemoBackend.java    |   3 +-
 .../compiletime/annotating/CompressionPass.java    |   3 -
 .../annotating/DataSkewEdgeDataStorePass.java      |   8 +-
 .../compiletime/annotating/DecompressionPass.java  |   3 -
 .../DefaultEdgeUsedDataHandlingPass.java           |  11 +-
 ...ass.java => DefaultInterTaskDataStorePass.java} |  18 +-
 .../annotating/DefaultScheduleGroupPass.java       | 291 +++++++++++++++++++++
 .../annotating/DefaultStagePartitioningPass.java   | 144 ----------
 .../DisaggregationEdgeDataStorePass.java           |   9 +-
 .../annotating/PadoEdgeDataStorePass.java          |  10 +-
 .../annotating/SailfishEdgeDataStorePass.java      |   9 +-
 .../compiletime/annotating/ScheduleGroupPass.java  | 145 ----------
 .../composite/PrimitiveCompositePass.java          |   7 +-
 .../reshaping/SailfishRelayReshapingPass.java      |   2 +
 .../compiler/optimizer/policy/BasicPullPolicy.java |   6 +-
 .../compiler/optimizer/policy/BasicPushPolicy.java |   6 +-
 .../policy/DefaultPolicyWithSeparatePass.java      |  10 +-
 .../compiler/optimizer/policy/PolicyBuilder.java   |   4 +-
 .../resources/beam_sample_executor_resources.json  |   4 +-
 .../pass/runtime/DataSkewRuntimePass.java          |   2 +-
 .../snu/nemo/runtime/common/plan/PhysicalPlan.java |  13 +-
 .../runtime/common/plan/PhysicalPlanGenerator.java | 165 +++++-------
 .../edu/snu/nemo/runtime/common/plan/Stage.java    |  48 ++--
 .../snu/nemo/runtime/common/plan/StageBuilder.java | 100 -------
 .../snu/nemo/runtime/common/plan/StageEdge.java    |   6 +-
 .../nemo/runtime/common/plan/StageEdgeBuilder.java | 114 --------
 .../nemo/runtime/common/plan/StagePartitioner.java | 125 +++++++++
 .../edu/snu/nemo/runtime/common/plan/Task.java     |  29 +-
 .../runtime/executor/data/BlockManagerWorker.java  |  48 ++--
 .../runtime/executor/datatransfer/InputReader.java |  11 +-
 .../executor/datatransfer/OutputWriter.java        |   4 +-
 .../executor/datatransfer/DataTransferTest.java    |  55 ++--
 .../runtime/executor/task/TaskExecutorTest.java    |  16 +-
 .../master/scheduler/BatchSingleJobScheduler.java  |   2 +-
 .../ContainerTypeAwareSchedulingPolicy.java        |   6 +-
 .../master/scheduler/SingleJobTaskCollection.java  |  21 +-
 .../nemo/runtime/master/JobStateManagerTest.java   |   2 +-
 .../ContainerTypeAwareSchedulingPolicyTest.java    |   6 +-
 .../master/scheduler/FaultToleranceTest.java       |  35 +--
 .../master/scheduler/SingleTaskQueueTest.java      |   2 +-
 .../runtime/plangenerator/TestPlanGenerator.java   |   2 +-
 .../runtime/common/plan/StagePartitionerTest.java  | 185 +++++++++++++
 .../snu/nemo/tests/client/ClientEndpointTest.java  |   2 +-
 .../annotating/DefaultScheduleGroupPassTest.java   | 291 +++++++++++++++++++++
 .../annotating/ScheduleGroupPassTest.java          |  70 -----
 .../composite/DisaggregationPassTest.java          |  28 +-
 .../composite/PadoCompositePassTest.java           |  12 +-
 .../compiletime/composite/SailfishPassTest.java    |   8 +-
 .../optimizer/policy/PolicyBuilderTest.java        |  12 +-
 .../compiler/optimizer/policy/TestPolicy.java      |   3 +-
 .../runtime/common/plan/DAGConverterTest.java      |  18 +-
 60 files changed, 1308 insertions(+), 1043 deletions(-)

diff --git a/bin/json2dot.py b/bin/json2dot.py
index 6f2d339..a0262ef 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -26,9 +26,8 @@ import re
 
 nextIdx = 0
 
-def edgePropertiesString(properties):
-    prop = {p[0]: p[1] for p in properties.items() if p[0] != 'Coder'}
-    return '/'.join(['SideInput' if x[0] == 'IsSideInput' else x[1].split('.')[-1] for x in sorted(prop.items())])
+def propertiesToString(properties):
+    return '<BR/>'.join(['{}={}'.format(re.sub('Property$', '', item[0].split('.')[-1]), item[1]) for item in sorted(properties.items())])
 
 def getIdx():
     global nextIdx
@@ -119,10 +118,6 @@ def Vertex(id, properties, state):
     except:
         pass
     try:
-        return Stage(id, properties)
-    except:
-        pass
-    try:
         return LoopVertex(id, properties)
     except:
         pass
@@ -138,33 +133,18 @@ class NormalVertex:
     def dot(self):
         color = 'black'
         try:
-            if (self.properties['executionProperties']['ExecutorPlacement'] == 'Transient'):
+            placement = self.properties['executionProperties']['edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty']
+            if (placement == 'Transient'):
                 color = 'orange'
-            if (self.properties['executionProperties']['ExecutorPlacement'] == 'Reserved'):
+            if (placement == 'Reserved'):
                 color = 'green'
         except:
             pass
         label = self.id
         if self.state is not None:
-            label += '\\n({})'.format(self.state)
-        try:
-            label += ' (p{})'.format(self.properties['executionProperties']['Parallelism'])
-        except:
-            pass
-        try:
-            label += ' (s{})'.format(self.properties['executionProperties']['ScheduleGroupIndex'])
-        except:
-            pass
+            label += '<BR/>({})'.format(self.state)
         try:
-            label += '\\n{}'.format(self.properties['source'])
-        except:
-            pass
-        try:
-            label += '\\n{}'.format(self.properties['runtimeVertexId'])
-        except:
-            pass
-        try:
-            label += '\\n{}'.format(self.properties['index'])
+            label += '<BR/>{}'.format(self.properties['source'])
         except:
             pass
         try:
@@ -174,15 +154,19 @@ class NormalVertex:
                 class_name = transform[1].split('{')[0].split('.')[-1].split('$')[0].split('@')[0]
             except IndexError:
                 class_name = '?'
-            label += '\\n{}:{}'.format(transform_name, class_name)
+            label += '<BR/>{}:{}'.format(transform_name, class_name)
         except:
             pass
         if ('class' in self.properties and self.properties['class'] == 'MetricCollectionBarrierVertex'):
             shape = ', shape=box'
-            label += '\\nMetricCollectionBarrier'
+            label += '<BR/>MetricCollectionBarrier'
         else:
             shape = ''
-        dot = '{} [label="{}", color={}, style=filled, fillcolor="{}"{}];'.format(self.idx, label, color, stateToColor(self.state), shape)
+        try:
+            label += '<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(propertiesToString(self.properties['executionProperties']))
+        except:
+            pass
+        dot = '{} [label=<{}>, color={}, style=filled, fillcolor="{}"{}];'.format(self.idx, label, color, stateToColor(self.state), shape)
         return dot
     @property
     def oneVertex(self):
@@ -191,27 +175,6 @@ class NormalVertex:
     def logicalEnd(self):
         return self.idx
 
-class Stage:
-    def __init__(self, id, properties):
-        self.id = id
-        self.internalDAG = DAG(properties['stageInternalDAG'], JobState.empty())
-        self.idx = getIdx()
-    @property
-    def dot(self):
-        dot = ''
-        dot += 'subgraph cluster_{} {{'.format(self.idx)
-        dot += 'label = "{}";'.format(self.id)
-        dot += 'color=blue;'
-        dot += self.internalDAG.dot
-        dot += '}'
-        return dot
-    @property
-    def oneVertex(self):
-        return next(iter(self.internalDAG.vertices.values())).oneVertex
-    @property
-    def logicalEnd(self):
-        return 'cluster_{}'.format(self.idx)
-
 class LoopVertex:
     def __init__(self, id, properties):
         self.id = id
@@ -226,10 +189,10 @@ class LoopVertex:
     def dot(self):
         label = self.id
         try:
-            label += ' (p{})'.format(self.executionProperties['Parallelism'])
+            label += '<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(propertiesToString(self.executionProperties))
         except:
             pass
-        label += '\\n(Remaining iteration: {})'.format(self.remaining_iteration)
+        label += '<BR/>(Remaining iteration: {})'.format(self.remaining_iteration)
         dot = 'subgraph cluster_{} {{'.format(self.idx)
         dot += 'label = "{}";'.format(label)
         dot += self.dag.dot
@@ -253,24 +216,30 @@ class LoopVertex:
 class Stage:
     def __init__(self, id, properties, state):
         self.id = id
-        self.irVertex = DAG(properties['irDag'], JobState.empty())
+        self.properties = properties
+        self.stageDAG = DAG(properties['irDag'], JobState.empty())
         self.idx = getIdx()
         self.state = state
+        self.executionProperties = self.properties['executionProperties']
     @property
     def dot(self):
         if self.state.state is None:
             state = ''
         else:
             state = ' ({})'.format(self.state.state)
+        label = '{}{}'.format(self.id, state)
+        if self.state.tasks:
+            label += '<BR/><BR/>{} Task(s):<BR/>{}'.format(len(self.state.tasks), self.state.taskStateSummary)
+        label += '<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(propertiesToString(self.executionProperties))
         dot = 'subgraph cluster_{} {{'.format(self.idx)
-        dot += 'label = "{}{}\\n\\n{} Task(s):\\n{}";'.format(self.id, state, len(self.state.tasks), self.state.taskStateSummary)
+        dot += 'label = <{}>;'.format(label)
         dot += 'color=red; bgcolor="{}";'.format(stateToColor(self.state.state))
-        dot += self.irVertex.dot
+        dot += self.stageDAG.dot
         dot += '}'
         return dot
     @property
     def oneVertex(self):
-        return next(iter(self.irVertex.vertices.values())).oneVertex
+        return next(iter(self.stageDAG.vertices.values())).oneVertex
     @property
     def logicalEnd(self):
         return 'cluster_{}'.format(self.idx)
@@ -305,8 +274,6 @@ class IREdge:
         self.dst = dst
         self.id = properties['id']
         self.executionProperties = properties['executionProperties']
-        self.encoderFactory = self.executionProperties['Encoder']
-        self.decoderFactory = self.executionProperties['Decoder']
     @property
     def dot(self):
         src = self.src
@@ -319,21 +286,19 @@ class IREdge:
             dst = dst.internalDstFor(self.id)
         except:
             pass
-        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}<BR/>{}</FONT>'.format(self.id, edgePropertiesString(self.executionProperties), self.encoderFactory, self.decoderFactory)
+        label = '{}<BR/><FONT POINT-SIZE=\'8\'>{}</FONT>'.format(self.id, propertiesToString(self.executionProperties))
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(src.oneVertex.idx,
                 dst.oneVertex.idx, src.logicalEnd, dst.logicalEnd, label)
 
 class StageEdge:
     def __init__(self, src, dst, properties):
-        self.src = src.internalDAG.vertices[properties['srcVertex']]
-        self.dst = dst.internalDAG.vertices[properties['dstVertex']]
+        self.src = src.stageDAG.vertices[properties['externalSrcVertexId']]
+        self.dst = dst.stageDAG.vertices[properties['externalDstVertexId']]
         self.runtimeEdgeId = properties['runtimeEdgeId']
         self.executionProperties = properties['executionProperties']
-        self.encoderFactory = self.executionProperties['Encoder']
-        self.decoderFactory = self.executionProperties['Decoder']
     @property
     def dot(self):
-        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}<BR/>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.encoderFactory, self.decoderFactory)
+        label = '{}<BR/><FONT POINT-SIZE=\'8\'>{}</FONT>'.format(self.runtimeEdgeId, propertiesToString(self.executionProperties))
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx,
                 self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label)
 
@@ -343,11 +308,9 @@ class RuntimeEdge:
         self.dst = dst
         self.runtimeEdgeId = properties['runtimeEdgeId']
         self.executionProperties = properties['executionProperties']
-        self.encoderFactory = self.executionProperties['Encoder']
-        self.decoderFactory = self.executionProperties['Decoder']
     @property
     def dot(self):
-        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}<BR/>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.encoderFactory, self.decoderFactory)
+        label = '{}<BR/><FONT POINT-SIZE=\'8\'>{}</FONT>'.format(self.runtimeEdgeId, propertiesToString(self.executionProperties))
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx,
                 self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label)
 
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/BytesDecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/BytesDecoderFactory.java
index 069e254..e62e0b8 100644
--- a/common/src/main/java/edu/snu/nemo/common/coder/BytesDecoderFactory.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/BytesDecoderFactory.java
@@ -75,7 +75,7 @@ public final class BytesDecoderFactory implements DecoderFactory<byte[]> {
 
       final int lengthToRead = byteOutputStream.getCount();
       if (lengthToRead == 0) {
-        throw new IOException("EoF (empty partition)!"); // TODO #?: use EOF exception instead of IOException.
+        throw new IOException("EoF (empty partition)!"); // TODO #120: use EOF exception instead of IOException.
       }
       final byte[] resultBytes = new byte[lengthToRead]; // Read the size of this byte array.
       System.arraycopy(byteOutputStream.getBufDirectly(), 0, resultBytes, 0, lengthToRead);
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
index a93d843..dc67ff3 100644
--- a/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
  *
  * @param <T> element type.
  */
+// TODO #120: Separate EOFException from Decoder Failures
 public interface DecoderFactory<T> extends Serializable {
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAG.java b/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
index 1366ecc..46198f0 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
@@ -204,7 +204,7 @@ public final class DAG<V extends Vertex, E extends Edge<V>> implements Serializa
   /**
    * Indicates the traversal order of this DAG.
    */
-  public enum TraversalOrder {
+  private enum TraversalOrder {
     PreOrder,
     PostOrder
   }
diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
index ed051f7..35c4330 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
@@ -23,8 +23,6 @@ import edu.snu.nemo.common.ir.vertex.SourceVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
 import edu.snu.nemo.common.exception.IllegalVertexOperationException;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 
 import java.io.Serializable;
 import java.util.*;
@@ -259,22 +257,6 @@ public final class DAGBuilder<V extends Vertex, E extends Edge<V>> implements Se
           throw new RuntimeException("DAG execution property check: "
               + "DataSizeMetricCollection edge is not compatible with push" + e.getId());
         }));
-    // All vertices with same Stage Id should have identical Parallelism execution property.
-    final HashMap<Integer, Integer> stageIdToParallelismMap = new HashMap<>();
-    vertices.stream().filter(v -> v instanceof IRVertex)
-        .map(v -> (IRVertex) v)
-        .forEach(v -> {
-          final Optional<Integer> stageId = v.getPropertyValue(StageIdProperty.class);
-          if (stageId.isPresent()) {
-            if (!stageIdToParallelismMap.containsKey(stageId.get())) {
-              stageIdToParallelismMap.put(stageId.get(), v.getPropertyValue(ParallelismProperty.class).get());
-            } else if (!stageIdToParallelismMap.get(stageId.get())
-                .equals(v.getPropertyValue(ParallelismProperty.class).get())) {
-              throw new RuntimeException("DAG execution property check: vertices are in a same stage, "
-                  + "but has different parallelism execution properties: Stage" + stageId.get() + ": " + v.getId());
-            }
-          }
-        });
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataStoreProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/InterTaskDataStoreProperty.java
similarity index 81%
rename from common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataStoreProperty.java
rename to common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/InterTaskDataStoreProperty.java
index 6e87844..af6c85c 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataStoreProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/InterTaskDataStoreProperty.java
@@ -20,12 +20,12 @@ import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 /**
  * DataStore ExecutionProperty.
  */
-public final class DataStoreProperty extends EdgeExecutionProperty<DataStoreProperty.Value> {
+public final class InterTaskDataStoreProperty extends EdgeExecutionProperty<InterTaskDataStoreProperty.Value> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
-  private DataStoreProperty(final Value value) {
+  private InterTaskDataStoreProperty(final Value value) {
     super(value);
   }
 
@@ -34,8 +34,8 @@ public final class DataStoreProperty extends EdgeExecutionProperty<DataStoreProp
    * @param value value of the new execution property.
    * @return the newly created execution property.
    */
-  public static DataStoreProperty of(final Value value) {
-    return new DataStoreProperty(value);
+  public static InterTaskDataStoreProperty of(final Value value) {
+    return new InterTaskDataStoreProperty(value);
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
index c4b297b..fdbd5dc 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.common.ir.executionproperty;
 
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
@@ -25,7 +25,6 @@ import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
 import javax.annotation.concurrent.NotThreadSafe;
@@ -33,6 +32,7 @@ import java.io.Serializable;
 import java.util.*;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * ExecutionPropertyMap Class, which uses HashMap for keeping track of ExecutionProperties for vertices and edges.
@@ -67,22 +67,23 @@ public final class ExecutionPropertyMap<T extends ExecutionProperty> implements
     switch (commPattern) {
       case Shuffle:
         map.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
-        map.put(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+        map.put(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
         break;
       case BroadCast:
         map.put(PartitionerProperty.of(PartitionerProperty.Value.IntactPartitioner));
-        map.put(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+        map.put(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
         break;
       case OneToOne:
         map.put(PartitionerProperty.of(PartitionerProperty.Value.IntactPartitioner));
-        map.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+        map.put(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
         break;
       default:
         map.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
-        map.put(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+        map.put(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
     }
     return map;
   }
+
   /**
    * Static initializer for irVertex.
    * @param irVertex irVertex to keep the execution property of.
@@ -148,6 +149,13 @@ public final class ExecutionPropertyMap<T extends ExecutionProperty> implements
     properties.values().forEach(action);
   }
 
+  /**
+   * @return {@link Stream} of execution properties.
+   */
+  public Stream<T> stream() {
+    return properties.values().stream();
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
@@ -159,7 +167,7 @@ public final class ExecutionPropertyMap<T extends ExecutionProperty> implements
       }
       isFirstPair = false;
       sb.append("\"");
-      sb.append(entry.getKey());
+      sb.append(entry.getKey().getCanonicalName());
       sb.append("\": \"");
       sb.append(entry.getValue().getValue());
       sb.append("\"");
@@ -174,17 +182,12 @@ public final class ExecutionPropertyMap<T extends ExecutionProperty> implements
     if (this == obj) {
       return true;
     }
-
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-
-    ExecutionPropertyMap that = (ExecutionPropertyMap) obj;
-
-    return new EqualsBuilder()
-        .append(properties.values().stream().collect(Collectors.toSet()),
-            that.properties.values().stream().collect(Collectors.toSet()))
-        .isEquals();
+    final ExecutionPropertyMap that = (ExecutionPropertyMap) obj;
+    return properties.values().stream().collect(Collectors.toSet())
+        .equals(that.properties.values().stream().collect(Collectors.toSet()));
   }
 
   @Override
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/StageIdProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SkipSerDesProperty.java
similarity index 61%
rename from common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/StageIdProperty.java
rename to common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SkipSerDesProperty.java
index 0900992..f9668ed 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/StageIdProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SkipSerDesProperty.java
@@ -18,23 +18,25 @@ package edu.snu.nemo.common.ir.vertex.executionproperty;
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
 /**
- * StageId ExecutionProperty.
+ * Attaching this property makes runtime to skip serialization and deserialization for the vertex input and output.
+ * TODO #118: Implement Skipping (De)Serialization by ExecutionProperty
  */
-public final class StageIdProperty extends VertexExecutionProperty<Integer> {
+public final class SkipSerDesProperty extends VertexExecutionProperty<Boolean> {
+
+  private static final SkipSerDesProperty SKIP_SER_DES_PROPERTY = new SkipSerDesProperty();
+
   /**
    * Constructor.
-   * @param value value of the execution property.
    */
-  private StageIdProperty(final Integer value) {
-    super(value);
+  private SkipSerDesProperty() {
+    super(true);
   }
 
   /**
    * Static method exposing the constructor.
-   * @param value value of the new execution property.
-   * @return the newly created execution property.
+   * @return the execution property.
    */
-  public static StageIdProperty of(final Integer value) {
-    return new StageIdProperty(value);
+  public static SkipSerDesProperty of() {
+    return SKIP_SER_DES_PROPERTY;
   }
 }
diff --git a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
index 1ca18ff..ed27235 100644
--- a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
+++ b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test {@link ExecutionPropertyMap}.
@@ -57,8 +58,8 @@ public class ExecutionPropertyMapTest {
 
   @Test
   public void testPutGetAndRemove() {
-    edgeMap.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
-    assertEquals(DataStoreProperty.Value.MemoryStore, edgeMap.get(DataStoreProperty.class).get());
+    edgeMap.put(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
+    assertEquals(InterTaskDataStoreProperty.Value.MemoryStore, edgeMap.get(InterTaskDataStoreProperty.class).get());
     edgeMap.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
     assertEquals(DataFlowModelProperty.Value.Pull, edgeMap.get(DataFlowModelProperty.class).get());
     edgeMap.put(EncoderProperty.of(EncoderFactory.DUMMY_ENCODER_FACTORY));
@@ -72,4 +73,33 @@ public class ExecutionPropertyMapTest {
     vertexMap.put(ParallelismProperty.of(100));
     assertEquals(100, vertexMap.get(ParallelismProperty.class).get().longValue());
   }
+
+  @Test
+  public void testEquality() {
+    final ExecutionPropertyMap<ExecutionProperty> map0 = new ExecutionPropertyMap<>("map0");
+    final ExecutionPropertyMap<ExecutionProperty> map1 = new ExecutionPropertyMap<>("map1");
+    assertTrue(map0.equals(map1));
+    assertTrue(map1.equals(map0));
+    map0.put(ParallelismProperty.of(1));
+    assertFalse(map0.equals(map1));
+    assertFalse(map1.equals(map0));
+    map1.put(ParallelismProperty.of(1));
+    assertTrue(map0.equals(map1));
+    assertTrue(map1.equals(map0));
+    map1.put(ParallelismProperty.of(2));
+    assertFalse(map0.equals(map1));
+    assertFalse(map1.equals(map0));
+    map0.put(ParallelismProperty.of(2));
+    assertTrue(map0.equals(map1));
+    assertTrue(map1.equals(map0));
+    map0.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
+    assertFalse(map0.equals(map1));
+    assertFalse(map1.equals(map0));
+    map1.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Push));
+    assertFalse(map0.equals(map1));
+    assertFalse(map1.equals(map0));
+    map1.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
+    assertTrue(map0.equals(map1));
+    assertTrue(map1.equals(map0));
+  }
 }
diff --git a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 71626de..981ee72 100644
--- a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -58,8 +58,7 @@ public final class NemoBackend implements Backend<PhysicalPlan> {
   public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG,
                               final PhysicalPlanGenerator physicalPlanGenerator) {
     final DAG<Stage, StageEdge> stageDAG = irDAG.convert(physicalPlanGenerator);
-    final PhysicalPlan physicalPlan = new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(),
-        stageDAG, physicalPlanGenerator.getIdToIRVertex());
+    final PhysicalPlan physicalPlan = new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), stageDAG);
     return physicalPlan;
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
index 0ffddaa..0c84203 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
@@ -19,7 +19,6 @@ import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 
 
 /**
@@ -48,8 +47,6 @@ public final class CompressionPass extends AnnotatingPass {
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex).stream()
-        .filter(e -> !vertex.getPropertyValue(StageIdProperty.class).get()
-            .equals(e.getSrc().getPropertyValue(StageIdProperty.class).get()))
         .filter(edge -> !edge.getPropertyValue(CompressionProperty.class).isPresent())
         .forEach(edge -> edge.setProperty(CompressionProperty.of(compression))));
 
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeDataStorePass.java
index a0428a6..857069b 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeDataStorePass.java
@@ -17,9 +17,9 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 
 /**
  * Pass to annotate the DAG for a job to perform data skew.
@@ -31,7 +31,7 @@ public final class DataSkewEdgeDataStorePass extends AnnotatingPass {
    * Default constructor.
    */
   public DataSkewEdgeDataStorePass() {
-    super(DataStoreProperty.class);
+    super(InterTaskDataStoreProperty.class);
   }
 
   @Override
@@ -45,9 +45,9 @@ public final class DataSkewEdgeDataStorePass extends AnnotatingPass {
         dag.getIncomingEdgesOf(v).forEach(edge -> {
           // we want it to be in the same stage
           if (edge.equals(edgeToUseMemory)) {
-            edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+            edge.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
           } else {
-            edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+            edge.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
           }
         });
       }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
index 6ec2fde..6283071 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
@@ -20,7 +20,6 @@ import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DecompressionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 
 
 /**
@@ -39,8 +38,6 @@ public final class DecompressionPass extends AnnotatingPass {
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex).stream()
-        .filter(e -> !vertex.getPropertyValue(StageIdProperty.class).get()
-            .equals(e.getSrc().getPropertyValue(StageIdProperty.class).get()))
         // Find edges which have a compression property but not decompression property.
         .filter(edge -> edge.getPropertyValue(CompressionProperty.class).isPresent()
             && !edge.getPropertyValue(DecompressionProperty.class).isPresent())
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeUsedDataHandlingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeUsedDataHandlingPass.java
index 81e16f7..0ea2d13 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeUsedDataHandlingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeUsedDataHandlingPass.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.UsedDataHandlingProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 
@@ -32,7 +32,7 @@ public final class DefaultEdgeUsedDataHandlingPass extends AnnotatingPass {
    * Default constructor.
    */
   public DefaultEdgeUsedDataHandlingPass() {
-    super(UsedDataHandlingProperty.class, Collections.singleton(DataStoreProperty.class));
+    super(UsedDataHandlingProperty.class, Collections.singleton(InterTaskDataStoreProperty.class));
   }
 
   @Override
@@ -40,9 +40,10 @@ public final class DefaultEdgeUsedDataHandlingPass extends AnnotatingPass {
     dag.topologicalDo(irVertex ->
         dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
           if (!irEdge.getPropertyValue(UsedDataHandlingProperty.class).isPresent()) {
-            final DataStoreProperty.Value dataStoreValue = irEdge.getPropertyValue(DataStoreProperty.class).get();
-            if (DataStoreProperty.Value.MemoryStore.equals(dataStoreValue)
-                || DataStoreProperty.Value.SerializedMemoryStore.equals(dataStoreValue)) {
+            final InterTaskDataStoreProperty.Value dataStoreValue
+                = irEdge.getPropertyValue(InterTaskDataStoreProperty.class).get();
+            if (InterTaskDataStoreProperty.Value.MemoryStore.equals(dataStoreValue)
+                || InterTaskDataStoreProperty.Value.SerializedMemoryStore.equals(dataStoreValue)) {
               irEdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Discard));
             } else {
               irEdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ReviseInterStageEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultInterTaskDataStorePass.java
similarity index 61%
rename from compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ReviseInterStageEdgeDataStorePass.java
rename to compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultInterTaskDataStorePass.java
index 35c8c5a..98bca9a 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ReviseInterStageEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultInterTaskDataStorePass.java
@@ -17,9 +17,8 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 
 import java.util.Collections;
 import java.util.List;
@@ -27,12 +26,12 @@ import java.util.List;
 /**
  * Edge data store pass to process inter-stage memory store edges.
  */
-public final class ReviseInterStageEdgeDataStorePass extends AnnotatingPass {
+public final class DefaultInterTaskDataStorePass extends AnnotatingPass {
   /**
    * Default constructor.
    */
-  public ReviseInterStageEdgeDataStorePass() {
-    super(DataStoreProperty.class, Collections.singleton(StageIdProperty.class));
+  public DefaultInterTaskDataStorePass() {
+    super(InterTaskDataStoreProperty.class, Collections.emptySet());
   }
 
   @Override
@@ -40,13 +39,8 @@ public final class ReviseInterStageEdgeDataStorePass extends AnnotatingPass {
     dag.getVertices().forEach(vertex -> {
       final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
       if (!inEdges.isEmpty()) {
-        inEdges.forEach(edge -> {
-          if (DataStoreProperty.Value.MemoryStore.equals(edge.getPropertyValue(DataStoreProperty.class).get())
-              && !edge.getSrc().getPropertyValue(StageIdProperty.class).get()
-              .equals(edge.getDst().getPropertyValue(StageIdProperty.class).get())) {
-            edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
-          }
-        });
+        inEdges.forEach(edge -> edge.setProperty(
+            InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore)));
       }
     });
     return dag;
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
new file mode 100644
index 0000000..55c16c5
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.dag.DAGBuilder;
+import edu.snu.nemo.common.dag.Edge;
+import edu.snu.nemo.common.dag.Vertex;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A pass for assigning each stages in schedule groups.
+ *
+ * <h3>Rules</h3>
+ * <ul>
+ *   <li>Vertices connected with push edges must be assigned same ScheduleGroup.</li>
+ *   <li>For pull edges,
+ *     <ul>
+ *       <li>if the destination of the edge depends on multiple ScheduleGroups, split ScheduleGroup by the edge.</li>
+ *       <li>if the edge is broadcast type and {@code allowBroadcastWithinScheduleGroup} is {@code false},
+ *       split ScheduleGroup by the edge.</li>
+ *       <li>if the edge is shuffle type and {@code allowShuffleWithinScheduleGroup} is {@code false},
+ *       split ScheduleGroup by the edge.</li>
+ *       <li>if the destination of the edge has multiple inEdges, split ScheduleGroup by the edge.</li>
+ *       <li>Otherwise, the source and the destination of the edge should be assigned same ScheduleGroup.</li>
+ *     </ul>
+ *   </li>
+ * </ul>
+ */
+public final class DefaultScheduleGroupPass extends AnnotatingPass {
+
+  private final boolean allowBroadcastWithinScheduleGroup;
+  private final boolean allowShuffleWithinScheduleGroup;
+  private final boolean allowMultipleInEdgesWithinScheduleGroup;
+
+  /**
+   * Default constructor.
+   */
+  public DefaultScheduleGroupPass() {
+    this(false, false, true);
+  }
+
+  /**
+   * Constructor.
+   * @param allowBroadcastWithinScheduleGroup whether to allow Broadcast edges within a ScheduleGroup or not
+   * @param allowShuffleWithinScheduleGroup whether to allow Shuffle edges within a ScheduleGroup or not
+   * @param allowMultipleInEdgesWithinScheduleGroup whether to allow vertices with multiple dependencies or not
+   */
+  public DefaultScheduleGroupPass(final boolean allowBroadcastWithinScheduleGroup,
+                                  final boolean allowShuffleWithinScheduleGroup,
+                                  final boolean allowMultipleInEdgesWithinScheduleGroup) {
+    super(ScheduleGroupIndexProperty.class, Stream.of(
+        DataCommunicationPatternProperty.class,
+        DataFlowModelProperty.class
+    ).collect(Collectors.toSet()));
+    this.allowBroadcastWithinScheduleGroup = allowBroadcastWithinScheduleGroup;
+    this.allowShuffleWithinScheduleGroup = allowShuffleWithinScheduleGroup;
+    this.allowMultipleInEdgesWithinScheduleGroup = allowMultipleInEdgesWithinScheduleGroup;
+  }
+
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    final Map<IRVertex, ScheduleGroup> irVertexToScheduleGroupMap = new HashMap<>();
+    final Set<ScheduleGroup> scheduleGroups = new HashSet<>();
+    dag.topologicalDo(irVertex -> {
+      // Base case: for root vertices
+      if (!irVertexToScheduleGroupMap.containsKey(irVertex)) {
+        final ScheduleGroup newScheduleGroup = new ScheduleGroup();
+        scheduleGroups.add(newScheduleGroup);
+        newScheduleGroup.vertices.add(irVertex);
+        irVertexToScheduleGroupMap.put(irVertex, newScheduleGroup);
+      }
+      // Get scheduleGroupIndex
+      final ScheduleGroup scheduleGroup = irVertexToScheduleGroupMap.get(irVertex);
+      if (scheduleGroup == null) {
+        throw new RuntimeException(String.format("ScheduleGroup must be set for %s", irVertex));
+      }
+      // Step case: inductively assign ScheduleGroup
+      for (final IREdge edge : dag.getOutgoingEdgesOf(irVertex)) {
+        final IRVertex connectedIRVertex = edge.getDst();
+        // Skip if some vertices that connectedIRVertex depends on do not have assigned a ScheduleGroup
+        boolean skip = false;
+        for (final IREdge edgeToConnectedIRVertex : dag.getIncomingEdgesOf(connectedIRVertex)) {
+          if (!irVertexToScheduleGroupMap.containsKey(edgeToConnectedIRVertex.getSrc())) {
+            // connectedIRVertex will be covered when edgeToConnectedIRVertex.getSrc() is visited
+            skip = true;
+            break;
+          }
+        }
+        if (skip) {
+          continue;
+        }
+        // Now we can assure that all vertices that connectedIRVertex depends on have assigned a ScheduleGroup
+
+        // Get ScheduleGroup(s) that push data to the connectedIRVertex
+        final Set<ScheduleGroup> pushScheduleGroups = new HashSet<>();
+        for (final IREdge edgeToConnectedIRVertex : dag.getIncomingEdgesOf(connectedIRVertex)) {
+          if (edgeToConnectedIRVertex.getPropertyValue(DataFlowModelProperty.class)
+              .orElseThrow(() -> new RuntimeException(String.format("DataFlowModelProperty for %s must be set",
+                  edgeToConnectedIRVertex.getId()))) == DataFlowModelProperty.Value.Push) {
+            pushScheduleGroups.add(irVertexToScheduleGroupMap.get(edgeToConnectedIRVertex.getSrc()));
+          }
+        }
+        if (pushScheduleGroups.isEmpty()) {
+          // If allowMultipleInEdgesWithinScheduleGroup is false and connectedIRVertex depends on multiple vertices,
+          // it should be a member of new ScheduleGroup
+          boolean mergability = allowMultipleInEdgesWithinScheduleGroup
+              || dag.getIncomingEdgesOf(connectedIRVertex).size() <= 1;
+          for (final IREdge edgeToConnectedIRVertex : dag.getIncomingEdgesOf(connectedIRVertex)) {
+            if (!mergability) {
+              break;
+            }
+            final ScheduleGroup anotherDependency = irVertexToScheduleGroupMap.get(edgeToConnectedIRVertex.getSrc());
+            if (!scheduleGroup.equals(anotherDependency)) {
+              // Since connectedIRVertex depends on multiple ScheduleGroups, connectedIRVertex must be a member of
+              // new ScheduleGroup
+              mergability = false;
+            }
+            final DataCommunicationPatternProperty.Value communicationPattern = edgeToConnectedIRVertex
+                .getPropertyValue(DataCommunicationPatternProperty.class).orElseThrow(
+                    () -> new RuntimeException(String.format("DataCommunicationPatternProperty for %s must be set",
+                        edgeToConnectedIRVertex.getId())));
+            if (!allowBroadcastWithinScheduleGroup
+                && communicationPattern == DataCommunicationPatternProperty.Value.BroadCast) {
+              mergability = false;
+            }
+            if (!allowShuffleWithinScheduleGroup
+                && communicationPattern == DataCommunicationPatternProperty.Value.Shuffle) {
+              mergability = false;
+            }
+          }
+
+          if (mergability) {
+            // Merge into the existing scheduleGroup
+            scheduleGroup.vertices.add(connectedIRVertex);
+            irVertexToScheduleGroupMap.put(connectedIRVertex, scheduleGroup);
+          } else {
+            // Create a new ScheduleGroup
+            final ScheduleGroup newScheduleGroup = new ScheduleGroup();
+            scheduleGroups.add(newScheduleGroup);
+            newScheduleGroup.vertices.add(connectedIRVertex);
+            irVertexToScheduleGroupMap.put(connectedIRVertex, newScheduleGroup);
+            for (final IREdge edgeToConnectedIRVertex : dag.getIncomingEdgesOf(connectedIRVertex)) {
+              final ScheduleGroup src = irVertexToScheduleGroupMap.get(edgeToConnectedIRVertex.getSrc());
+              final ScheduleGroup dst = newScheduleGroup;
+              src.scheduleGroupsTo.add(dst);
+              dst.scheduleGroupsFrom.add(src);
+            }
+          }
+        } else {
+          // If there are multiple ScheduleGroups that push data to connectedIRVertex, merge them
+          final Iterator<ScheduleGroup> pushScheduleGroupIterator = pushScheduleGroups.iterator();
+          final ScheduleGroup pushScheduleGroup = pushScheduleGroupIterator.next();
+          while (pushScheduleGroupIterator.hasNext()) {
+            final ScheduleGroup anotherPushScheduleGroup = pushScheduleGroupIterator.next();
+            anotherPushScheduleGroup.vertices.forEach(pushScheduleGroup.vertices::add);
+            scheduleGroups.remove(anotherPushScheduleGroup);
+            for (final ScheduleGroup src : anotherPushScheduleGroup.scheduleGroupsFrom) {
+              final ScheduleGroup dst = anotherPushScheduleGroup;
+              final ScheduleGroup newDst = pushScheduleGroup;
+              src.scheduleGroupsTo.remove(dst);
+              src.scheduleGroupsTo.add(newDst);
+              newDst.scheduleGroupsFrom.add(src);
+            }
+            for (final ScheduleGroup dst : anotherPushScheduleGroup.scheduleGroupsTo) {
+              final ScheduleGroup src = anotherPushScheduleGroup;
+              final ScheduleGroup newSrc = pushScheduleGroup;
+              dst.scheduleGroupsFrom.remove(src);
+              dst.scheduleGroupsFrom.add(newSrc);
+              newSrc.scheduleGroupsTo.add(dst);
+            }
+          }
+          // Add connectedIRVertex into the merged pushScheduleGroup
+          pushScheduleGroup.vertices.add(connectedIRVertex);
+          irVertexToScheduleGroupMap.put(connectedIRVertex, pushScheduleGroup);
+        }
+      }
+    });
+
+    // Assign ScheduleGroupIndex property based on topology of ScheduleGroups
+    final MutableInt currentScheduleGroupIndex = new MutableInt(getNextScheudleGroupIndex(dag.getVertices()));
+    final DAGBuilder<ScheduleGroup, ScheduleGroupEdge> scheduleGroupDAGBuilder = new DAGBuilder<>();
+    scheduleGroups.forEach(scheduleGroupDAGBuilder::addVertex);
+    scheduleGroups.forEach(src -> src.scheduleGroupsTo
+        .forEach(dst -> scheduleGroupDAGBuilder.connectVertices(new ScheduleGroupEdge(src, dst))));
+    scheduleGroupDAGBuilder.build().topologicalDo(scheduleGroup -> {
+      boolean usedCurrentIndex = false;
+      for (final IRVertex irVertex : scheduleGroup.vertices) {
+        if (!irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).isPresent()) {
+          irVertex.getExecutionProperties().put(ScheduleGroupIndexProperty.of(currentScheduleGroupIndex.getValue()));
+          usedCurrentIndex = true;
+        }
+      }
+      if (usedCurrentIndex) {
+        currentScheduleGroupIndex.increment();
+      }
+    });
+    return dag;
+  }
+
+  /**
+   * Determines the range of {@link ScheduleGroupIndexProperty} value that will prevent collision
+   * with the existing {@link ScheduleGroupIndexProperty}.
+   * @param irVertexCollection collection of {@link IRVertex}
+   * @return the minimum value for the {@link ScheduleGroupIndexProperty} that won't collide with the existing values
+   */
+  private int getNextScheudleGroupIndex(final Collection<IRVertex> irVertexCollection) {
+    int nextScheduleGroupIndex = 0;
+    for (final IRVertex irVertex : irVertexCollection) {
+      final Optional<Integer> scheduleGroupIndex = irVertex.getPropertyValue(ScheduleGroupIndexProperty.class);
+      if (scheduleGroupIndex.isPresent()) {
+        nextScheduleGroupIndex = Math.max(scheduleGroupIndex.get() + 1, nextScheduleGroupIndex);
+      }
+    }
+    return nextScheduleGroupIndex;
+  }
+
+  /**
+   * Vertex in ScheduleGroup DAG.
+   */
+  private static final class ScheduleGroup extends Vertex {
+    private static int nextScheduleGroupId = 0;
+    private final Set<IRVertex> vertices = new HashSet<>();
+    private final Set<ScheduleGroup> scheduleGroupsTo = new HashSet<>();
+    private final Set<ScheduleGroup> scheduleGroupsFrom = new HashSet<>();
+
+    /**
+     * Constructor.
+     */
+    ScheduleGroup() {
+      super(String.format("ScheduleGroup-%d", nextScheduleGroupId++));
+    }
+  }
+
+  /**
+   * Edge in ScheduleGroup DAG.
+   */
+  private static final class ScheduleGroupEdge extends Edge<ScheduleGroup> {
+    private static int nextScheduleGroupEdgeId = 0;
+
+    /**
+     * Constructor.
+     *
+     * @param src source vertex.
+     * @param dst destination vertex.
+     */
+    ScheduleGroupEdge(final ScheduleGroup src, final ScheduleGroup dst) {
+      super(String.format("ScheduleGroupEdge-%d", nextScheduleGroupEdgeId++), src, dst);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      final ScheduleGroupEdge that = (ScheduleGroupEdge) o;
+      return this.getSrc().equals(that.getSrc()) && this.getDst().equals(that.getDst());
+    }
+
+    @Override
+    public int hashCode() {
+      return getSrc().hashCode() + 31 * getDst().hashCode();
+    }
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java
deleted file mode 100644
index 7638e5d..0000000
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * Default method of partitioning an IR DAG into stages.
- * We traverse the DAG topologically to observe each vertex if it can be added to a stage or if it should be assigned
- * to a new stage. We filter out the candidate incoming edges to connect to an existing stage, and if it exists, we
- * connect it to the stage, and otherwise we don't.
- */
-public final class DefaultStagePartitioningPass extends AnnotatingPass {
-  /**
-   * Default constructor.
-   */
-  public DefaultStagePartitioningPass() {
-    super(StageIdProperty.class, Stream.of(
-        DataCommunicationPatternProperty.class,
-        ExecutorPlacementProperty.class,
-        DataFlowModelProperty.class,
-        PartitionerProperty.class,
-        ParallelismProperty.class
-    ).collect(Collectors.toSet()));
-  }
-
-  @Override
-  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> irDAG) {
-    final AtomicInteger stageNum = new AtomicInteger(0);
-    final List<List<IRVertex>> vertexListForEachStage = groupVerticesByStage(irDAG);
-    vertexListForEachStage.forEach(stageVertices -> {
-      stageVertices.forEach(irVertex -> irVertex.setProperty(StageIdProperty.of(stageNum.get())));
-      stageNum.getAndIncrement();
-    });
-    return irDAG;
-  }
-
-  /**
-   * This method traverses the IR DAG to group each of the vertices by stages.
-   * @param irDAG to traverse.
-   * @return List of groups of vertices that are each divided by stages.
-   */
-  private List<List<IRVertex>> groupVerticesByStage(final DAG<IRVertex, IREdge> irDAG) {
-    // Data structures used for stage partitioning.
-    final HashMap<IRVertex, Integer> vertexStageNumHashMap = new HashMap<>();
-    final List<List<IRVertex>> vertexListForEachStage = new ArrayList<>();
-    final AtomicInteger stageNumber = new AtomicInteger(1);
-
-    // First, traverse the DAG topologically to add each vertices to a list associated with each of the stage number.
-    irDAG.topologicalDo(vertex -> {
-      final List<IREdge> inEdges = irDAG.getIncomingEdgesOf(vertex);
-      final Optional<List<IREdge>> inEdgeList = (inEdges == null || inEdges.isEmpty())
-              ? Optional.empty() : Optional.of(inEdges);
-
-      if (!inEdgeList.isPresent()) { // If Source vertex
-        createNewStage(vertex, vertexStageNumHashMap, stageNumber, vertexListForEachStage);
-      } else {
-        // Filter candidate incoming edges that can be included in a stage with the vertex.
-        final Optional<List<IREdge>> inEdgesForStage = inEdgeList.map(e -> e.stream()
-            // One to one edges
-            .filter(edge -> edge.getPropertyValue(DataCommunicationPatternProperty.class).get()
-                              .equals(DataCommunicationPatternProperty.Value.OneToOne))
-            // MemoryStore placement
-            .filter(edge -> edge.getPropertyValue(DataStoreProperty.class).get()
-                              .equals(DataStoreProperty.Value.MemoryStore))
-            // if src and dst are placed on same container types
-            .filter(edge -> edge.getSrc().getPropertyValue(ExecutorPlacementProperty.class).get()
-                .equals(edge.getDst().getPropertyValue(ExecutorPlacementProperty.class).get()))
-            // if src and dst have same parallelism
-            .filter(edge -> edge.getSrc().getPropertyValue(ParallelismProperty.class).get()
-                .equals(edge.getDst().getPropertyValue(ParallelismProperty.class).get()))
-            // Src that is already included in a stage
-            .filter(edge -> vertexStageNumHashMap.containsKey(edge.getSrc()))
-            .collect(Collectors.toList()));
-        // Choose one to connect out of the candidates. We want to connect the vertex to a single stage.
-        final Optional<IREdge> edgeToConnect = inEdgesForStage.map(edges -> edges.stream().findAny())
-            .orElse(Optional.empty());
-
-        if (!inEdgesForStage.isPresent() || inEdgesForStage.get().isEmpty() || !edgeToConnect.isPresent()) {
-          // when we cannot connect vertex in other stages
-          createNewStage(vertex, vertexStageNumHashMap, stageNumber, vertexListForEachStage);
-        } else {
-          // otherwise connect with a stage.
-          final IRVertex irVertexToConnect = edgeToConnect.get().getSrc();
-          vertexStageNumHashMap.put(vertex, vertexStageNumHashMap.get(irVertexToConnect));
-          final Optional<List<IRVertex>> listOfIRVerticesOfTheStage =
-              vertexListForEachStage.stream().filter(l -> l.contains(irVertexToConnect)).findFirst();
-          listOfIRVerticesOfTheStage.ifPresent(lst -> {
-            vertexListForEachStage.remove(lst);
-            lst.add(vertex);
-            vertexListForEachStage.add(lst);
-          });
-        }
-      }
-    });
-    return vertexListForEachStage;
-  }
-
-  /**
-   * Creates a new stage.
-   * @param irVertex the vertex which begins the stage.
-   * @param vertexStageNumHashMap to keep track of vertex and its stage number.
-   * @param stageNumber to atomically number stages.
-   * @param vertexListForEachStage to group each vertex lists for each stages.
-   */
-  private static void createNewStage(final IRVertex irVertex, final HashMap<IRVertex, Integer> vertexStageNumHashMap,
-                                     final AtomicInteger stageNumber,
-                                     final List<List<IRVertex>> vertexListForEachStage) {
-    vertexStageNumHashMap.put(irVertex, stageNumber.getAndIncrement());
-    final List<IRVertex> newList = new ArrayList<>();
-    newList.add(irVertex);
-    vertexListForEachStage.add(newList);
-  }
-}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
index 719de83..76983fd 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
@@ -16,9 +16,9 @@
 package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
 
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 
 import java.util.Collections;
 import java.util.List;
@@ -32,7 +32,7 @@ public final class DisaggregationEdgeDataStorePass extends AnnotatingPass {
    * Default constructor.
    */
   public DisaggregationEdgeDataStorePass() {
-    super(DataStoreProperty.class, Collections.singleton(DataStoreProperty.class));
+    super(InterTaskDataStoreProperty.class, Collections.singleton(InterTaskDataStoreProperty.class));
   }
 
   @Override
@@ -40,10 +40,7 @@ public final class DisaggregationEdgeDataStorePass extends AnnotatingPass {
     dag.getVertices().forEach(vertex -> { // Initialize the DataStore of the DAG with GlusterFileStore.
       final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
       inEdges.forEach(edge -> {
-        if (DataStoreProperty.Value.LocalFileStore
-              .equals(edge.getPropertyValue(DataStoreProperty.class).get())) {
-          edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.GlusterFileStore));
-        }
+        edge.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.GlusterFileStore));
       });
     });
     return dag;
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataStorePass.java
index 0777c1c..8e4023c 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataStorePass.java
@@ -17,9 +17,9 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
 
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 
 import java.util.Collections;
@@ -33,7 +33,7 @@ public final class PadoEdgeDataStorePass extends AnnotatingPass {
    * Default constructor.
    */
   public PadoEdgeDataStorePass() {
-    super(DataStoreProperty.class, Collections.singleton(ExecutorPlacementProperty.class));
+    super(InterTaskDataStoreProperty.class, Collections.singleton(ExecutorPlacementProperty.class));
   }
 
   @Override
@@ -43,12 +43,12 @@ public final class PadoEdgeDataStorePass extends AnnotatingPass {
       if (!inEdges.isEmpty()) {
         inEdges.forEach(edge -> {
           if (fromTransientToReserved(edge) || fromReservedToTransient(edge)) {
-            edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+            edge.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
           } else if (DataCommunicationPatternProperty.Value.OneToOne
               .equals(edge.getPropertyValue(DataCommunicationPatternProperty.class).get())) {
-            edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+            edge.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
           } else {
-            edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+            edge.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
           }
         });
       }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataStorePass.java
index 6fae5ef..fe5d996 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataStorePass.java
@@ -18,8 +18,8 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 
 import java.util.Collections;
 
@@ -32,7 +32,7 @@ public final class SailfishEdgeDataStorePass extends AnnotatingPass {
    * Default constructor.
    */
   public SailfishEdgeDataStorePass() {
-    super(DataStoreProperty.class, Collections.singleton(DataCommunicationPatternProperty.class));
+    super(InterTaskDataStoreProperty.class, Collections.singleton(DataCommunicationPatternProperty.class));
   }
 
   @Override
@@ -46,12 +46,13 @@ public final class SailfishEdgeDataStorePass extends AnnotatingPass {
           if (DataCommunicationPatternProperty.Value.Shuffle
           .equals(edgeToMerger.getPropertyValue(DataCommunicationPatternProperty.class).get())) {
             // Pass data through memory to the merger vertex.
-            edgeToMerger.setProperty(DataStoreProperty.of(DataStoreProperty.Value.SerializedMemoryStore));
+            edgeToMerger.setProperty(InterTaskDataStoreProperty
+                .of(InterTaskDataStoreProperty.Value.SerializedMemoryStore));
           }
         });
         dag.getOutgoingEdgesOf(vertex).forEach(edgeFromMerger ->
             // Merge the input data and write it immediately to the remote disk.
-            edgeFromMerger.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore)));
+            edgeFromMerger.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore)));
       }
     });
     return dag;
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPass.java
deleted file mode 100644
index e76f4ba..0000000
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPass.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import com.google.common.collect.Lists;
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
-
-import java.util.*;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * A pass for assigning each stages in schedule groups.
- * We traverse the DAG topologically to find the dependency information between stages and number them appropriately
- * to give correct order or schedule groups.
- */
-public final class ScheduleGroupPass extends AnnotatingPass {
-  /**
-   * Default constructor.
-   */
-  public ScheduleGroupPass() {
-    super(ScheduleGroupIndexProperty.class, Stream.of(
-        StageIdProperty.class,
-        DataCommunicationPatternProperty.class,
-        ExecutorPlacementProperty.class,
-        DataFlowModelProperty.class,
-        PartitionerProperty.class,
-        ParallelismProperty.class
-    ).collect(Collectors.toSet()));
-  }
-
-  private static final int INITIAL_SCHEDULE_GROUP = 0;
-
-  @Override
-  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
-    // We assume that the input dag is tagged with stage ids.
-    if (dag.getVertices().stream()
-        .anyMatch(irVertex -> !irVertex.getPropertyValue(StageIdProperty.class).isPresent())) {
-      throw new RuntimeException("There exists an IR vertex going through ScheduleGroupPass "
-          + "without stage id tagged.");
-    }
-
-    // Map of stage id to the stage ids that it depends on.
-    final Map<Integer, Set<Integer>> dependentStagesMap = new HashMap<>();
-    dag.topologicalDo(irVertex -> {
-      final Integer currentStageId = irVertex.getPropertyValue(StageIdProperty.class).get();
-      dependentStagesMap.putIfAbsent(currentStageId, new HashSet<>());
-      // while traversing, we find the stages that point to the current stage and add them to the list.
-      dag.getIncomingEdgesOf(irVertex).stream()
-          .map(IREdge::getSrc)
-          .mapToInt(vertex -> vertex.getPropertyValue(StageIdProperty.class).get())
-          .filter(n -> n != currentStageId)
-          .forEach(n -> dependentStagesMap.get(currentStageId).add(n));
-    });
-
-    // Map to put our results in.
-    final Map<Integer, Integer> stageIdToScheduleGroupIndexMap = new HashMap<>();
-
-    // Calculate schedule group number of each stages step by step
-    while (stageIdToScheduleGroupIndexMap.size() < dependentStagesMap.size()) {
-      // This is to ensure that each iteration is making progress.
-      // We ensure that the stageIdToScheduleGroupIdMap is increasing in size in each iteration.
-      final Integer previousSize = stageIdToScheduleGroupIndexMap.size();
-      dependentStagesMap.forEach((stageId, dependentStages) -> {
-        if (!stageIdToScheduleGroupIndexMap.keySet().contains(stageId)
-            && dependentStages.isEmpty()) { // initial source stages
-          // initial source stages are indexed with schedule group 0.
-          stageIdToScheduleGroupIndexMap.put(stageId, INITIAL_SCHEDULE_GROUP);
-        } else if (!stageIdToScheduleGroupIndexMap.keySet().contains(stageId)
-            && dependentStages.stream().allMatch(stageIdToScheduleGroupIndexMap::containsKey)) { // next stages
-          // We find the maximum schedule group index from previous stages, and index current stage with that number +1.
-          final Integer maxDependentSchedulerGroupIndex =
-              dependentStages.stream()
-                  .mapToInt(stageIdToScheduleGroupIndexMap::get)
-                  .max().orElseThrow(() ->
-                    new RuntimeException("A stage that is not a source stage much have dependent stages"));
-          stageIdToScheduleGroupIndexMap.put(stageId, maxDependentSchedulerGroupIndex + 1);
-        }
-      });
-      if (previousSize == stageIdToScheduleGroupIndexMap.size()) {
-        throw new RuntimeException("Iteration for indexing schedule groups in "
-            + ScheduleGroupPass.class.getSimpleName() + " is not making progress");
-      }
-    }
-
-    // Reverse topologically traverse and match schedule group ids for those that have push edges in between
-    Lists.reverse(dag.getTopologicalSort()).forEach(v -> {
-      // get the destination vertices of the edges that are marked as push
-      final List<IRVertex> pushConnectedVertices = dag.getOutgoingEdgesOf(v).stream()
-          .filter(e -> DataFlowModelProperty.Value.Push.equals(e.getPropertyValue(DataFlowModelProperty.class).get()))
-          .map(IREdge::getDst)
-          .collect(Collectors.toList());
-      if (!pushConnectedVertices.isEmpty()) { // if we need to do something,
-        // we find the min value of the destination schedule groups.
-        final Integer newSchedulerGroupIndex = pushConnectedVertices.stream()
-            .mapToInt(irVertex -> stageIdToScheduleGroupIndexMap
-                .get(irVertex.getPropertyValue(StageIdProperty.class).get()))
-            .min().orElseThrow(() -> new RuntimeException("a list was not empty, but produced an empty result"));
-        // overwrite
-        final Integer originalScheduleGroupIndex = stageIdToScheduleGroupIndexMap
-            .get(v.getPropertyValue(StageIdProperty.class).get());
-        stageIdToScheduleGroupIndexMap.replace(v.getPropertyValue(StageIdProperty.class).get(), newSchedulerGroupIndex);
-        // shift those if it came too far
-        if (stageIdToScheduleGroupIndexMap.values().stream()
-            .noneMatch(stageIndex -> stageIndex.equals(originalScheduleGroupIndex))) { // if it doesn't exist
-          stageIdToScheduleGroupIndexMap.replaceAll((stageId, scheduleGroupIndex) -> {
-            if (scheduleGroupIndex > originalScheduleGroupIndex) {
-              return scheduleGroupIndex - 1; // we shift schedule group indexes by one.
-            } else {
-              return scheduleGroupIndex;
-            }
-          });
-        }
-      }
-    });
-
-    // do the tagging
-    dag.topologicalDo(irVertex -> irVertex.setProperty(ScheduleGroupIndexProperty.of(
-        stageIdToScheduleGroupIndexMap.get(irVertex.getPropertyValue(StageIdProperty.class).get()))));
-
-    return dag;
-  }
-}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
index b0ab2b1..3a5c80c 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
@@ -30,13 +30,12 @@ public final class PrimitiveCompositePass extends CompositePass {
    */
   public PrimitiveCompositePass() {
     super(Arrays.asList(
-        new DefaultParallelismPass(), // annotating after reshaping passes, before stage partitioning
+        new DefaultParallelismPass(),
         new DefaultEdgeEncoderPass(),
         new DefaultEdgeDecoderPass(),
-        new DefaultStagePartitioningPass(),
-        new ReviseInterStageEdgeDataStorePass(), // after stage partitioning
+        new DefaultInterTaskDataStorePass(),
         new DefaultEdgeUsedDataHandlingPass(),
-        new ScheduleGroupPass(),
+        new DefaultScheduleGroupPass(),
         new CompressionPass(),
         new DecompressionPass()
     ));
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
index 9bf434a..1abe092 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
@@ -23,6 +23,7 @@ import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternPro
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.SkipSerDesProperty;
 import edu.snu.nemo.common.ir.vertex.transform.RelayTransform;
 
 import java.util.Collections;
@@ -58,6 +59,7 @@ public final class SailfishRelayReshapingPass extends ReshapingPass {
             // Insert a merger vertex having transform that write received data immediately
             // before the vertex receiving shuffled data.
             final OperatorVertex iFileMergerVertex = new OperatorVertex(new RelayTransform());
+            iFileMergerVertex.getExecutionProperties().put(SkipSerDesProperty.of());
             builder.addVertex(iFileMergerVertex);
             final IREdge newEdgeToMerger = new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
                 edge.getSrc(), iFileMergerVertex, edge.isSideInput());
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
index 124aa16..7fc4fba 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
@@ -16,8 +16,7 @@
 package edu.snu.nemo.compiler.optimizer.policy;
 
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
 import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
 
 import java.util.ArrayList;
@@ -30,8 +29,7 @@ public final class BasicPullPolicy implements Policy {
   @Override
   public List<CompileTimePass> getCompileTimePasses() {
     List<CompileTimePass> policy = new ArrayList<>();
-    policy.add(new DefaultStagePartitioningPass());
-    policy.add(new ScheduleGroupPass());
+    policy.add(new DefaultScheduleGroupPass());
     return policy;
   }
 
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
index 595079e..53ff914 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
@@ -16,8 +16,7 @@
 package edu.snu.nemo.compiler.optimizer.policy;
 
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ShuffleEdgePushPass;
 import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
 
@@ -31,9 +30,8 @@ public final class BasicPushPolicy implements Policy {
   @Override
   public List<CompileTimePass> getCompileTimePasses() {
     List<CompileTimePass> policy = new ArrayList<>();
-    policy.add(new DefaultStagePartitioningPass());
     policy.add(new ShuffleEdgePushPass());
-    policy.add(new ScheduleGroupPass());
+    policy.add(new DefaultScheduleGroupPass());
     return policy;
   }
 
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
index 62193a6..d23d5bd 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
@@ -16,9 +16,8 @@
 package edu.snu.nemo.compiler.optimizer.policy;
 
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultParallelismPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ReviseInterStageEdgeDataStorePass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultInterTaskDataStorePass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.CompositePass;
 import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
@@ -63,9 +62,8 @@ public final class DefaultPolicyWithSeparatePass implements Policy {
      */
     RefactoredPass() {
       super(Arrays.asList(
-          new DefaultStagePartitioningPass(),
-          new ReviseInterStageEdgeDataStorePass(), // after stage partitioning
-          new ScheduleGroupPass()
+          new DefaultInterTaskDataStorePass(),
+          new DefaultScheduleGroupPass()
       ));
     }
   }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
index 7b40556..5268c00 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
@@ -18,7 +18,7 @@ package edu.snu.nemo.compiler.optimizer.policy;
 import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
@@ -67,7 +67,7 @@ public final class PolicyBuilder {
     annotatedExecutionProperties.add(ExecutorPlacementProperty.class);
     annotatedExecutionProperties.add(ParallelismProperty.class);
     annotatedExecutionProperties.add(DataFlowModelProperty.class);
-    annotatedExecutionProperties.add(DataStoreProperty.class);
+    annotatedExecutionProperties.add(InterTaskDataStoreProperty.class);
     annotatedExecutionProperties.add(PartitionerProperty.class);
   }
 
diff --git a/examples/resources/beam_sample_executor_resources.json b/examples/resources/beam_sample_executor_resources.json
index ced110c..0b40af9 100644
--- a/examples/resources/beam_sample_executor_resources.json
+++ b/examples/resources/beam_sample_executor_resources.json
@@ -2,11 +2,11 @@
   {
     "type": "Transient",
     "memory_mb": 512,
-    "capacity": 5
+    "capacity": 15
   },
   {
     "type": "Reserved",
     "memory_mb": 512,
-    "capacity": 5
+    "capacity": 15
   }
 ]
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 8693918..d8999b4 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -89,7 +89,7 @@ public final class DataSkewRuntimePass implements RuntimePass<Pair<List<String>,
       optimizationEdge.setTaskIdxToKeyRange(taskIdxToHashRange);
     });
 
-    return new PhysicalPlan(originalPlan.getId(), physicalDAGBuilder.build(), originalPlan.getIdToIRVertex());
+    return new PhysicalPlan(originalPlan.getId(), physicalDAGBuilder.build());
   }
 
   /**
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
index fdd7edf..30f7111 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
@@ -19,6 +19,7 @@ import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -34,14 +35,18 @@ public final class PhysicalPlan implements Serializable {
    *
    * @param id              ID of the plan.
    * @param stageDAG        the DAG of stages.
-   * @param idToIRVertex map from task to IR vertex.
    */
   public PhysicalPlan(final String id,
-                      final DAG<Stage, StageEdge> stageDAG,
-                      final Map<String, IRVertex> idToIRVertex) {
+                      final DAG<Stage, StageEdge> stageDAG) {
     this.id = id;
     this.stageDAG = stageDAG;
-    this.idToIRVertex = idToIRVertex;
+
+    idToIRVertex = new HashMap<>();
+    for (final Stage stage : stageDAG.getVertices()) {
+      for (final IRVertex irVertex : stage.getIRDAG().getVertices()) {
+        idToIRVertex.put(irVertex.getId(), irVertex);
+      }
+    }
   }
 
   /**
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index a11997e..d679387 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -18,17 +18,19 @@ package edu.snu.nemo.runtime.common.plan;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.*;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.exception.IllegalVertexOperationException;
 import edu.snu.nemo.common.exception.PhysicalPlanGenerationException;
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import org.apache.reef.tang.annotations.Parameter;
 
 import javax.inject.Inject;
@@ -39,18 +41,21 @@ import java.util.function.Function;
  * A function that converts an IR DAG to physical DAG.
  */
 public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdge>, DAG<Stage, StageEdge>> {
-  private final Map<String, IRVertex> idToIRVertex;
   private final String dagDirectory;
+  private final StagePartitioner stagePartitioner;
 
   /**
    * Private constructor.
    *
+   * @param stagePartitioner provides stage partitioning
    * @param dagDirectory the directory in which to store DAG data.
    */
   @Inject
-  private PhysicalPlanGenerator(@Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
-    this.idToIRVertex = new HashMap<>();
+  private PhysicalPlanGenerator(final StagePartitioner stagePartitioner,
+                                @Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
     this.dagDirectory = dagDirectory;
+    this.stagePartitioner = stagePartitioner;
+    stagePartitioner.addIgnoredPropertyKey(DynamicOptimizationProperty.class);
   }
 
   /**
@@ -64,6 +69,9 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
     // first, stage-partition the IR DAG.
     final DAG<Stage, StageEdge> dagOfStages = stagePartitionIrDAG(irDAG);
 
+    // Sanity check
+    dagOfStages.getVertices().forEach(this::integrityCheck);
+
     // this is needed because of DuplicateEdgeGroupProperty.
     handleDuplicateEdgeGroupProperty(dagOfStages);
 
@@ -73,10 +81,6 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
     return dagOfStages;
   }
 
-  public Map<String, IRVertex> getIdToIRVertex() {
-    return idToIRVertex;
-  }
-
   /**
    * Convert the edge id of DuplicateEdgeGroupProperty to physical edge id.
    *
@@ -115,41 +119,36 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
    */
   public DAG<Stage, StageEdge> stagePartitionIrDAG(final DAG<IRVertex, IREdge> irDAG) {
     final DAGBuilder<Stage, StageEdge> dagOfStagesBuilder = new DAGBuilder<>();
+    final Set<IREdge> interStageEdges = new HashSet<>();
+    final Map<Integer, Stage> stageIdToStageMap = new HashMap<>();
+    final Map<IRVertex, Integer> vertexToStageIdMap = stagePartitioner.apply(irDAG);
 
-    final Map<Integer, List<IRVertex>> vertexListForEachStage = new LinkedHashMap<>();
+    final Map<Integer, Set<IRVertex>> vertexSetForEachStage = new LinkedHashMap<>();
     irDAG.topologicalDo(irVertex -> {
-      final Integer stageNum = irVertex.getPropertyValue(StageIdProperty.class).get();
-      if (!vertexListForEachStage.containsKey(stageNum)) {
-        vertexListForEachStage.put(stageNum, new ArrayList<>());
+      final int stageId = vertexToStageIdMap.get(irVertex);
+      if (!vertexSetForEachStage.containsKey(stageId)) {
+        vertexSetForEachStage.put(stageId, new HashSet<>());
       }
-      vertexListForEachStage.get(stageNum).add(irVertex);
+      vertexSetForEachStage.get(stageId).add(irVertex);
     });
 
-    final Map<IRVertex, Stage> vertexStageMap = new HashMap<>();
-
-    for (final List<IRVertex> stageVertices : vertexListForEachStage.values()) {
-      integrityCheck(stageVertices);
-
-      final Set<IRVertex> currentStageVertices = new HashSet<>();
-      final Set<StageEdgeBuilder> currentStageIncomingEdges = new HashSet<>();
+    for (final int stageId : vertexSetForEachStage.keySet()) {
+      final Set<IRVertex> stageVertices = vertexSetForEachStage.get(stageId);
+      final String stageIdentifier = RuntimeIdGenerator.generateStageId(stageId);
+      final ExecutionPropertyMap<VertexExecutionProperty> stageProperties = new ExecutionPropertyMap<>(stageIdentifier);
+      stagePartitioner.getStageProperties(stageVertices.iterator().next()).forEach(stageProperties::put);
+      final int stageParallelism = stageProperties.get(ParallelismProperty.class)
+          .orElseThrow(() -> new RuntimeException("Parallelism property must be set for Stage"));
 
-      // Create a new stage builder.
-      final IRVertex irVertexOfNewStage = stageVertices.stream().findAny()
-          .orElseThrow(() -> new RuntimeException("Error: List " + stageVertices.getClass() + " is Empty"));
-      final StageBuilder stageBuilder = new StageBuilder(
-          irVertexOfNewStage.getPropertyValue(StageIdProperty.class).get(),
-          irVertexOfNewStage.getPropertyValue(ParallelismProperty.class).get(),
-          irVertexOfNewStage.getPropertyValue(ScheduleGroupIndexProperty.class).get(),
-          irVertexOfNewStage.getPropertyValue(ExecutorPlacementProperty.class).get());
+      final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder = new DAGBuilder<>();
 
-      // Prepare useful variables.
-      final int stageParallelism = irVertexOfNewStage.getPropertyValue(ParallelismProperty.class).get();
+      // Prepare vertexIdtoReadables
       final List<Map<String, Readable>> vertexIdToReadables = new ArrayList<>(stageParallelism);
       for (int i = 0; i < stageParallelism; i++) {
         vertexIdToReadables.add(new HashMap<>());
       }
 
-      // For each vertex in the stage,
+      // For each IRVertex,
       for (final IRVertex irVertex : stageVertices) {
         // Take care of the readables of a source vertex.
         if (irVertex instanceof SourceVertex) {
@@ -159,108 +158,78 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
             for (int i = 0; i < stageParallelism; i++) {
               vertexIdToReadables.get(i).put(irVertex.getId(), readables.get(i));
             }
-          } catch (Exception e) {
+          } catch (final Exception e) {
             throw new PhysicalPlanGenerationException(e);
           }
-
           // Clear internal metadata.
           sourceVertex.clearInternalStates();
         }
 
         // Add vertex to the stage.
-        stageBuilder.addVertex(irVertex);
-        currentStageVertices.add(irVertex);
+        stageInternalDAGBuilder.addVertex(irVertex);
+      }
 
+      for (final IRVertex dstVertex : stageVertices) {
         // Connect all the incoming edges for the vertex.
-        final List<IREdge> inEdges = irDAG.getIncomingEdgesOf(irVertex);
-        final Optional<List<IREdge>> inEdgeList = (inEdges == null) ? Optional.empty() : Optional.of(inEdges);
-        inEdgeList.ifPresent(edges -> edges.forEach(irEdge -> {
+        irDAG.getIncomingEdgesOf(dstVertex).forEach(irEdge -> {
           final IRVertex srcVertex = irEdge.getSrc();
-          final IRVertex dstVertex = irEdge.getDst();
 
-          if (srcVertex == null) {
-            throw new IllegalVertexOperationException("Unable to locate srcVertex for IREdge " + irEdge);
-          } else if (dstVertex == null) {
-            throw new IllegalVertexOperationException("Unable to locate dstVertex for IREdge " + irEdge);
-          }
-
-          // both vertices are in the stage.
-          if (currentStageVertices.contains(srcVertex) && currentStageVertices.contains(dstVertex)) {
-            stageBuilder.connectInternalVertices(new RuntimeEdge<IRVertex>(
+          // both vertices are in the same stage.
+          if (vertexToStageIdMap.get(srcVertex).equals(vertexToStageIdMap.get(dstVertex))) {
+            stageInternalDAGBuilder.connectVertices(new RuntimeEdge<>(
                 irEdge.getId(),
                 irEdge.getExecutionProperties(),
                 irEdge.getSrc(),
                 irEdge.getDst(),
                 irEdge.isSideInput()));
           } else { // edge comes from another stage
-            final Stage srcStage = vertexStageMap.get(srcVertex);
-
-            if (srcStage == null) {
-              throw new IllegalVertexOperationException("srcVertex " + srcVertex.getId()
-                  + " not yet added to the builder");
-            }
-
-            final StageEdgeBuilder newEdgeBuilder = new StageEdgeBuilder(irEdge.getId())
-                .setEdgeProperties(irEdge.getExecutionProperties())
-                .setSrcVertex(srcVertex)
-                .setDstVertex(dstVertex)
-                .setSrcStage(srcStage)
-                .setSideInputFlag(irEdge.isSideInput());
-            currentStageIncomingEdges.add(newEdgeBuilder);
+            interStageEdges.add(irEdge);
           }
-        }));
-
-        // Track id to irVertex.
-        idToIRVertex.put(irVertex.getId(), irVertex);
+        });
       }
-      stageBuilder.addReadables(vertexIdToReadables);
       // If this runtime stage contains at least one vertex, build it!
-      if (!stageBuilder.isEmpty()) {
-        final Stage currentStage = stageBuilder.build();
-        dagOfStagesBuilder.addVertex(currentStage);
-
-        // Add this stage as the destination stage for all the incoming edges.
-        currentStageIncomingEdges.forEach(stageEdgeBuilder -> {
-          stageEdgeBuilder.setDstStage(currentStage);
-          final StageEdge stageEdge = stageEdgeBuilder.build();
-          dagOfStagesBuilder.connectVertices(stageEdge);
-        });
-        currentStageIncomingEdges.clear();
+      if (!stageInternalDAGBuilder.isEmpty()) {
+        final DAG<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAG
+            = stageInternalDAGBuilder.buildWithoutSourceSinkCheck();
+        final Stage stage = new Stage(stageIdentifier, stageInternalDAG, stageProperties, vertexIdToReadables);
+        dagOfStagesBuilder.addVertex(stage);
+        stageIdToStageMap.put(stageId, stage);
+      }
+    }
 
-        currentStageVertices.forEach(irVertex -> vertexStageMap.put(irVertex, currentStage));
-        currentStageVertices.clear();
+    // Add StageEdges
+    for (final IREdge interStageEdge : interStageEdges) {
+      final Stage srcStage = stageIdToStageMap.get(vertexToStageIdMap.get(interStageEdge.getSrc()));
+      final Stage dstStage = stageIdToStageMap.get(vertexToStageIdMap.get(interStageEdge.getDst()));
+      if (srcStage == null || dstStage == null) {
+        throw new IllegalVertexOperationException(String.format("Stage not added to the builder:%s%s",
+            srcStage == null ? String.format(" source stage for %s", interStageEdge.getSrc()) : "",
+            dstStage == null ? String.format(" destination stage for %s", interStageEdge.getDst()) : ""));
       }
+      dagOfStagesBuilder.connectVertices(new StageEdge(interStageEdge.getId(), interStageEdge.getExecutionProperties(),
+          interStageEdge.getSrc(), interStageEdge.getDst(), srcStage, dstStage, interStageEdge.isSideInput()));
     }
 
     return dagOfStagesBuilder.build();
   }
 
   /**
-   * Integrity check for a stage's vertices.
-   * @param stageVertices to check for
+   * Integrity check for Stage.
+   * @param stage to check for
    */
-  private void integrityCheck(final List<IRVertex> stageVertices) {
-    final IRVertex firstVertex = stageVertices.get(0);
-    final String placement = firstVertex.getPropertyValue(ExecutorPlacementProperty.class).get();
-    final int scheduleGroup = firstVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get();
-    final int parallelism = firstVertex.getPropertyValue(ParallelismProperty.class).get();
+  private void integrityCheck(final Stage stage) {
+    stage.getPropertyValue(ParallelismProperty.class)
+        .orElseThrow(() -> new RuntimeException("Parallelism property must be set for Stage"));
+    stage.getPropertyValue(ScheduleGroupIndexProperty.class)
+        .orElseThrow(() -> new RuntimeException("ScheduleGroupIndex property must be set for Stage"));
 
-    stageVertices.forEach(irVertex -> {
+    stage.getIRDAG().getVertices().forEach(irVertex -> {
       // Check vertex type.
       if (!(irVertex instanceof  SourceVertex
           || irVertex instanceof OperatorVertex
           || irVertex instanceof MetricCollectionBarrierVertex)) {
         throw new UnsupportedOperationException(irVertex.toString());
       }
-
-      // Check execution properties.
-      if ((placement != null
-          && !placement.equals(irVertex.getPropertyValue(ExecutorPlacementProperty.class).get()))
-          || scheduleGroup != irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get()
-          || parallelism != irVertex.getPropertyValue(ParallelismProperty.class).get()) {
-        throw new RuntimeException("Vertices of the same stage have different execution properties: "
-            + irVertex.getId());
-      }
     });
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
index 1280f5b..ee0a337 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
@@ -18,48 +18,52 @@ package edu.snu.nemo.runtime.common.plan;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.Vertex;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import org.apache.commons.lang3.SerializationUtils;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Stage.
  */
 public final class Stage extends Vertex {
   private final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag;
-  private final int parallelism;
-  private final int scheduleGroupIndex;
-  private final String containerType;
   private final byte[] serializedIRDag;
+  private final ExecutionPropertyMap<VertexExecutionProperty> executionProperties;
   private final List<Map<String, Readable>> vertexIdToReadables;
+  private final int parallelism;
+  private final int scheduleGroupIndex;
 
   /**
    * Constructor.
    *
    * @param stageId             ID of the stage.
    * @param irDag               the DAG of the task in this stage.
-   * @param parallelism         how many tasks will be executed in this stage.
-   * @param scheduleGroupIndex  the schedule group index.
-   * @param containerType       the type of container to execute the task on.
+   * @param executionProperties set of {@link VertexExecutionProperty} for this stage
    * @param vertexIdToReadables the list of maps between vertex ID and {@link Readable}.
    */
   public Stage(final String stageId,
                final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag,
-               final int parallelism,
-               final int scheduleGroupIndex,
-               final String containerType,
+               final ExecutionPropertyMap<VertexExecutionProperty> executionProperties,
                final List<Map<String, Readable>> vertexIdToReadables) {
     super(stageId);
     this.irDag = irDag;
-    this.parallelism = parallelism;
-    this.scheduleGroupIndex = scheduleGroupIndex;
-    this.containerType = containerType;
     this.serializedIRDag = SerializationUtils.serialize(irDag);
+    this.executionProperties = executionProperties;
     this.vertexIdToReadables = vertexIdToReadables;
+    this.parallelism = executionProperties.get(ParallelismProperty.class)
+        .orElseThrow(() -> new RuntimeException("Parallelism property must be set for Stage"));
+    this.scheduleGroupIndex = executionProperties.get(ScheduleGroupIndexProperty.class)
+        .orElseThrow(() -> new RuntimeException("ScheduleGroupIndex property must be set for Stage"));
   }
 
   /**
@@ -95,10 +99,22 @@ public final class Stage extends Vertex {
   }
 
   /**
-   * @return the type of container to execute the task on.
+   * @return {@link VertexExecutionProperty} map for this stage
+   */
+  public ExecutionPropertyMap<VertexExecutionProperty> getExecutionProperties() {
+    return executionProperties;
+  }
+
+  /**
+   * Get the executionProperty of the IREdge.
+   *
+   * @param <T>                  Type of the return value.
+   * @param executionPropertyKey key of the execution property.
+   * @return the execution property.
    */
-  public String getContainerType() {
-    return containerType;
+  public <T extends Serializable> Optional<T> getPropertyValue(
+      final Class<? extends VertexExecutionProperty<T>> executionPropertyKey) {
+    return executionProperties.get(executionPropertyKey);
   }
 
   /**
@@ -114,7 +130,7 @@ public final class Stage extends Vertex {
     sb.append("{\"scheduleGroupIndex\": ").append(scheduleGroupIndex);
     sb.append(", \"irDag\": ").append(irDag);
     sb.append(", \"parallelism\": ").append(parallelism);
-    sb.append(", \"containerType\": \"").append(containerType).append("\"");
+    sb.append(", \"executionProperties\": ").append(executionProperties);
     sb.append('}');
     return sb.toString();
   }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageBuilder.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageBuilder.java
deleted file mode 100644
index 04475d2..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageBuilder.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan;
-
-import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.common.dag.DAGBuilder;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Stage Builder.
- */
-final class StageBuilder {
-  private final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder;
-  private final Integer stageId;
-  private final int parallelism;
-  private final int scheduleGroupIndex;
-  private final String containerType;
-  private List<Map<String, Readable>> vertexIdToReadables;
-
-  /**
-   * Builds a {@link Stage}.
-   * @param stageId stage ID in numeric form.
-   * @param scheduleGroupIndex indicating its scheduling order.
-   */
-  StageBuilder(final Integer stageId,
-               final int parallelism,
-               final int scheduleGroupIndex,
-               final String containerType) {
-    this.stageId = stageId;
-    this.parallelism = parallelism;
-    this.scheduleGroupIndex = scheduleGroupIndex;
-    this.containerType = containerType;
-    this.stageInternalDAGBuilder = new DAGBuilder<>();
-    this.vertexIdToReadables = new ArrayList<>(1);
-  }
-
-  /**
-   * Adds a {@link IRVertex} to this stage.
-   * @param vertex to add.
-   * @return the stageBuilder.
-   */
-  StageBuilder addVertex(final IRVertex vertex) {
-    stageInternalDAGBuilder.addVertex(vertex);
-    return this; }
-
-  /**
-   * Connects two {@link IRVertex} in this stage.
-   * @param edge the IREdge that connects vertices.
-   * @return the stageBuilder.
-   */
-  StageBuilder connectInternalVertices(final RuntimeEdge<IRVertex> edge) {
-    stageInternalDAGBuilder.connectVertices(edge);
-    return this;
-  }
-
-  StageBuilder addReadables(final List<Map<String, Readable>> vertexIdToReadable) {
-    this.vertexIdToReadables = vertexIdToReadable;
-    return this;
-  }
-
-  /**
-   * @return true if this builder doesn't contain any valid {@link IRVertex}.
-   */
-  boolean isEmpty() {
-    return stageInternalDAGBuilder.isEmpty();
-  }
-
-  /**
-   * Builds and returns the {@link Stage}.
-   * @return the runtime stage.
-   */
-  Stage build() {
-    final Stage stage = new Stage(
-        RuntimeIdGenerator.generateStageId(stageId),
-        stageInternalDAGBuilder.buildWithoutSourceSinkCheck(),
-        parallelism,
-        scheduleGroupIndex,
-        containerType,
-        vertexIdToReadables);
-    return stage;
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index 608da4d..d2697f4 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.runtime.common.plan;
 
+import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.runtime.common.data.KeyRange;
@@ -56,7 +57,8 @@ public final class StageEdge extends RuntimeEdge<Stage> {
    * @param dstStage       destination stage.
    * @param isSideInput    whether or not the edge is a sideInput edge.
    */
-  StageEdge(final String runtimeEdgeId,
+  @VisibleForTesting
+  public StageEdge(final String runtimeEdgeId,
             final ExecutionPropertyMap edgeProperties,
             final IRVertex srcVertex,
             final IRVertex dstVertex,
@@ -91,7 +93,7 @@ public final class StageEdge extends RuntimeEdge<Stage> {
   public String propertiesToJSON() {
     final StringBuilder sb = new StringBuilder();
     sb.append("{\"runtimeEdgeId\": \"").append(getId());
-    sb.append("\", \"edgeProperties\": ").append(getExecutionProperties());
+    sb.append("\", \"executionProperties\": ").append(getExecutionProperties());
     sb.append(", \"externalSrcVertexId\": \"").append(srcVertex.getId());
     sb.append("\", \"externalDstVertexId\": \"").append(dstVertex.getId());
     sb.append("\"}");
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
deleted file mode 100644
index f94feca..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan;
-
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
-
-/**
- * Stage Edge Builder.
- */
-public final class StageEdgeBuilder {
-  private final String stageEdgeId;
-  private ExecutionPropertyMap edgeProperties;
-  private Stage srcStage;
-  private Stage dstStage;
-  private IRVertex srcVertex;
-  private IRVertex dstVertex;
-  private Boolean isSideInput;
-
-  /**
-   * Represents the edge between vertices in a logical plan.
-   *
-   * @param irEdgeId id of this edge.
-   */
-  public StageEdgeBuilder(final String irEdgeId) {
-    this.stageEdgeId = irEdgeId;
-  }
-
-  /**
-   * Setter for edge properties.
-   *
-   * @param ea the edge properties.
-   * @return the updated StageEdgeBuilder.
-   */
-  public StageEdgeBuilder setEdgeProperties(final ExecutionPropertyMap ea) {
-    this.edgeProperties = ea;
-    return this;
-  }
-
-  /**
-   * Setter for the source stage.
-   *
-   * @param ss the source stage.
-   * @return the updated StageEdgeBuilder.
-   */
-  public StageEdgeBuilder setSrcStage(final Stage ss) {
-    this.srcStage = ss;
-    return this;
-  }
-
-  /**
-   * Setter for the destination stage.
-   *
-   * @param ds the destination stage.
-   * @return the updated StageEdgeBuilder.
-   */
-  public StageEdgeBuilder setDstStage(final Stage ds) {
-    this.dstStage = ds;
-    return this;
-  }
-
-  /**
-   * Setter for the source vertex.
-   *
-   * @param sv the source vertex.
-   * @return the updated StageEdgeBuilder.
-   */
-  public StageEdgeBuilder setSrcVertex(final IRVertex sv) {
-    this.srcVertex = sv;
-    return this;
-  }
-
-  /**
-   * Setter for the destination vertex.
-   *
-   * @param dv the destination vertex.
-   * @return the updated StageEdgeBuilder.
-   */
-  public StageEdgeBuilder setDstVertex(final IRVertex dv) {
-    this.dstVertex = dv;
-    return this;
-  }
-
-  /**
-   * Setter for side input flag.
-   *
-   * @param sideInputFlag the side input flag.
-   * @return the updated StageEdgeBuilder.
-   */
-  public StageEdgeBuilder setSideInputFlag(final Boolean sideInputFlag) {
-    this.isSideInput = sideInputFlag;
-    return this;
-  }
-
-  /**
-   * @return the built stage edge.
-   */
-  public StageEdge build() {
-    return new StageEdge(stageEdgeId, edgeProperties, srcVertex, dstVertex, srcStage, dstStage, isSideInput);
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
new file mode 100644
index 0000000..49553dc
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.plan;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import net.jcip.annotations.ThreadSafe;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.reef.annotations.audience.DriverSide;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A function that is responsible for stage partitioning on IR DAG.
+ * Each stage becomes maximal set of {@link IRVertex} such that
+ * <ul>
+ *   <li>branches and non-OneToOne edges are not allowed within a stage, and</li>
+ *   <li>all vertices in a stage should have same {@link VertexExecutionProperty} map,
+ *   except for the ignored properties.</li>
+ * </ul>
+ */
+@DriverSide
+@ThreadSafe
+public final class StagePartitioner implements Function<DAG<IRVertex, IREdge>, Map<IRVertex, Integer>> {
+  private final Set<Class<? extends VertexExecutionProperty>> ignoredPropertyKeys = ConcurrentHashMap.newKeySet();
+
+  @Inject
+  private StagePartitioner() {
+  }
+
+  /**
+   * By default, the stage partitioner merges two vertices into one stage if and only if the two vertices have
+   * same set of {@link VertexExecutionProperty}.
+   * Invoking this method will make the stage partitioner ignore a specific property during comparing
+   * the execution property maps.
+   * @param ignoredPropertyKey a property that will be ignored during the stage partitioning.
+   */
+  public void addIgnoredPropertyKey(final Class<? extends VertexExecutionProperty> ignoredPropertyKey) {
+    ignoredPropertyKeys.add(ignoredPropertyKey);
+  }
+
+  /**
+   * @param irDAG IR DAG to perform stage partition on.
+   * @return a map between IR vertex and the corresponding stage id
+   */
+  @Override
+  public Map<IRVertex, Integer> apply(final DAG<IRVertex, IREdge> irDAG) {
+    final MutableInt nextStageIndex = new MutableInt(0);
+    final Map<IRVertex, Integer> vertexToStageIdMap = new HashMap<>();
+    irDAG.topologicalDo(irVertex -> {
+      // Base case: for root vertices
+      if (vertexToStageIdMap.get(irVertex) == null) {
+        vertexToStageIdMap.put(irVertex, nextStageIndex.getValue());
+        nextStageIndex.increment();
+      }
+      // Get stage id of irVertex
+      final int stageId = vertexToStageIdMap.get(irVertex);
+      // Step case: inductively assign stage ids based on mergability with irVertex
+      for (final IREdge edge : irDAG.getOutgoingEdgesOf(irVertex)) {
+        final IRVertex connectedIRVertex = edge.getDst();
+        // Skip if it already has been assigned stageId
+        if (vertexToStageIdMap.containsKey(connectedIRVertex)) {
+          continue;
+        }
+        // Assign stageId
+        if (testMergability(edge, irDAG)) {
+          vertexToStageIdMap.put(connectedIRVertex, stageId);
+        } else {
+          vertexToStageIdMap.put(connectedIRVertex, nextStageIndex.getValue());
+          nextStageIndex.increment();
+        }
+      }
+    });
+    return vertexToStageIdMap;
+  }
+
+  /**
+   * @param edge an {@link IREdge}.
+   * @param dag IR DAG which contains {@code edge}
+   * @return {@code true} if and only if the source and the destination vertex of the edge can be merged into one stage.
+   */
+  private boolean testMergability(final IREdge edge, final DAG<IRVertex, IREdge> dag) {
+    // If the destination vertex has multiple inEdges, return false
+    if (dag.getIncomingEdgesOf(edge.getDst()).size() > 1) {
+      return false;
+    }
+    // If the edge is not OneToOne, return false
+    if (edge.getPropertyValue(DataCommunicationPatternProperty.class).get()
+        != DataCommunicationPatternProperty.Value.OneToOne) {
+      return false;
+    }
+    // Return true if and only if the execution properties of the two vertices are compatible
+    return getStageProperties(edge.getSrc()).equals(getStageProperties(edge.getDst()));
+  }
+
+  /**
+   * @param vertex a vertex in a stage
+   * @return set of stage-level properties for the stage
+   */
+  public Set<VertexExecutionProperty> getStageProperties(final IRVertex vertex) {
+    final Stream<VertexExecutionProperty> stream = vertex.getExecutionProperties().stream();
+    return stream.filter(p -> !ignoredPropertyKeys.contains(p.getClass())).collect(Collectors.toSet());
+  }
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
index 7f2a203..21ea23a 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
@@ -16,10 +16,13 @@
 package edu.snu.nemo.runtime.common.plan;
 
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * A Task is a self-contained executable that can be executed on a machine.
@@ -30,7 +33,7 @@ public final class Task implements Serializable {
   private final List<StageEdge> taskIncomingEdges;
   private final List<StageEdge> taskOutgoingEdges;
   private final int attemptIdx;
-  private final String containerType;
+  private final ExecutionPropertyMap<VertexExecutionProperty> executionProperties;
   private final byte[] serializedIRDag;
   private final Map<String, Readable> irVertexIdToReadable;
 
@@ -40,7 +43,7 @@ public final class Task implements Serializable {
    * @param jobId                the id of the job.
    * @param taskId               the ID of the task.
    * @param attemptIdx           the attempt index.
-   * @param containerType        the type of container to execute the task on.
+   * @param executionProperties  {@link VertexExecutionProperty} map for the corresponding stage
    * @param serializedIRDag      the serialized DAG of the task.
    * @param taskIncomingEdges    the incoming edges of the task.
    * @param taskOutgoingEdges    the outgoing edges of the task.
@@ -49,7 +52,7 @@ public final class Task implements Serializable {
   public Task(final String jobId,
               final String taskId,
               final int attemptIdx,
-              final String containerType,
+              final ExecutionPropertyMap<VertexExecutionProperty> executionProperties,
               final byte[] serializedIRDag,
               final List<StageEdge> taskIncomingEdges,
               final List<StageEdge> taskOutgoingEdges,
@@ -57,7 +60,7 @@ public final class Task implements Serializable {
     this.jobId = jobId;
     this.taskId = taskId;
     this.attemptIdx = attemptIdx;
-    this.containerType = containerType;
+    this.executionProperties = executionProperties;
     this.serializedIRDag = serializedIRDag;
     this.taskIncomingEdges = taskIncomingEdges;
     this.taskOutgoingEdges = taskOutgoingEdges;
@@ -107,10 +110,22 @@ public final class Task implements Serializable {
   }
 
   /**
-   * @return the type of container to execute the task on.
+   * @return {@link VertexExecutionProperty} map for the corresponding stage
    */
-  public String getContainerType() {
-    return containerType;
+  public ExecutionPropertyMap<VertexExecutionProperty> getExecutionProperties() {
+    return executionProperties;
+  }
+
+  /**
+   * Get the executionProperty of this task.
+   *
+   * @param <T>                  Type of the return value.
+   * @param executionPropertyKey key of the execution property.
+   * @return the execution property.
+   */
+  public <T extends Serializable> Optional<T> getPropertyValue(
+      final Class<? extends VertexExecutionProperty<T>> executionPropertyKey) {
+    return executionProperties.get(executionPropertyKey);
   }
 
   /**
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index 86a18cb..52c55fd 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -21,7 +21,7 @@ import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.BlockWriteException;
 import edu.snu.nemo.common.exception.UnsupportedBlockStoreException;
 import edu.snu.nemo.common.exception.UnsupportedExecutionPropertyException;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.UsedDataHandlingProperty;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
@@ -122,7 +122,7 @@ public final class BlockManagerWorker {
    * @throws BlockWriteException for any error occurred while trying to create a block.
    */
   public Block createBlock(final String blockId,
-                           final DataStoreProperty.Value blockStore) throws BlockWriteException {
+                           final InterTaskDataStoreProperty.Value blockStore) throws BlockWriteException {
     final BlockStore store = getBlockStore(blockStore);
     return store.createBlock(blockId);
   }
@@ -137,7 +137,7 @@ public final class BlockManagerWorker {
    */
   private CompletableFuture<DataUtil.IteratorWithNumBytes> retrieveDataFromBlock(
       final String blockId,
-      final DataStoreProperty.Value blockStore,
+      final InterTaskDataStoreProperty.Value blockStore,
       final KeyRange keyRange) {
     LOG.info("RetrieveDataFromBlock: {}", blockId);
     final BlockStore store = getBlockStore(blockStore);
@@ -188,7 +188,7 @@ public final class BlockManagerWorker {
   public CompletableFuture<DataUtil.IteratorWithNumBytes> queryBlock(
       final String blockId,
       final String runtimeEdgeId,
-      final DataStoreProperty.Value blockStore,
+      final InterTaskDataStoreProperty.Value blockStore,
       final KeyRange keyRange) {
     // Let's see if a remote worker has it
     final CompletableFuture<ControlMessage.Message> blockLocationFuture =
@@ -261,7 +261,7 @@ public final class BlockManagerWorker {
    * @param usedDataHandling     how to handle the used block.
    */
   public void writeBlock(final Block block,
-                         final DataStoreProperty.Value blockStore,
+                         final InterTaskDataStoreProperty.Value blockStore,
                          final boolean reportPartitionSizes,
                          final Map<Integer, Long> partitionSizeMap,
                          final String srcIRVertexId,
@@ -289,7 +289,7 @@ public final class BlockManagerWorker {
             .setBlockId(blockId)
             .setState(ControlMessage.BlockStateFromExecutor.AVAILABLE);
 
-    if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
+    if (InterTaskDataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
       blockStateChangedMsgBuilder.setLocation(REMOTE_FILE_STORE);
     } else {
       blockStateChangedMsgBuilder.setLocation(executorId);
@@ -335,7 +335,7 @@ public final class BlockManagerWorker {
    * @param blockStore the store which contains the block.
    */
   public void removeBlock(final String blockId,
-                          final DataStoreProperty.Value blockStore) {
+                          final InterTaskDataStoreProperty.Value blockStore) {
     LOG.info("RemoveBlock: {}", blockId);
     final BlockStore store = getBlockStore(blockStore);
     final boolean deleted = store.deleteBlock(blockId);
@@ -347,7 +347,7 @@ public final class BlockManagerWorker {
               .setBlockId(blockId)
               .setState(ControlMessage.BlockStateFromExecutor.NOT_AVAILABLE);
 
-      if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
+      if (InterTaskDataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
         blockStateChangedMsgBuilder.setLocation(REMOTE_FILE_STORE);
       } else {
         blockStateChangedMsgBuilder.setLocation(executorId);
@@ -371,7 +371,7 @@ public final class BlockManagerWorker {
    * @param blockStore the store which contains the block.
    * @param blockId    the ID of the block.
    */
-  private void handleUsedData(final DataStoreProperty.Value blockStore,
+  private void handleUsedData(final InterTaskDataStoreProperty.Value blockStore,
                               final String blockId) {
     final AtomicInteger remainingExpectedRead = blockToRemainingRead.get(blockId);
     if (remainingExpectedRead != null) {
@@ -389,11 +389,11 @@ public final class BlockManagerWorker {
   }
 
   /**
-   * Gets the {@link BlockStore} from annotated value of {@link DataStoreProperty}.
-   * @param blockStore the annotated value of {@link DataStoreProperty}.
+   * Gets the {@link BlockStore} from annotated value of {@link InterTaskDataStoreProperty}.
+   * @param blockStore the annotated value of {@link InterTaskDataStoreProperty}.
    * @return the block store.
    */
-  private BlockStore getBlockStore(final DataStoreProperty.Value blockStore) {
+  private BlockStore getBlockStore(final InterTaskDataStoreProperty.Value blockStore) {
     switch (blockStore) {
       case MemoryStore:
         return memoryStore;
@@ -420,7 +420,7 @@ public final class BlockManagerWorker {
   public void onOutputContext(final ByteOutputContext outputContext) throws InvalidProtocolBufferException {
     final ByteTransferContextDescriptor descriptor = ByteTransferContextDescriptor.PARSER
         .parseFrom(outputContext.getContextDescriptor());
-    final DataStoreProperty.Value blockStore = convertBlockStore(descriptor.getBlockStore());
+    final InterTaskDataStoreProperty.Value blockStore = convertBlockStore(descriptor.getBlockStore());
     final String blockId = descriptor.getBlockId();
     final KeyRange keyRange = SerializationUtils.deserialize(descriptor.getKeyRange().toByteArray());
 
@@ -430,8 +430,8 @@ public final class BlockManagerWorker {
         try {
           final Optional<Block> optionalBlock = getBlockStore(blockStore).readBlock(blockId);
           if (optionalBlock.isPresent()) {
-            if (DataStoreProperty.Value.LocalFileStore.equals(blockStore)
-                || DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
+            if (InterTaskDataStoreProperty.Value.LocalFileStore.equals(blockStore)
+                || InterTaskDataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
               final List<FileArea> fileAreas = ((FileBlock) optionalBlock.get()).asFileAreas(keyRange);
               for (final FileArea fileArea : fileAreas) {
                 try (ByteOutputContext.ByteOutputStream os = outputContext.newOutputStream()) {
@@ -475,10 +475,10 @@ public final class BlockManagerWorker {
   /**
    * Decodes BlockStore property from protocol buffer.
    * @param blockStore property from protocol buffer
-   * @return the corresponding {@link DataStoreProperty} value
+   * @return the corresponding {@link InterTaskDataStoreProperty} value
    */
   private static ControlMessage.BlockStore convertBlockStore(
-      final DataStoreProperty.Value blockStore) {
+      final InterTaskDataStoreProperty.Value blockStore) {
     switch (blockStore) {
       case MemoryStore:
         return ControlMessage.BlockStore.MEMORY;
@@ -495,21 +495,21 @@ public final class BlockManagerWorker {
 
 
   /**
-   * Encodes {@link DataStoreProperty} value into protocol buffer property.
-   * @param blockStoreType {@link DataStoreProperty} value
+   * Encodes {@link InterTaskDataStoreProperty} value into protocol buffer property.
+   * @param blockStoreType {@link InterTaskDataStoreProperty} value
    * @return the corresponding {@link ControlMessage.BlockStore} value
    */
-  private static DataStoreProperty.Value convertBlockStore(
+  private static InterTaskDataStoreProperty.Value convertBlockStore(
       final ControlMessage.BlockStore blockStoreType) {
     switch (blockStoreType) {
       case MEMORY:
-        return DataStoreProperty.Value.MemoryStore;
+        return InterTaskDataStoreProperty.Value.MemoryStore;
       case SER_MEMORY:
-        return DataStoreProperty.Value.SerializedMemoryStore;
+        return InterTaskDataStoreProperty.Value.SerializedMemoryStore;
       case LOCAL_FILE:
-        return DataStoreProperty.Value.LocalFileStore;
+        return InterTaskDataStoreProperty.Value.LocalFileStore;
       case REMOTE_FILE:
-        return DataStoreProperty.Value.GlusterFileStore;
+        return InterTaskDataStoreProperty.Value.GlusterFileStore;
       default:
         throw new UnsupportedBlockStoreException(new Exception("This block store is not yet supported"));
     }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index fdaecd5..2b29b5f 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.executor.datatransfer;
 
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
@@ -87,13 +87,15 @@ public final class InputReader extends DataTransfer {
 
   private CompletableFuture<DataUtil.IteratorWithNumBytes> readOneToOne() {
     final String blockId = getBlockId(dstTaskIndex);
-    final Optional<DataStoreProperty.Value> dataStoreProperty = runtimeEdge.getPropertyValue(DataStoreProperty.class);
+    final Optional<InterTaskDataStoreProperty.Value> dataStoreProperty
+        = runtimeEdge.getPropertyValue(InterTaskDataStoreProperty.class);
     return blockManagerWorker.queryBlock(blockId, getId(), dataStoreProperty.get(), HashRange.all());
   }
 
   private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readBroadcast() {
     final int numSrcTasks = this.getSourceParallelism();
-    final Optional<DataStoreProperty.Value> dataStoreProperty = runtimeEdge.getPropertyValue(DataStoreProperty.class);
+    final Optional<InterTaskDataStoreProperty.Value> dataStoreProperty
+        = runtimeEdge.getPropertyValue(InterTaskDataStoreProperty.class);
 
     final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new ArrayList<>();
     for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
@@ -111,7 +113,8 @@ public final class InputReader extends DataTransfer {
    */
   private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readDataInRange() {
     assert (runtimeEdge instanceof StageEdge);
-    final Optional<DataStoreProperty.Value> dataStoreProperty = runtimeEdge.getPropertyValue(DataStoreProperty.class);
+    final Optional<InterTaskDataStoreProperty.Value> dataStoreProperty
+        = runtimeEdge.getPropertyValue(InterTaskDataStoreProperty.class);
     final KeyRange hashRangeToRead =
         ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
     if (hashRangeToRead == null) {
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index b0dd080..bb594f6 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -36,7 +36,7 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
   private final RuntimeEdge<?> runtimeEdge;
   private final String srcVertexId;
   private final IRVertex dstIrVertex;
-  private final DataStoreProperty.Value blockStoreValue;
+  private final InterTaskDataStoreProperty.Value blockStoreValue;
   private final BlockManagerWorker blockManagerWorker;
   private final boolean nonDummyBlock;
   private final Block blockToWrite;
@@ -65,7 +65,7 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
     this.srcVertexId = srcRuntimeVertexId;
     this.dstIrVertex = dstIrVertex;
     this.blockManagerWorker = blockManagerWorker;
-    this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class).get();
+    this.blockStoreValue = runtimeEdge.getPropertyValue(InterTaskDataStoreProperty.class).get();
 
     // Setup partitioner
     final int dstParallelism = getDstParallelism();
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 0b119d8..88dda9d 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -19,9 +19,11 @@ import edu.snu.nemo.common.coder.*;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.*;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.SourceVertex;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
 import edu.snu.nemo.common.test.EmptyComponents;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.Pair;
@@ -36,7 +38,7 @@ import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.plan.StageEdgeBuilder;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.executor.Executor;
 import edu.snu.nemo.runtime.executor.MetricManagerWorker;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
@@ -91,10 +93,10 @@ import static org.mockito.Mockito.mock;
     SourceVertex.class})
 public final class DataTransferTest {
   private static final String EXECUTOR_ID_PREFIX = "Executor";
-  private static final DataStoreProperty.Value MEMORY_STORE = DataStoreProperty.Value.MemoryStore;
-  private static final DataStoreProperty.Value SER_MEMORY_STORE = DataStoreProperty.Value.SerializedMemoryStore;
-  private static final DataStoreProperty.Value LOCAL_FILE_STORE = DataStoreProperty.Value.LocalFileStore;
-  private static final DataStoreProperty.Value REMOTE_FILE_STORE = DataStoreProperty.Value.GlusterFileStore;
+  private static final InterTaskDataStoreProperty.Value MEMORY_STORE = InterTaskDataStoreProperty.Value.MemoryStore;
+  private static final InterTaskDataStoreProperty.Value SER_MEMORY_STORE = InterTaskDataStoreProperty.Value.SerializedMemoryStore;
+  private static final InterTaskDataStoreProperty.Value LOCAL_FILE_STORE = InterTaskDataStoreProperty.Value.LocalFileStore;
+  private static final InterTaskDataStoreProperty.Value REMOTE_FILE_STORE = InterTaskDataStoreProperty.Value.GlusterFileStore;
   private static final String TMP_LOCAL_FILE_DIRECTORY = "./tmpLocalFiles";
   private static final String TMP_REMOTE_FILE_DIRECTORY = "./tmpRemoteFiles";
   private static final int PARALLELISM_TEN = 10;
@@ -295,7 +297,7 @@ public final class DataTransferTest {
   private void writeAndRead(final BlockManagerWorker sender,
                             final BlockManagerWorker receiver,
                             final DataCommunicationPatternProperty.Value commPattern,
-                            final DataStoreProperty.Value store) throws RuntimeException {
+                            final InterTaskDataStoreProperty.Value store) throws RuntimeException {
     final int testIndex = TEST_INDEX.getAndIncrement();
     final String edgeId = String.format(EDGE_PREFIX_TEMPLATE, testIndex);
     final Pair<IRVertex, IRVertex> verticesPair = setupVertices(edgeId, sender, receiver);
@@ -307,7 +309,7 @@ public final class DataTransferTest {
     dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element)));
     dummyIREdge.setProperty(DataCommunicationPatternProperty.of(commPattern));
     dummyIREdge.setProperty(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
-    dummyIREdge.setProperty(DataStoreProperty.of(store));
+    dummyIREdge.setProperty(InterTaskDataStoreProperty.of(store));
     dummyIREdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
     dummyIREdge.setProperty(EncoderProperty.of(ENCODER_FACTORY));
     dummyIREdge.setProperty(DecoderProperty.of(DECODER_FACTORY));
@@ -318,14 +320,8 @@ public final class DataTransferTest {
     final IRVertex dstMockVertex = mock(IRVertex.class);
     final Stage srcStage = setupStages("srcStage-" + testIndex);
     final Stage dstStage = setupStages("dstStage-" + testIndex);
-    dummyEdge = new StageEdgeBuilder(edgeId)
-        .setEdgeProperties(edgeProperties)
-        .setSrcVertex(srcMockVertex)
-        .setDstVertex(dstMockVertex)
-        .setSrcStage(srcStage)
-        .setDstStage(dstStage)
-        .setSideInputFlag(false)
-        .build();
+    dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
+        srcStage, dstStage, false);
 
     // Initialize states in Master
     srcStage.getTaskIds().forEach(srcTaskId -> {
@@ -384,7 +380,7 @@ public final class DataTransferTest {
   private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
                                              final BlockManagerWorker receiver,
                                              final DataCommunicationPatternProperty.Value commPattern,
-                                             final DataStoreProperty.Value store) throws RuntimeException {
+                                             final InterTaskDataStoreProperty.Value store) throws RuntimeException {
     final int testIndex = TEST_INDEX.getAndIncrement();
     final int testIndex2 = TEST_INDEX.getAndIncrement();
     final String edgeId = String.format(EDGE_PREFIX_TEMPLATE, testIndex);
@@ -405,7 +401,7 @@ public final class DataTransferTest {
         = dummyIREdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
     duplicateDataProperty.get().setRepresentativeEdgeId(edgeId);
     duplicateDataProperty.get().setGroupSize(2);
-    dummyIREdge.setProperty(DataStoreProperty.of(store));
+    dummyIREdge.setProperty(InterTaskDataStoreProperty.of(store));
     dummyIREdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
     final RuntimeEdge dummyEdge, dummyEdge2;
     final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
@@ -414,24 +410,12 @@ public final class DataTransferTest {
     final IRVertex dstMockVertex = mock(IRVertex.class);
     final Stage srcStage = setupStages("srcStage-" + testIndex);
     final Stage dstStage = setupStages("dstStage-" + testIndex);
-    dummyEdge = new StageEdgeBuilder(edgeId)
-        .setEdgeProperties(edgeProperties)
-        .setSrcVertex(srcMockVertex)
-        .setDstVertex(dstMockVertex)
-        .setSrcStage(srcStage)
-        .setDstStage(dstStage)
-        .setSideInputFlag(false)
-        .build();
+    dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
+        srcStage, dstStage, false);
     final IRVertex dstMockVertex2 = mock(IRVertex.class);
     final Stage dstStage2 = setupStages("dstStage-" + testIndex2);
-    dummyEdge2 = new StageEdgeBuilder(edgeId2)
-        .setEdgeProperties(edgeProperties)
-        .setSrcVertex(srcMockVertex)
-        .setDstVertex(dstMockVertex2)
-        .setSrcStage(srcStage)
-        .setDstStage(dstStage2)
-        .setSideInputFlag(false)
-        .build();
+    dummyEdge2 = new StageEdge(edgeId2, edgeProperties, srcMockVertex, dstMockVertex2,
+        srcStage, dstStage, false);
     // Initialize states in Master
     srcStage.getTaskIds().forEach(srcTaskId -> {
       final String blockId = RuntimeIdGenerator.generateBlockId(
@@ -561,6 +545,9 @@ public final class DataTransferTest {
   private Stage setupStages(final String stageId) {
     final DAG<IRVertex, RuntimeEdge<IRVertex>> emptyDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>().build();
 
-    return new Stage(stageId, emptyDag, PARALLELISM_TEN, 0, "Not_used", Collections.emptyList());
+    final ExecutionPropertyMap<VertexExecutionProperty> stageExecutionProperty = new ExecutionPropertyMap<>(stageId);
+    stageExecutionProperty.put(ParallelismProperty.of(PARALLELISM_TEN));
+    stageExecutionProperty.put(ScheduleGroupIndexProperty.of(0));
+    return new Stage(stageId, emptyDag, stageExecutionProperty, Collections.emptyList());
   }
 }
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
index 998f3aa..5e0a267 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -20,10 +20,11 @@ import edu.snu.nemo.common.ir.OutputCollector;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.InMemorySourceVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
@@ -67,7 +68,8 @@ import static org.mockito.Mockito.*;
     TaskStateManager.class, StageEdge.class})
 public final class TaskExecutorTest {
   private static final int DATA_SIZE = 100;
-  private static final String CONTAINER_TYPE = "CONTAINER_TYPE";
+  private static final ExecutionPropertyMap<VertexExecutionProperty> TASK_EXECUTION_PROPERTY_MAP
+      = new ExecutionPropertyMap<>("TASK_EXECUTION_PROPERTY_MAP");
   private static final int SOURCE_PARALLELISM = 5;
   private List<Integer> elements;
   private Map<String, List> vertexIdToOutputData;
@@ -137,7 +139,7 @@ public final class TaskExecutorTest {
             "testSourceVertexDataFetching",
             generateTaskId(),
             0,
-            CONTAINER_TYPE,
+            TASK_EXECUTION_PROPERTY_MAP,
             new byte[0],
             Collections.emptyList(),
             Collections.singletonList(mockStageEdgeFrom(sourceIRVertex)),
@@ -167,7 +169,7 @@ public final class TaskExecutorTest {
         "testSourceVertexDataFetching",
         generateTaskId(),
         0,
-        CONTAINER_TYPE,
+        TASK_EXECUTION_PROPERTY_MAP,
         new byte[0],
         Collections.singletonList(mockStageEdgeTo(vertex)),
         Collections.singletonList(mockStageEdgeFrom(vertex)),
@@ -205,7 +207,7 @@ public final class TaskExecutorTest {
         "testSourceVertexDataFetching",
         generateTaskId(),
         0,
-        CONTAINER_TYPE,
+        TASK_EXECUTION_PROPERTY_MAP,
         new byte[0],
         Collections.singletonList(mockStageEdgeTo(operatorIRVertex1)),
         Collections.singletonList(mockStageEdgeFrom(operatorIRVertex2)),
@@ -237,7 +239,7 @@ public final class TaskExecutorTest {
         "testSourceVertexDataFetching",
         generateTaskId(),
         0,
-        CONTAINER_TYPE,
+        TASK_EXECUTION_PROPERTY_MAP,
         new byte[0],
         Arrays.asList(mockStageEdgeTo(operatorIRVertex1), mockStageEdgeTo(operatorIRVertex2)),
         Collections.singletonList(mockStageEdgeFrom(operatorIRVertex2)),
@@ -260,7 +262,7 @@ public final class TaskExecutorTest {
                                            final boolean isSideInput) {
     final String runtimeIREdgeId = "Runtime edge between operator tasks";
     ExecutionPropertyMap edgeProperties = new ExecutionPropertyMap(runtimeIREdgeId);
-    edgeProperties.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+    edgeProperties.put(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
     return new RuntimeEdge<>(runtimeIREdgeId, edgeProperties, src, dst, isSideInput);
 
   }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index 273f108..d06e1b5 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -404,7 +404,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
           physicalPlan.getId(),
           taskId,
           attemptIdx,
-          stageToSchedule.getContainerType(),
+          stageToSchedule.getExecutionProperties(),
           stageToSchedule.getSerializedIRDAG(),
           stageIncomingEdges,
           stageOutgoingEdges,
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
index b8f7d74..5f64e9c 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
@@ -44,13 +44,15 @@ public final class ContainerTypeAwareSchedulingPolicy implements SchedulingPolic
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
                                                              final Task task) {
 
-    if (task.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
+    final String executorPlacementPropertyValue = task.getPropertyValue(ExecutorPlacementProperty.class)
+        .orElse(ExecutorPlacementProperty.NONE);
+    if (executorPlacementPropertyValue.equals(ExecutorPlacementProperty.NONE)) {
       return executorRepresenterSet;
     }
 
     final Set<ExecutorRepresenter> candidateExecutors =
         executorRepresenterSet.stream()
-            .filter(executor -> executor.getContainerType().equals(task.getContainerType()))
+            .filter(executor -> executor.getContainerType().equals(executorPlacementPropertyValue))
             .collect(Collectors.toSet());
 
     return candidateExecutors;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
index b507a0b..ddcfbf5 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
@@ -62,7 +62,7 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
       if (taskIdToTask == null) {
         final Map<String, Task> taskIdToTaskMap = new HashMap<>();
         taskIdToTaskMap.put(task.getTaskId(), task);
-        updateSchedulableStages(stageId, task.getContainerType());
+        updateSchedulableStages(stageId);
         return taskIdToTaskMap;
       } else {
         taskIdToTask.put(task.getTaskId(), task);
@@ -102,7 +102,7 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
       }
       stageIdToPendingTasks.remove(stageId);
       stageIdToPendingTasks.forEach((scheduledStageId, tasks) ->
-          updateSchedulableStages(scheduledStageId, tasks.values().iterator().next().getContainerType()));
+          updateSchedulableStages(scheduledStageId));
     }
 
     return taskToSchedule;
@@ -157,18 +157,14 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
    * NOTE: This method provides the "line up" between stages, by assigning priorities,
    * serving as the key to the "priority" implementation of this class.
    * @param candidateStageId for the stage that can potentially be scheduled.
-   * @param candidateStageContainerType for the stage that can potentially be scheduled.
    */
-  private synchronized void updateSchedulableStages(
-      final String candidateStageId, final String candidateStageContainerType) {
+  private synchronized void updateSchedulableStages(final String candidateStageId) {
     final DAG<Stage, StageEdge> jobDAG = physicalPlan.getStageDAG();
 
-    if (isSchedulable(candidateStageId, candidateStageContainerType)) {
+    if (isSchedulable(candidateStageId)) {
       // Check for ancestor stages that became schedulable due to candidateStage's absence from the queue.
       jobDAG.getAncestors(candidateStageId).forEach(ancestorStage -> {
-        // Remove the ancestor stage if it is of the same container type.
-        if (schedulableStages.contains(ancestorStage.getId())
-            && candidateStageContainerType.equals(ancestorStage.getContainerType())) {
+        if (schedulableStages.contains(ancestorStage.getId())) {
           if (!schedulableStages.remove(ancestorStage.getId())) {
             throw new RuntimeException(String.format("No such stage: %s", ancestorStage.getId()));
           }
@@ -183,16 +179,13 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
   /**
    * Determines whether the given candidate stage is schedulable immediately or not.
    * @param candidateStageId for the stage that can potentially be scheduled.
-   * @param candidateStageContainerType for the stage that can potentially be scheduled.
    * @return true if schedulable, false otherwise.
    */
-  private synchronized boolean isSchedulable(final String candidateStageId, final String candidateStageContainerType) {
+  private synchronized boolean isSchedulable(final String candidateStageId) {
     final DAG<Stage, StageEdge> jobDAG = physicalPlan.getStageDAG();
     for (final Stage descendantStage : jobDAG.getDescendants(candidateStageId)) {
       if (schedulableStages.contains(descendantStage.getId())) {
-        if (candidateStageContainerType.equals(descendantStage.getContainerType())) {
-          return false;
-        }
+        return false;
       }
     }
     return true;
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
index b8953fb..db5b1d4 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
@@ -122,7 +122,7 @@ public final class JobStateManagerTest {
     final DAG<IRVertex, IREdge> irDAG = irDAGBuilder.build();
     final DAG<Stage, StageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
     final JobStateManager jobStateManager = new JobStateManager(
-        new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()),
+        new PhysicalPlan("TestPlan", physicalDAG),
         blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
 
     assertFalse(jobStateManager.checkJobTermination());
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
index 55808c9..7a2d149 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
@@ -49,7 +49,8 @@ public final class ContainerTypeAwareSchedulingPolicyTest {
     final ExecutorRepresenter a2 = mockExecutorRepresenter(ExecutorPlacementProperty.NONE);
 
     final Task task1 = mock(Task.class);
-    when(task1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
+    when(task1.getPropertyValue(ExecutorPlacementProperty.class))
+        .thenReturn(Optional.of(ExecutorPlacementProperty.RESERVED));
 
     final Set<ExecutorRepresenter> executorRepresenterList1 = new HashSet<>(Arrays.asList(a0, a1, a2));
 
@@ -60,7 +61,8 @@ public final class ContainerTypeAwareSchedulingPolicyTest {
     assertEquals(expectedExecutors1, candidateExecutors1);
 
     final Task task2 = mock(Task.class);
-    when(task2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
+    when(task2.getPropertyValue(ExecutorPlacementProperty.class))
+        .thenReturn(Optional.of(ExecutorPlacementProperty.NONE));
 
     final Set<ExecutorRepresenter> executorRepresenterList2 = new HashSet<>(Arrays.asList(a0, a1, a2));
 
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
index bb9fc2c..96c65eb 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
@@ -136,18 +136,18 @@ public final class FaultToleranceTest {
     final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
 
     for (final Stage stage : dagOf4Stages) {
-      if (stage.getScheduleGroupIndex() == 0) {
+      if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {
 
-        // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
+        // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1.
         SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
         assertTrue(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId ->
             SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
                 taskId, TaskState.State.COMPLETE, 1));
-      } else if (stage.getScheduleGroupIndex() == 1) {
+      } else if (stage.getScheduleGroupIndex() == 2) {
         scheduler.onExecutorRemoved("a3");
-        // There are 2 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 1.
+        // There are 2 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
         SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
 
@@ -165,11 +165,14 @@ public final class FaultToleranceTest {
         stage.getTaskIds().forEach(taskId ->
             SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
                 taskId, TaskState.State.COMPLETE, 1));
-      } else {
-        // There are 1 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
+      } else if (stage.getScheduleGroupIndex() == 3) {
+        // There are 1 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 3.
         // Schedule only the first Task
         SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, true);
+      } else {
+        throw new RuntimeException(String.format("Unexpected ScheduleGroupIndex: %d",
+            stage.getScheduleGroupIndex()));
       }
     }
   }
@@ -207,17 +210,17 @@ public final class FaultToleranceTest {
     final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
 
     for (final Stage stage : dagOf4Stages) {
-      if (stage.getScheduleGroupIndex() == 0) {
+      if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {
 
-        // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
+        // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1.
         SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
         assertTrue(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId ->
             SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
                 taskId, TaskState.State.COMPLETE, 1));
-      } else if (stage.getScheduleGroupIndex() == 1) {
-        // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 1.
+      } else if (stage.getScheduleGroupIndex() == 2) {
+        // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
         SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
         assertTrue(pendingTaskCollection.isEmpty());
@@ -230,7 +233,7 @@ public final class FaultToleranceTest {
 
         }
 
-        assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 3);
+        assertEquals(3, jobStateManager.getAttemptCountForStage(stage.getId()));
         assertFalse(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId -> {
           assertEquals(jobStateManager.getTaskState(taskId).getStateMachine().getCurrentState(),
@@ -273,17 +276,17 @@ public final class FaultToleranceTest {
     final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
 
     for (final Stage stage : dagOf4Stages) {
-      if (stage.getScheduleGroupIndex() == 0) {
+      if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {
 
-        // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
+        // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1.
         SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
         assertTrue(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId ->
             SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
                 taskId, TaskState.State.COMPLETE, 1));
-      } else if (stage.getScheduleGroupIndex() == 1) {
-        // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 1.
+      } else if (stage.getScheduleGroupIndex() == 2) {
+        // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
         SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
 
@@ -296,7 +299,7 @@ public final class FaultToleranceTest {
 
         }
 
-        assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 2);
+        assertEquals(2, jobStateManager.getAttemptCountForStage(stage.getId()));
         stage.getTaskIds().forEach(taskId -> {
           assertEquals(jobStateManager.getTaskState(taskId).getStateMachine().getCurrentState(),
               TaskState.State.READY);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
index 23c5ee7..bb48746 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
@@ -217,7 +217,7 @@ public final class SingleTaskQueueTest {
             "TestPlan",
             taskId,
             0,
-            stage.getContainerType(),
+            stage.getExecutionProperties(),
             stage.getSerializedIRDAG(),
             Collections.emptyList(),
             Collections.emptyList(),
diff --git a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
index 65f10b2..0af8c8d 100644
--- a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
+++ b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
@@ -100,7 +100,7 @@ public final class TestPlanGenerator {
                                                   final Policy policy) throws Exception {
     final DAG<IRVertex, IREdge> optimized = CompiletimeOptimizer.optimize(irDAG, policy, EMPTY_DAG_DIRECTORY);
     final DAG<Stage, StageEdge> physicalDAG = optimized.convert(PLAN_GENERATOR);
-    return new PhysicalPlan("TestPlan", physicalDAG, PLAN_GENERATOR.getIdToIRVertex());
+    return new PhysicalPlan("TestPlan", physicalDAG);
   }
 
   /**
diff --git a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
new file mode 100644
index 0000000..a60cf88
--- /dev/null
+++ b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.plan;
+
+import edu.snu.nemo.common.dag.DAGBuilder;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import edu.snu.nemo.common.test.EmptyComponents;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Tests {@link StagePartitioner}.
+ */
+public final class StagePartitionerTest {
+  private static final Transform EMPTY_TRANSFORM = new EmptyComponents.EmptyTransform("empty");
+
+  private StagePartitioner stagePartitioner;
+
+  @Before
+  public void setup() throws InjectionException {
+    stagePartitioner = Tang.Factory.getTang().newInjector().getInstance(StagePartitioner.class);
+    stagePartitioner.addIgnoredPropertyKey(DynamicOptimizationProperty.class);
+  }
+
+  /**
+   * @param parallelism {@link ParallelismProperty} value for the new vertex
+   * @param scheduleGroupIndex {@link ScheduleGroupIndexProperty} value for the new vertex
+   * @param otherProperties other {@link VertexExecutionProperty} for the new vertex
+   * @return new {@link IRVertex}
+   */
+  private static IRVertex newVertex(final int parallelism, final int scheduleGroupIndex,
+                                    final List<VertexExecutionProperty> otherProperties) {
+    final IRVertex vertex = new OperatorVertex(EMPTY_TRANSFORM);
+    vertex.getExecutionProperties().put(ParallelismProperty.of(parallelism));
+    vertex.getExecutionProperties().put(ScheduleGroupIndexProperty.of(scheduleGroupIndex));
+    otherProperties.forEach(property -> vertex.getExecutionProperties().put(property));
+    return vertex;
+  }
+
+  /**
+   * A simple case where two vertices have common parallelism and ScheduleGroupIndex so that get merged into one stage.
+   */
+  @Test
+  public void testLinear() {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+    final IRVertex v0 = newVertex(5, 0, Collections.emptyList());
+    final IRVertex v1 = newVertex(5, 0, Collections.emptyList());
+    dagBuilder.addVertex(v0);
+    dagBuilder.addVertex(v1);
+    dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v0, v1));
+    final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+    assertEquals(0, (int) partitioning.get(v0));
+    assertEquals(0, (int) partitioning.get(v1));
+  }
+
+  /**
+   * A simple case where two vertices have different parallelism.
+   */
+  @Test
+  public void testSplitByParallelism() {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+    final IRVertex v0 = newVertex(5, 0, Collections.emptyList());
+    final IRVertex v1 = newVertex(1, 0, Collections.emptyList());
+    dagBuilder.addVertex(v0);
+    dagBuilder.addVertex(v1);
+    dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v0, v1));
+    final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+    assertNotEquals(partitioning.get(v0), partitioning.get(v1));
+  }
+
+  /**
+   * A simple case where two vertices have different ScheduleGroupIndex.
+   */
+  @Test
+  public void testSplitByScheduleGroupIndex() {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+    final IRVertex v0 = newVertex(1, 0, Collections.emptyList());
+    final IRVertex v1 = newVertex(1, 1, Collections.emptyList());
+    dagBuilder.addVertex(v0);
+    dagBuilder.addVertex(v1);
+    dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v0, v1));
+    final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+    assertNotEquals(partitioning.get(v0), partitioning.get(v1));
+  }
+
+  /**
+   * A simple case where two vertices are connected with Shuffle edge.
+   */
+  @Test
+  public void testSplitByShuffle() {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+    final IRVertex v0 = newVertex(1, 0, Collections.emptyList());
+    final IRVertex v1 = newVertex(1, 0, Collections.emptyList());
+    dagBuilder.addVertex(v0);
+    dagBuilder.addVertex(v1);
+    dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v0, v1));
+    final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+    assertNotEquals(partitioning.get(v0), partitioning.get(v1));
+  }
+
+  /**
+   * A simple case where one of the two vertices has additional property.
+   */
+  @Test
+  public void testSplitByOtherProperty() {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+    final IRVertex v0 = newVertex(1, 0,
+        Arrays.asList(ExecutorPlacementProperty.of(ExecutorPlacementProperty.RESERVED)));
+    final IRVertex v1 = newVertex(1, 0, Collections.emptyList());
+    dagBuilder.addVertex(v0);
+    dagBuilder.addVertex(v1);
+    dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v0, v1));
+    final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+    assertNotEquals(partitioning.get(v0), partitioning.get(v1));
+  }
+
+  /**
+   * A simple case where one of the two vertices has ignored property.
+   */
+  @Test
+  public void testNotSplitByIgnoredProperty() {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+    final IRVertex v0 = newVertex(1, 0,
+        Arrays.asList(DynamicOptimizationProperty.of(DynamicOptimizationProperty.Value.DataSkewRuntimePass)));
+    final IRVertex v1 = newVertex(1, 0, Collections.emptyList());
+    dagBuilder.addVertex(v0);
+    dagBuilder.addVertex(v1);
+    dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v0, v1));
+    final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+    assertEquals(0, (int) partitioning.get(v0));
+    assertEquals(0, (int) partitioning.get(v1));
+  }
+
+  /**
+   * Test scenario when there is a join.
+   */
+  @Test
+  public void testJoin() {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+    final IRVertex v0 = newVertex(5, 0, Collections.emptyList());
+    final IRVertex v1 = newVertex(5, 0, Collections.emptyList());
+    final IRVertex v2 = newVertex(5, 0, Collections.emptyList());
+    dagBuilder.addVertex(v0);
+    dagBuilder.addVertex(v1);
+    dagBuilder.addVertex(v2);
+    dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v0, v2));
+    dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v2));
+    final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+    assertNotEquals(partitioning.get(v0), partitioning.get(v1));
+    assertNotEquals(partitioning.get(v1), partitioning.get(v2));
+    assertNotEquals(partitioning.get(v2), partitioning.get(v0));
+  }
+}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
index 03952fe..0d60a5d 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
@@ -82,7 +82,7 @@ public class ClientEndpointTest {
     injector.bindVolatileInstance(MessageEnvironment.class, messageEnvironment);
     final BlockManagerMaster pmm = injector.getInstance(BlockManagerMaster.class);
     final JobStateManager jobStateManager = new JobStateManager(
-        new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()),
+        new PhysicalPlan("TestPlan", physicalDAG),
         pmm, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
 
     final DriverEndpoint driverEndpoint = new DriverEndpoint(jobStateManager, clientEndpoint);
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
new file mode 100644
index 0000000..5cf95b5
--- /dev/null
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.tests.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.client.JobLauncher;
+import edu.snu.nemo.common.Pair;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.dag.DAGBuilder;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import edu.snu.nemo.common.test.EmptyComponents;
+import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
+import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
+import edu.snu.nemo.tests.compiler.CompilerTestUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test {@link DefaultScheduleGroupPass}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public final class DefaultScheduleGroupPassTest {
+  private static final Transform EMPTY_TRANSFORM = new EmptyComponents.EmptyTransform("empty");
+
+  @Test
+  public void testAnnotatingPass() {
+    final AnnotatingPass scheduleGroupPass = new DefaultScheduleGroupPass();
+    assertEquals(ScheduleGroupIndexProperty.class, scheduleGroupPass.getExecutionPropertyToModify());
+  }
+
+  /**
+   * This test ensures that a topologically sorted DAG has an increasing sequence of schedule group indexes.
+   */
+  @Test
+  public void testTopologicalOrdering() throws Exception {
+    final DAG<IRVertex, IREdge> compiledDAG = CompilerTestUtil.compileALSDAG();
+    final DAG<IRVertex, IREdge> processedDAG = CompiletimeOptimizer.optimize(compiledDAG,
+        new TestPolicy(), "");
+
+    for (final IRVertex irVertex : processedDAG.getTopologicalSort()) {
+      final Integer currentScheduleGroupIndex = irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get();
+      final Integer largestScheduleGroupIndexOfParent = processedDAG.getParents(irVertex.getId()).stream()
+          .mapToInt(v -> v.getPropertyValue(ScheduleGroupIndexProperty.class).get())
+          .max().orElse(0);
+      assertTrue(currentScheduleGroupIndex >= largestScheduleGroupIndexOfParent);
+    }
+  }
+
+  /**
+   * Return a DAG that has a branch.
+   * {@literal
+   *           /-- v3 --- v4
+   * v0 --- v1 --- v2 --/
+   * }
+   *
+   * @param communicationPattern {@link DataCommunicationPatternProperty.Value} for the edges
+   * @param dataFlowModel {@link DataFlowModelProperty.Value} for the edges
+   * @return a {@link Pair} of {@link DAG} and {@link List} of {@link IRVertex}
+   */
+  private static Pair<DAG<IRVertex, IREdge>, List<IRVertex>> generateBranchDAG(
+      final DataCommunicationPatternProperty.Value communicationPattern,
+      final DataFlowModelProperty.Value dataFlowModel) {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+
+    final IRVertex v0 = new OperatorVertex(EMPTY_TRANSFORM);
+    final IRVertex v1 = new OperatorVertex(EMPTY_TRANSFORM);
+    final IRVertex v2 = new OperatorVertex(EMPTY_TRANSFORM);
+    final IRVertex v3 = new OperatorVertex(EMPTY_TRANSFORM);
+    final IRVertex v4 = new OperatorVertex(EMPTY_TRANSFORM);
+
+    final IREdge e0 = new IREdge(communicationPattern, v0, v1);
+    final IREdge e1 = new IREdge(communicationPattern, v1, v2);
+    final IREdge e2 = new IREdge(communicationPattern, v1, v3);
+    final IREdge e3 = new IREdge(communicationPattern, v2, v4);
+    final IREdge e4 = new IREdge(communicationPattern, v3, v4);
+
+    final List<IRVertex> vertices = Arrays.asList(v0, v1, v2, v3, v4);
+    for (final IRVertex vertex : vertices) {
+      dagBuilder.addVertex(vertex);
+    }
+    for (final IREdge edge : Arrays.asList(e0, e1, e2, e3, e4)) {
+      edge.getExecutionProperties().put(DataFlowModelProperty.of(dataFlowModel));
+      dagBuilder.connectVertices(edge);
+    }
+    return Pair.of(dagBuilder.buildWithoutSourceSinkCheck(), vertices);
+  }
+
+  /**
+   * Return a DAG that has a join.
+   * {@literal
+   * v0 --- v1 --- v4 -- v5
+   * v2 --- v3 --/
+   * }
+   *
+   * @param communicationPattern {@link DataCommunicationPatternProperty.Value} for the edges
+   * @param dataFlowModel {@link DataFlowModelProperty.Value} for the edges
+   * @return a {@link Pair} of {@link DAG} and {@link List} of {@link IRVertex}
+   */
+  private static Pair<DAG<IRVertex, IREdge>, List<IRVertex>> generateJoinDAG(
+      final DataCommunicationPatternProperty.Value communicationPattern,
+      final DataFlowModelProperty.Value dataFlowModel) {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+
+    final IRVertex v0 = new OperatorVertex(EMPTY_TRANSFORM);
+    final IRVertex v1 = new OperatorVertex(EMPTY_TRANSFORM);
+    final IRVertex v2 = new OperatorVertex(EMPTY_TRANSFORM);
+    final IRVertex v3 = new OperatorVertex(EMPTY_TRANSFORM);
+    final IRVertex v4 = new OperatorVertex(EMPTY_TRANSFORM);
+    final IRVertex v5 = new OperatorVertex(EMPTY_TRANSFORM);
+
+    final IREdge e0 = new IREdge(communicationPattern, v0, v1);
+    final IREdge e1 = new IREdge(communicationPattern, v2, v3);
+    final IREdge e2 = new IREdge(communicationPattern, v1, v4);
+    final IREdge e3 = new IREdge(communicationPattern, v3, v4);
+    final IREdge e4 = new IREdge(communicationPattern, v4, v5);
+
+    final List<IRVertex> vertices = Arrays.asList(v0, v1, v2, v3, v4, v5);
+    for (final IRVertex vertex : vertices) {
+      dagBuilder.addVertex(vertex);
+    }
+    for (final IREdge edge : Arrays.asList(e0, e1, e2, e3, e4)) {
+      edge.getExecutionProperties().put(DataFlowModelProperty.of(dataFlowModel));
+      dagBuilder.connectVertices(edge);
+    }
+    return Pair.of(dagBuilder.buildWithoutSourceSinkCheck(), vertices);
+  }
+
+  /**
+   * Asserts that the {@link ScheduleGroupIndexProperty} is equal to {@code expected}.
+   * @param expected the expected property value
+   * @param vertex the vertex to test
+   */
+  private static void assertScheduleGroupIndex(final int expected, final IRVertex vertex) {
+    assertEquals(expected, getScheduleGroupIndex(vertex));
+  }
+
+  /**
+   * @param vertex a vertex
+   * @return {@link ScheduleGroupIndexProperty} of {@code vertex}
+   */
+  private static int getScheduleGroupIndex(final IRVertex vertex) {
+    return vertex.getPropertyValue(ScheduleGroupIndexProperty.class)
+        .orElseThrow(() -> new RuntimeException(String.format("ScheduleGroup not set for %s", vertex.getId())));
+  }
+
+  /**
+   * Ensures that all vertices in {@code vertices} have different {@link ScheduleGroupIndexProperty} value.
+   * @param vertices vertices to test
+   */
+  private static void assertDifferentScheduleGroupIndex(final Collection<IRVertex> vertices) {
+    final Set<Integer> indices = new HashSet<>();
+    vertices.forEach(v -> {
+      final int idx = getScheduleGroupIndex(v);
+      assertFalse(indices.contains(idx));
+      indices.add(idx);
+    });
+  }
+
+  /**
+   * Test scenario when {@code allowMultipleInEdgesWithinScheduleGroup} is {@code true} and the DAG contains a branch.
+   */
+  @Test
+  public void testBranch() {
+    final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass();
+    final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+        = generateBranchDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Pull);
+    pass.apply(dag.left());
+    dag.right().forEach(v -> assertScheduleGroupIndex(0, v));
+  }
+
+  /**
+   * Test scenario when {@code allowMultipleInEdgesWithinScheduleGroup} is {@code false} and the DAG contains a branch.
+   */
+  @Test
+  public void testBranchWhenMultipleInEdgeNotAllowed() {
+    final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(false, false, false);
+    final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+        = generateBranchDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Pull);
+    pass.apply(dag.left());
+    dag.right().subList(0, 4).forEach(v -> assertScheduleGroupIndex(0, v));
+    dag.right().subList(4, 5).forEach(v -> assertScheduleGroupIndex(1, v));
+  }
+
+  /**
+   * Test scenario to determine whether push edges properly enforces same scheduleGroupIndex or not.
+   */
+  @Test
+  public void testBranchWithPush() {
+    final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(false, false, false);
+    final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+        = generateBranchDAG(DataCommunicationPatternProperty.Value.Shuffle, DataFlowModelProperty.Value.Push);
+    pass.apply(dag.left());
+    dag.right().forEach(v -> assertScheduleGroupIndex(0, v));
+  }
+
+  /**
+   * Test scenario when {@code allowBroadcastWithinScheduleGroup} is {@code false} and DAG contains Broadcast edges.
+   */
+  @Test
+  public void testBranchWithBroadcast() {
+    final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(false, true, true);
+    final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+        = generateBranchDAG(DataCommunicationPatternProperty.Value.BroadCast, DataFlowModelProperty.Value.Pull);
+    assertDifferentScheduleGroupIndex(pass.apply(dag.left()).getVertices());
+  }
+
+  /**
+   * Test scenario when {@code allowShuffleWithinScheduleGroup} is {@code false} and DAG contains Shuffle edges.
+   */
+  @Test
+  public void testBranchWithShuffle() {
+    final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(true, false, true);
+    final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+        = generateBranchDAG(DataCommunicationPatternProperty.Value.Shuffle, DataFlowModelProperty.Value.Pull);
+    assertDifferentScheduleGroupIndex(pass.apply(dag.left()).getVertices());
+  }
+
+  /**
+   * Test scenario when {@code allowMultipleInEdgesWithinScheduleGroup} is {@code true} and the DAG contains a join.
+   */
+  @Test
+  public void testJoin() {
+    final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass();
+    final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+        = generateJoinDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Pull);
+    pass.apply(dag.left());
+    final int idxForFirstScheduleGroup = getScheduleGroupIndex(dag.right().get(0));
+    final int idxForSecondScheduleGroup = getScheduleGroupIndex(dag.right().get(2));
+    dag.right().subList(0, 2).forEach(v -> assertScheduleGroupIndex(idxForFirstScheduleGroup, v));
+    dag.right().subList(2, 4).forEach(v -> assertScheduleGroupIndex(idxForSecondScheduleGroup, v));
+    dag.right().subList(4, 6).forEach(v -> assertScheduleGroupIndex(2, v));
+  }
+
+  /**
+   * Test scenario with multiple push inEdges.
+   */
+  @Test
+  public void testJoinWithPush() {
+    final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass();
+    final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+        = generateJoinDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Push);
+    pass.apply(dag.left());
+    dag.right().forEach(v -> assertScheduleGroupIndex(0, v));
+  }
+
+  /**
+   * Test scenario when single push inEdges.
+   */
+  @Test
+  public void testJoinWithSinglePush() {
+    final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass();
+    final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+        = generateJoinDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Push);
+    dag.left().getOutgoingEdgesOf(dag.right().get(1)).iterator().next()
+        .getExecutionProperties().put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
+    pass.apply(dag.left());
+    final int idxForFirstScheduleGroup = getScheduleGroupIndex(dag.right().get(0));
+    final int idxForSecondScheduleGroup = getScheduleGroupIndex(dag.right().get(2));
+    dag.right().subList(0, 2).forEach(v -> assertScheduleGroupIndex(idxForFirstScheduleGroup, v));
+    dag.right().subList(2, 6).forEach(v -> assertScheduleGroupIndex(idxForSecondScheduleGroup, v));
+  }
+}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPassTest.java
deleted file mode 100644
index 9e222fc..0000000
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPassTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.tests.compiler.optimizer.pass.compiletime.annotating;
-
-import edu.snu.nemo.client.JobLauncher;
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
-import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
-import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
-import edu.snu.nemo.tests.compiler.CompilerTestUtil;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test {@link ScheduleGroupPass}.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(JobLauncher.class)
-public final class ScheduleGroupPassTest {
-  @Before
-  public void setUp() throws Exception {
-  }
-
-  @Test
-  public void testAnnotatingPass() {
-    final AnnotatingPass scheduleGroupPass = new ScheduleGroupPass();
-    assertEquals(ScheduleGroupIndexProperty.class, scheduleGroupPass.getExecutionPropertyToModify());
-  }
-
-  /**
-   * This test ensures that a topologically sorted DAG has an increasing sequence of schedule group indexes.
-   */
-  @Test
-  public void testScheduleGroupPass() throws Exception {
-    final DAG<IRVertex, IREdge> compiledDAG = CompilerTestUtil.compileALSDAG();
-    final DAG<IRVertex, IREdge> processedDAG = CompiletimeOptimizer.optimize(compiledDAG,
-        new TestPolicy(), "");
-
-    for (final IRVertex irVertex : processedDAG.getTopologicalSort()) {
-      final Integer currentScheduleGroupIndex = irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get();
-      final Integer largestScheduleGroupIndexOfParent = processedDAG.getParents(irVertex.getId()).stream()
-          .mapToInt(v -> v.getPropertyValue(ScheduleGroupIndexProperty.class).get())
-          .max().orElse(0);
-      assertTrue(currentScheduleGroupIndex >= largestScheduleGroupIndexOfParent);
-    }
-  }
-}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DisaggregationPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DisaggregationPassTest.java
index 898f74e..d4cc616 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DisaggregationPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DisaggregationPassTest.java
@@ -19,13 +19,11 @@ import edu.snu.nemo.client.JobLauncher;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultParallelismPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DisaggregationEdgeDataStorePass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ReviseInterStageEdgeDataStorePass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultInterTaskDataStorePass;
 import edu.snu.nemo.tests.compiler.CompilerTestUtil;
 import org.junit.Before;
 import org.junit.Test;
@@ -52,22 +50,12 @@ public class DisaggregationPassTest {
   public void testDisaggregation() throws Exception {
     final DAG<IRVertex, IREdge> processedDAG =
         new DisaggregationEdgeDataStorePass().apply(
-            new ReviseInterStageEdgeDataStorePass().apply(
-                new DefaultStagePartitioningPass().apply(
-                    new DefaultParallelismPass().apply(compiledDAG))));
+            new DefaultInterTaskDataStorePass().apply(
+                  new DefaultParallelismPass().apply(compiledDAG)));
 
-    processedDAG.getTopologicalSort().forEach(irVertex -> {
-      processedDAG.getIncomingEdgesOf(irVertex).forEach(edgeToMerger -> {
-        if (DataCommunicationPatternProperty.Value.OneToOne
-            .equals(edgeToMerger.getPropertyValue(DataCommunicationPatternProperty.class).get())
-            && edgeToMerger.getSrc().getPropertyValue(StageIdProperty.class).get()
-            .equals(edgeToMerger.getDst().getPropertyValue(StageIdProperty.class).get())) {
-          assertEquals(DataStoreProperty.Value.MemoryStore, edgeToMerger.getPropertyValue(DataStoreProperty.class).get());
-        } else {
-          assertEquals(DataStoreProperty.Value.GlusterFileStore,
-              edgeToMerger.getPropertyValue(DataStoreProperty.class).get());
-        }
-      });
-    });
+    processedDAG.getTopologicalSort().forEach(irVertex ->
+      processedDAG.getIncomingEdgesOf(irVertex).forEach(edgeToMerger ->
+          assertEquals(InterTaskDataStoreProperty.Value.GlusterFileStore,
+              edgeToMerger.getPropertyValue(InterTaskDataStoreProperty.class).get())));
   }
 }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/PadoCompositePassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/PadoCompositePassTest.java
index 73014e2..f983698 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/PadoCompositePassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/PadoCompositePassTest.java
@@ -19,7 +19,7 @@ import edu.snu.nemo.client.JobLauncher;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.PadoEdgeDataStorePass;
@@ -57,35 +57,35 @@ public class PadoCompositePassTest {
     final IRVertex vertex5 = processedDAG.getTopologicalSort().get(1);
     assertEquals(ExecutorPlacementProperty.TRANSIENT, vertex5.getPropertyValue(ExecutorPlacementProperty.class).get());
     processedDAG.getIncomingEdgesOf(vertex5).forEach(irEdge -> {
-      assertEquals(DataStoreProperty.Value.MemoryStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+      assertEquals(InterTaskDataStoreProperty.Value.MemoryStore, irEdge.getPropertyValue(InterTaskDataStoreProperty.class).get());
       assertEquals(DataFlowModelProperty.Value.Pull, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
     });
 
     final IRVertex vertex6 = processedDAG.getTopologicalSort().get(2);
     assertEquals(ExecutorPlacementProperty.RESERVED, vertex6.getPropertyValue(ExecutorPlacementProperty.class).get());
     processedDAG.getIncomingEdgesOf(vertex6).forEach(irEdge -> {
-      assertEquals(DataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+      assertEquals(InterTaskDataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(InterTaskDataStoreProperty.class).get());
       assertEquals(DataFlowModelProperty.Value.Push, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
     });
 
     final IRVertex vertex4 = processedDAG.getTopologicalSort().get(6);
     assertEquals(ExecutorPlacementProperty.RESERVED, vertex4.getPropertyValue(ExecutorPlacementProperty.class).get());
     processedDAG.getIncomingEdgesOf(vertex4).forEach(irEdge -> {
-      assertEquals(DataStoreProperty.Value.MemoryStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+      assertEquals(InterTaskDataStoreProperty.Value.MemoryStore, irEdge.getPropertyValue(InterTaskDataStoreProperty.class).get());
       assertEquals(DataFlowModelProperty.Value.Pull, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
     });
 
     final IRVertex vertex12 = processedDAG.getTopologicalSort().get(10);
     assertEquals(ExecutorPlacementProperty.TRANSIENT, vertex12.getPropertyValue(ExecutorPlacementProperty.class).get());
     processedDAG.getIncomingEdgesOf(vertex12).forEach(irEdge -> {
-      assertEquals(DataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+      assertEquals(InterTaskDataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(InterTaskDataStoreProperty.class).get());
       assertEquals(DataFlowModelProperty.Value.Pull, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
     });
 
     final IRVertex vertex14 = processedDAG.getTopologicalSort().get(12);
     assertEquals(ExecutorPlacementProperty.RESERVED, vertex14.getPropertyValue(ExecutorPlacementProperty.class).get());
     processedDAG.getIncomingEdgesOf(vertex14).forEach(irEdge -> {
-      assertEquals(DataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+      assertEquals(InterTaskDataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(InterTaskDataStoreProperty.class).get());
       assertEquals(DataFlowModelProperty.Value.Push, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
     });
   }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
index e779f58..ab1beb7 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
@@ -61,8 +61,8 @@ public class SailfishPassTest {
                 edgeToMerger.getPropertyValue(DataFlowModelProperty.class).get());
             assertEquals(UsedDataHandlingProperty.Value.Discard,
                 edgeToMerger.getPropertyValue(UsedDataHandlingProperty.class).get());
-            assertEquals(DataStoreProperty.Value.SerializedMemoryStore,
-                edgeToMerger.getPropertyValue(DataStoreProperty.class).get());
+            assertEquals(InterTaskDataStoreProperty.Value.SerializedMemoryStore,
+                edgeToMerger.getPropertyValue(InterTaskDataStoreProperty.class).get());
             assertEquals(BytesDecoderFactory.of(),
                 edgeToMerger.getPropertyValue(DecoderProperty.class).get());
           } else {
@@ -75,8 +75,8 @@ public class SailfishPassTest {
               edgeFromMerger.getPropertyValue(DataFlowModelProperty.class).get());
           assertEquals(DataCommunicationPatternProperty.Value.OneToOne,
               edgeFromMerger.getPropertyValue(DataCommunicationPatternProperty.class).get());
-          assertEquals(DataStoreProperty.Value.LocalFileStore,
-              edgeFromMerger.getPropertyValue(DataStoreProperty.class).get());
+          assertEquals(InterTaskDataStoreProperty.Value.LocalFileStore,
+              edgeFromMerger.getPropertyValue(InterTaskDataStoreProperty.class).get());
           assertEquals(BytesEncoderFactory.of(),
               edgeFromMerger.getPropertyValue(EncoderProperty.class).get());
         });
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
index 8d13374..b1ea8f8 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -16,8 +16,7 @@
 package edu.snu.nemo.tests.compiler.optimizer.policy;
 
 import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.PadoCompositePass;
 import edu.snu.nemo.compiler.optimizer.policy.*;
 import org.junit.Test;
@@ -29,21 +28,21 @@ public final class PolicyBuilderTest {
   @Test
   public void testDisaggregationPolicy() {
     final Policy disaggregationPolicy = new DisaggregationPolicy();
-    assertEquals(15, disaggregationPolicy.getCompileTimePasses().size());
+    assertEquals(14, disaggregationPolicy.getCompileTimePasses().size());
     assertEquals(0, disaggregationPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testPadoPolicy() {
     final Policy padoPolicy = new PadoPolicy();
-    assertEquals(17, padoPolicy.getCompileTimePasses().size());
+    assertEquals(16, padoPolicy.getCompileTimePasses().size());
     assertEquals(0, padoPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testDataSkewPolicy() {
     final Policy dataSkewPolicy = new DataSkewPolicy();
-    assertEquals(19, dataSkewPolicy.getCompileTimePasses().size());
+    assertEquals(18, dataSkewPolicy.getCompileTimePasses().size());
     assertEquals(1, dataSkewPolicy.getRuntimePasses().size());
   }
 
@@ -52,8 +51,7 @@ public final class PolicyBuilderTest {
     try {
       final Policy failPolicy = new PolicyBuilder()
           .registerCompileTimePass(new PadoCompositePass())
-          .registerCompileTimePass(new DefaultStagePartitioningPass())
-          .registerCompileTimePass(new ScheduleGroupPass())
+          .registerCompileTimePass(new DefaultScheduleGroupPass())
           .build();
     } catch (Exception e) { // throw an exception if default execution properties are not set.
       assertTrue(e instanceof CompileTimeOptimizationException);
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/TestPolicy.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/TestPolicy.java
index 51ca1ae..59dbd4e 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/TestPolicy.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/TestPolicy.java
@@ -39,13 +39,12 @@ public final class TestPolicy implements Policy {
   @Override
   public List<CompileTimePass> getCompileTimePasses() {
     List<CompileTimePass> policy = new ArrayList<>();
-    policy.add(new DefaultStagePartitioningPass());
 
     if (testPushPolicy) {
       policy.add(new ShuffleEdgePushPass());
     }
 
-    policy.add(new ScheduleGroupPass());
+    policy.add(new DefaultScheduleGroupPass());
     return policy;
   }
 
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
index 671fdb0..5175045 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
@@ -20,7 +20,7 @@ import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.SourceVertex;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
@@ -154,35 +154,35 @@ public final class DAGConverterTest {
 //    irDAGBuilder.addVertex(v7);
 
     final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v2);
-    e1.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+    e1.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
     e1.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
 
     final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v3);
-    e2.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+    e2.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
     e2.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
 
     final IREdge e3 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4);
-    e3.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+    e3.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
     e3.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Push));
 
     final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v5);
-    e4.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+    e4.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
     e4.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Push));
 
     final IREdge e5 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v6);
-    e5.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+    e5.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
     e5.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
 
     final IREdge e6 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v8);
-    e6.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+    e6.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
     e6.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
 
 //    final IREdge e7 = new IREdge(OneToOne, v7, v5);
-//    e7.setProperty(DataStoreProperty.of(MemoryStore));
+//    e7.setProperty(InterTaskDataStoreProperty.of(MemoryStore));
 //    e7.setProperty(Attribute.Key.PullOrPush, DataFlowModelProperty.Value.Push));
 //
 //    final IREdge e8 = new IREdge(OneToOne, v5, v8);
-//    e8.setProperty(DataStoreProperty.of(MemoryStore));
+//    e8.setProperty(InterTaskDataStoreProperty.of(MemoryStore));
 //    e8.setProperty(Attribute.Key.PullOrPush, DataFlowModelProperty.Value.Pull));
 
     // Stage 1 = {v1, v2, v3}