You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/06/25 03:17:12 UTC

[GitHub] johnyangk closed pull request #51: [NEMO-102] Stage Partitioning by PhysicalPlanGenerator

johnyangk closed pull request #51: [NEMO-102] Stage Partitioning by PhysicalPlanGenerator
URL: https://github.com/apache/incubator-nemo/pull/51
 
 
   

This pull request was merged from a forked repository.
Because GitHub no longer displays the original diff of a fork-based
pull request once it has been merged, the full diff is reproduced
below for the sake of provenance:

diff --git a/bin/json2dot.py b/bin/json2dot.py
index 6f2d3398..a0262efe 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -26,9 +26,8 @@
 
 nextIdx = 0
 
-def edgePropertiesString(properties):
-    prop = {p[0]: p[1] for p in properties.items() if p[0] != 'Coder'}
-    return '/'.join(['SideInput' if x[0] == 'IsSideInput' else x[1].split('.')[-1] for x in sorted(prop.items())])
+def propertiesToString(properties):
+    return '<BR/>'.join(['{}={}'.format(re.sub('Property$', '', item[0].split('.')[-1]), item[1]) for item in sorted(properties.items())])
 
 def getIdx():
     global nextIdx
@@ -118,10 +117,6 @@ def Vertex(id, properties, state):
         return Stage(id, properties, state)
     except:
         pass
-    try:
-        return Stage(id, properties)
-    except:
-        pass
     try:
         return LoopVertex(id, properties)
     except:
@@ -138,33 +133,18 @@ def __init__(self, id, properties, state):
     def dot(self):
         color = 'black'
         try:
-            if (self.properties['executionProperties']['ExecutorPlacement'] == 'Transient'):
+            placement = self.properties['executionProperties']['edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty']
+            if (placement == 'Transient'):
                 color = 'orange'
-            if (self.properties['executionProperties']['ExecutorPlacement'] == 'Reserved'):
+            if (placement == 'Reserved'):
                 color = 'green'
         except:
             pass
         label = self.id
         if self.state is not None:
-            label += '\\n({})'.format(self.state)
-        try:
-            label += ' (p{})'.format(self.properties['executionProperties']['Parallelism'])
-        except:
-            pass
-        try:
-            label += ' (s{})'.format(self.properties['executionProperties']['ScheduleGroupIndex'])
-        except:
-            pass
+            label += '<BR/>({})'.format(self.state)
         try:
-            label += '\\n{}'.format(self.properties['source'])
-        except:
-            pass
-        try:
-            label += '\\n{}'.format(self.properties['runtimeVertexId'])
-        except:
-            pass
-        try:
-            label += '\\n{}'.format(self.properties['index'])
+            label += '<BR/>{}'.format(self.properties['source'])
         except:
             pass
         try:
@@ -174,15 +154,19 @@ def dot(self):
                 class_name = transform[1].split('{')[0].split('.')[-1].split('$')[0].split('@')[0]
             except IndexError:
                 class_name = '?'
-            label += '\\n{}:{}'.format(transform_name, class_name)
+            label += '<BR/>{}:{}'.format(transform_name, class_name)
         except:
             pass
         if ('class' in self.properties and self.properties['class'] == 'MetricCollectionBarrierVertex'):
             shape = ', shape=box'
-            label += '\\nMetricCollectionBarrier'
+            label += '<BR/>MetricCollectionBarrier'
         else:
             shape = ''
-        dot = '{} [label="{}", color={}, style=filled, fillcolor="{}"{}];'.format(self.idx, label, color, stateToColor(self.state), shape)
+        try:
+            label += '<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(propertiesToString(self.properties['executionProperties']))
+        except:
+            pass
+        dot = '{} [label=<{}>, color={}, style=filled, fillcolor="{}"{}];'.format(self.idx, label, color, stateToColor(self.state), shape)
         return dot
     @property
     def oneVertex(self):
@@ -191,27 +175,6 @@ def oneVertex(self):
     def logicalEnd(self):
         return self.idx
 
-class Stage:
-    def __init__(self, id, properties):
-        self.id = id
-        self.internalDAG = DAG(properties['stageInternalDAG'], JobState.empty())
-        self.idx = getIdx()
-    @property
-    def dot(self):
-        dot = ''
-        dot += 'subgraph cluster_{} {{'.format(self.idx)
-        dot += 'label = "{}";'.format(self.id)
-        dot += 'color=blue;'
-        dot += self.internalDAG.dot
-        dot += '}'
-        return dot
-    @property
-    def oneVertex(self):
-        return next(iter(self.internalDAG.vertices.values())).oneVertex
-    @property
-    def logicalEnd(self):
-        return 'cluster_{}'.format(self.idx)
-
 class LoopVertex:
     def __init__(self, id, properties):
         self.id = id
@@ -226,10 +189,10 @@ def __init__(self, id, properties):
     def dot(self):
         label = self.id
         try:
-            label += ' (p{})'.format(self.executionProperties['Parallelism'])
+            label += '<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(propertiesToString(self.executionProperties))
         except:
             pass
-        label += '\\n(Remaining iteration: {})'.format(self.remaining_iteration)
+        label += '<BR/>(Remaining iteration: {})'.format(self.remaining_iteration)
         dot = 'subgraph cluster_{} {{'.format(self.idx)
         dot += 'label = "{}";'.format(label)
         dot += self.dag.dot
@@ -253,24 +216,30 @@ def internalDstFor(self, edgeWithLoopId):
 class Stage:
     def __init__(self, id, properties, state):
         self.id = id
-        self.irVertex = DAG(properties['irDag'], JobState.empty())
+        self.properties = properties
+        self.stageDAG = DAG(properties['irDag'], JobState.empty())
         self.idx = getIdx()
         self.state = state
+        self.executionProperties = self.properties['executionProperties']
     @property
     def dot(self):
         if self.state.state is None:
             state = ''
         else:
             state = ' ({})'.format(self.state.state)
+        label = '{}{}'.format(self.id, state)
+        if self.state.tasks:
+            label += '<BR/><BR/>{} Task(s):<BR/>{}'.format(len(self.state.tasks), self.state.taskStateSummary)
+        label += '<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(propertiesToString(self.executionProperties))
         dot = 'subgraph cluster_{} {{'.format(self.idx)
-        dot += 'label = "{}{}\\n\\n{} Task(s):\\n{}";'.format(self.id, state, len(self.state.tasks), self.state.taskStateSummary)
+        dot += 'label = <{}>;'.format(label)
         dot += 'color=red; bgcolor="{}";'.format(stateToColor(self.state.state))
-        dot += self.irVertex.dot
+        dot += self.stageDAG.dot
         dot += '}'
         return dot
     @property
     def oneVertex(self):
-        return next(iter(self.irVertex.vertices.values())).oneVertex
+        return next(iter(self.stageDAG.vertices.values())).oneVertex
     @property
     def logicalEnd(self):
         return 'cluster_{}'.format(self.idx)
@@ -305,8 +274,6 @@ def __init__(self, src, dst, properties):
         self.dst = dst
         self.id = properties['id']
         self.executionProperties = properties['executionProperties']
-        self.encoderFactory = self.executionProperties['Encoder']
-        self.decoderFactory = self.executionProperties['Decoder']
     @property
     def dot(self):
         src = self.src
@@ -319,21 +286,19 @@ def dot(self):
             dst = dst.internalDstFor(self.id)
         except:
             pass
-        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}<BR/>{}</FONT>'.format(self.id, edgePropertiesString(self.executionProperties), self.encoderFactory, self.decoderFactory)
+        label = '{}<BR/><FONT POINT-SIZE=\'8\'>{}</FONT>'.format(self.id, propertiesToString(self.executionProperties))
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(src.oneVertex.idx,
                 dst.oneVertex.idx, src.logicalEnd, dst.logicalEnd, label)
 
 class StageEdge:
     def __init__(self, src, dst, properties):
-        self.src = src.internalDAG.vertices[properties['srcVertex']]
-        self.dst = dst.internalDAG.vertices[properties['dstVertex']]
+        self.src = src.stageDAG.vertices[properties['externalSrcVertexId']]
+        self.dst = dst.stageDAG.vertices[properties['externalDstVertexId']]
         self.runtimeEdgeId = properties['runtimeEdgeId']
         self.executionProperties = properties['executionProperties']
-        self.encoderFactory = self.executionProperties['Encoder']
-        self.decoderFactory = self.executionProperties['Decoder']
     @property
     def dot(self):
-        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}<BR/>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.encoderFactory, self.decoderFactory)
+        label = '{}<BR/><FONT POINT-SIZE=\'8\'>{}</FONT>'.format(self.runtimeEdgeId, propertiesToString(self.executionProperties))
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx,
                 self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label)
 
@@ -343,11 +308,9 @@ def __init__(self, src, dst, properties):
         self.dst = dst
         self.runtimeEdgeId = properties['runtimeEdgeId']
         self.executionProperties = properties['executionProperties']
-        self.encoderFactory = self.executionProperties['Encoder']
-        self.decoderFactory = self.executionProperties['Decoder']
     @property
     def dot(self):
-        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}<BR/>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.encoderFactory, self.decoderFactory)
+        label = '{}<BR/><FONT POINT-SIZE=\'8\'>{}</FONT>'.format(self.runtimeEdgeId, propertiesToString(self.executionProperties))
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx,
                 self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label)
 
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/BytesDecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/BytesDecoderFactory.java
index 069e2542..e62e0b86 100644
--- a/common/src/main/java/edu/snu/nemo/common/coder/BytesDecoderFactory.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/BytesDecoderFactory.java
@@ -75,7 +75,7 @@ private BytesDecoder(final InputStream inputStream) {
 
       final int lengthToRead = byteOutputStream.getCount();
       if (lengthToRead == 0) {
-        throw new IOException("EoF (empty partition)!"); // TODO #?: use EOF exception instead of IOException.
+        throw new IOException("EoF (empty partition)!"); // TODO #120: use EOF exception instead of IOException.
       }
       final byte[] resultBytes = new byte[lengthToRead]; // Read the size of this byte array.
       System.arraycopy(byteOutputStream.getBufDirectly(), 0, resultBytes, 0, lengthToRead);
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
index a93d8437..dc67ff35 100644
--- a/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
@@ -26,6 +26,7 @@
  *
  * @param <T> element type.
  */
+// TODO #120: Separate EOFException from Decoder Failures
 public interface DecoderFactory<T> extends Serializable {
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAG.java b/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
index 1366ecc9..46198f01 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
@@ -204,7 +204,7 @@ public E getEdgeBetween(final String srcVertexId, final String dstVertexId) thro
   /**
    * Indicates the traversal order of this DAG.
    */
-  public enum TraversalOrder {
+  private enum TraversalOrder {
     PreOrder,
     PostOrder
   }
diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
index ed051f70..35c43301 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
@@ -23,8 +23,6 @@
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
 import edu.snu.nemo.common.exception.IllegalVertexOperationException;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 
 import java.io.Serializable;
 import java.util.*;
@@ -259,22 +257,6 @@ private void executionPropertyCheck() {
           throw new RuntimeException("DAG execution property check: "
               + "DataSizeMetricCollection edge is not compatible with push" + e.getId());
         }));
-    // All vertices with same Stage Id should have identical Parallelism execution property.
-    final HashMap<Integer, Integer> stageIdToParallelismMap = new HashMap<>();
-    vertices.stream().filter(v -> v instanceof IRVertex)
-        .map(v -> (IRVertex) v)
-        .forEach(v -> {
-          final Optional<Integer> stageId = v.getPropertyValue(StageIdProperty.class);
-          if (stageId.isPresent()) {
-            if (!stageIdToParallelismMap.containsKey(stageId.get())) {
-              stageIdToParallelismMap.put(stageId.get(), v.getPropertyValue(ParallelismProperty.class).get());
-            } else if (!stageIdToParallelismMap.get(stageId.get())
-                .equals(v.getPropertyValue(ParallelismProperty.class).get())) {
-              throw new RuntimeException("DAG execution property check: vertices are in a same stage, "
-                  + "but has different parallelism execution properties: Stage" + stageId.get() + ": " + v.getId());
-            }
-          }
-        });
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataStoreProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/InterTaskDataStoreProperty.java
similarity index 81%
rename from common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataStoreProperty.java
rename to common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/InterTaskDataStoreProperty.java
index 6e87844f..af6c85cf 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataStoreProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/InterTaskDataStoreProperty.java
@@ -20,12 +20,12 @@
 /**
  * DataStore ExecutionProperty.
  */
-public final class DataStoreProperty extends EdgeExecutionProperty<DataStoreProperty.Value> {
+public final class InterTaskDataStoreProperty extends EdgeExecutionProperty<InterTaskDataStoreProperty.Value> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
-  private DataStoreProperty(final Value value) {
+  private InterTaskDataStoreProperty(final Value value) {
     super(value);
   }
 
@@ -34,8 +34,8 @@ private DataStoreProperty(final Value value) {
    * @param value value of the new execution property.
    * @return the newly created execution property.
    */
-  public static DataStoreProperty of(final Value value) {
-    return new DataStoreProperty(value);
+  public static InterTaskDataStoreProperty of(final Value value) {
+    return new InterTaskDataStoreProperty(value);
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
index c4b297b4..fdbd5dc2 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
@@ -17,7 +17,7 @@
 
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
@@ -25,7 +25,6 @@
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
 import javax.annotation.concurrent.NotThreadSafe;
@@ -33,6 +32,7 @@
 import java.util.*;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * ExecutionPropertyMap Class, which uses HashMap for keeping track of ExecutionProperties for vertices and edges.
@@ -67,22 +67,23 @@ public ExecutionPropertyMap(final String id) {
     switch (commPattern) {
       case Shuffle:
         map.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
-        map.put(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+        map.put(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
         break;
       case BroadCast:
         map.put(PartitionerProperty.of(PartitionerProperty.Value.IntactPartitioner));
-        map.put(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+        map.put(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
         break;
       case OneToOne:
         map.put(PartitionerProperty.of(PartitionerProperty.Value.IntactPartitioner));
-        map.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+        map.put(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
         break;
       default:
         map.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
-        map.put(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+        map.put(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
     }
     return map;
   }
+
   /**
    * Static initializer for irVertex.
    * @param irVertex irVertex to keep the execution property of.
@@ -148,6 +149,13 @@ public void forEachProperties(final Consumer<? super T> action) {
     properties.values().forEach(action);
   }
 
+  /**
+   * @return {@link Stream} of execution properties.
+   */
+  public Stream<T> stream() {
+    return properties.values().stream();
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
@@ -159,7 +167,7 @@ public String toString() {
       }
       isFirstPair = false;
       sb.append("\"");
-      sb.append(entry.getKey());
+      sb.append(entry.getKey().getCanonicalName());
       sb.append("\": \"");
       sb.append(entry.getValue().getValue());
       sb.append("\"");
@@ -174,17 +182,12 @@ public boolean equals(final Object obj) {
     if (this == obj) {
       return true;
     }
-
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-
-    ExecutionPropertyMap that = (ExecutionPropertyMap) obj;
-
-    return new EqualsBuilder()
-        .append(properties.values().stream().collect(Collectors.toSet()),
-            that.properties.values().stream().collect(Collectors.toSet()))
-        .isEquals();
+    final ExecutionPropertyMap that = (ExecutionPropertyMap) obj;
+    return properties.values().stream().collect(Collectors.toSet())
+        .equals(that.properties.values().stream().collect(Collectors.toSet()));
   }
 
   @Override
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/StageIdProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SkipSerDesProperty.java
similarity index 61%
rename from common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/StageIdProperty.java
rename to common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SkipSerDesProperty.java
index 09009922..f9668edd 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/StageIdProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SkipSerDesProperty.java
@@ -18,23 +18,25 @@
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
 /**
- * StageId ExecutionProperty.
+ * Attaching this property makes the runtime skip serialization and deserialization for the vertex input and output.
+ * TODO #118: Implement Skipping (De)Serialization by ExecutionProperty
  */
-public final class StageIdProperty extends VertexExecutionProperty<Integer> {
+public final class SkipSerDesProperty extends VertexExecutionProperty<Boolean> {
+
+  private static final SkipSerDesProperty SKIP_SER_DES_PROPERTY = new SkipSerDesProperty();
+
   /**
    * Constructor.
-   * @param value value of the execution property.
    */
-  private StageIdProperty(final Integer value) {
-    super(value);
+  private SkipSerDesProperty() {
+    super(true);
   }
 
   /**
    * Static method exposing the constructor.
-   * @param value value of the new execution property.
-   * @return the newly created execution property.
+   * @return the execution property.
    */
-  public static StageIdProperty of(final Integer value) {
-    return new StageIdProperty(value);
+  public static SkipSerDesProperty of() {
+    return SKIP_SER_DES_PROPERTY;
   }
 }
diff --git a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
index 1ca18ff9..ed272358 100644
--- a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
+++ b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
@@ -28,6 +28,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test {@link ExecutionPropertyMap}.
@@ -57,8 +58,8 @@ public void testDefaultValues() {
 
   @Test
   public void testPutGetAndRemove() {
-    edgeMap.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
-    assertEquals(DataStoreProperty.Value.MemoryStore, edgeMap.get(DataStoreProperty.class).get());
+    edgeMap.put(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
+    assertEquals(InterTaskDataStoreProperty.Value.MemoryStore, edgeMap.get(InterTaskDataStoreProperty.class).get());
     edgeMap.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
     assertEquals(DataFlowModelProperty.Value.Pull, edgeMap.get(DataFlowModelProperty.class).get());
     edgeMap.put(EncoderProperty.of(EncoderFactory.DUMMY_ENCODER_FACTORY));
@@ -72,4 +73,33 @@ public void testPutGetAndRemove() {
     vertexMap.put(ParallelismProperty.of(100));
     assertEquals(100, vertexMap.get(ParallelismProperty.class).get().longValue());
   }
+
+  @Test
+  public void testEquality() {
+    final ExecutionPropertyMap<ExecutionProperty> map0 = new ExecutionPropertyMap<>("map0");
+    final ExecutionPropertyMap<ExecutionProperty> map1 = new ExecutionPropertyMap<>("map1");
+    assertTrue(map0.equals(map1));
+    assertTrue(map1.equals(map0));
+    map0.put(ParallelismProperty.of(1));
+    assertFalse(map0.equals(map1));
+    assertFalse(map1.equals(map0));
+    map1.put(ParallelismProperty.of(1));
+    assertTrue(map0.equals(map1));
+    assertTrue(map1.equals(map0));
+    map1.put(ParallelismProperty.of(2));
+    assertFalse(map0.equals(map1));
+    assertFalse(map1.equals(map0));
+    map0.put(ParallelismProperty.of(2));
+    assertTrue(map0.equals(map1));
+    assertTrue(map1.equals(map0));
+    map0.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
+    assertFalse(map0.equals(map1));
+    assertFalse(map1.equals(map0));
+    map1.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Push));
+    assertFalse(map0.equals(map1));
+    assertFalse(map1.equals(map0));
+    map1.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
+    assertTrue(map0.equals(map1));
+    assertTrue(map1.equals(map0));
+  }
 }
diff --git a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 71626def..981ee72d 100644
--- a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -58,8 +58,7 @@ public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG) throws Exception
   public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG,
                               final PhysicalPlanGenerator physicalPlanGenerator) {
     final DAG<Stage, StageEdge> stageDAG = irDAG.convert(physicalPlanGenerator);
-    final PhysicalPlan physicalPlan = new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(),
-        stageDAG, physicalPlanGenerator.getIdToIRVertex());
+    final PhysicalPlan physicalPlan = new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), stageDAG);
     return physicalPlan;
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
index 0ffddaa6..0c842031 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
@@ -19,7 +19,6 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 
 
 /**
@@ -48,8 +47,6 @@ public CompressionPass(final CompressionProperty.Value compression) {
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex).stream()
-        .filter(e -> !vertex.getPropertyValue(StageIdProperty.class).get()
-            .equals(e.getSrc().getPropertyValue(StageIdProperty.class).get()))
         .filter(edge -> !edge.getPropertyValue(CompressionProperty.class).isPresent())
         .forEach(edge -> edge.setProperty(CompressionProperty.of(compression))));
 
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeDataStorePass.java
index a0428a60..857069ba 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeDataStorePass.java
@@ -17,9 +17,9 @@
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 
 /**
  * Pass to annotate the DAG for a job to perform data skew.
@@ -31,7 +31,7 @@
    * Default constructor.
    */
   public DataSkewEdgeDataStorePass() {
-    super(DataStoreProperty.class);
+    super(InterTaskDataStoreProperty.class);
   }
 
   @Override
@@ -45,9 +45,9 @@ public DataSkewEdgeDataStorePass() {
         dag.getIncomingEdgesOf(v).forEach(edge -> {
           // we want it to be in the same stage
           if (edge.equals(edgeToUseMemory)) {
-            edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+            edge.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
           } else {
-            edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+            edge.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
           }
         });
       }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
index 6ec2fde8..62830711 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
@@ -20,7 +20,6 @@
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DecompressionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 
 
 /**
@@ -39,8 +38,6 @@ public DecompressionPass() {
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex).stream()
-        .filter(e -> !vertex.getPropertyValue(StageIdProperty.class).get()
-            .equals(e.getSrc().getPropertyValue(StageIdProperty.class).get()))
         // Find edges which have a compression property but not decompression property.
         .filter(edge -> edge.getPropertyValue(CompressionProperty.class).isPresent()
             && !edge.getPropertyValue(DecompressionProperty.class).isPresent())
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeUsedDataHandlingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeUsedDataHandlingPass.java
index 81e16f74..0ea2d13d 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeUsedDataHandlingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeUsedDataHandlingPass.java
@@ -17,7 +17,7 @@
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.UsedDataHandlingProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 
@@ -32,7 +32,7 @@
    * Default constructor.
    */
   public DefaultEdgeUsedDataHandlingPass() {
-    super(UsedDataHandlingProperty.class, Collections.singleton(DataStoreProperty.class));
+    super(UsedDataHandlingProperty.class, Collections.singleton(InterTaskDataStoreProperty.class));
   }
 
   @Override
@@ -40,9 +40,10 @@ public DefaultEdgeUsedDataHandlingPass() {
     dag.topologicalDo(irVertex ->
         dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
           if (!irEdge.getPropertyValue(UsedDataHandlingProperty.class).isPresent()) {
-            final DataStoreProperty.Value dataStoreValue = irEdge.getPropertyValue(DataStoreProperty.class).get();
-            if (DataStoreProperty.Value.MemoryStore.equals(dataStoreValue)
-                || DataStoreProperty.Value.SerializedMemoryStore.equals(dataStoreValue)) {
+            final InterTaskDataStoreProperty.Value dataStoreValue
+                = irEdge.getPropertyValue(InterTaskDataStoreProperty.class).get();
+            if (InterTaskDataStoreProperty.Value.MemoryStore.equals(dataStoreValue)
+                || InterTaskDataStoreProperty.Value.SerializedMemoryStore.equals(dataStoreValue)) {
               irEdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Discard));
             } else {
               irEdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ReviseInterStageEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultInterTaskDataStorePass.java
similarity index 61%
rename from compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ReviseInterStageEdgeDataStorePass.java
rename to compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultInterTaskDataStorePass.java
index 35c8c5ae..98bca9a6 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ReviseInterStageEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultInterTaskDataStorePass.java
@@ -17,9 +17,8 @@
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 
 import java.util.Collections;
 import java.util.List;
@@ -27,12 +26,12 @@
 /**
  * Edge data store pass to process inter-stage memory store edges.
  */
-public final class ReviseInterStageEdgeDataStorePass extends AnnotatingPass {
+public final class DefaultInterTaskDataStorePass extends AnnotatingPass {
   /**
    * Default constructor.
    */
-  public ReviseInterStageEdgeDataStorePass() {
-    super(DataStoreProperty.class, Collections.singleton(StageIdProperty.class));
+  public DefaultInterTaskDataStorePass() {
+    super(InterTaskDataStoreProperty.class, Collections.emptySet());
   }
 
   @Override
@@ -40,13 +39,8 @@ public ReviseInterStageEdgeDataStorePass() {
     dag.getVertices().forEach(vertex -> {
       final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
       if (!inEdges.isEmpty()) {
-        inEdges.forEach(edge -> {
-          if (DataStoreProperty.Value.MemoryStore.equals(edge.getPropertyValue(DataStoreProperty.class).get())
-              && !edge.getSrc().getPropertyValue(StageIdProperty.class).get()
-              .equals(edge.getDst().getPropertyValue(StageIdProperty.class).get())) {
-            edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
-          }
-        });
+        inEdges.forEach(edge -> edge.setProperty(
+            InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore)));
       }
     });
     return dag;
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
new file mode 100644
index 00000000..55c16c51
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.dag.DAGBuilder;
+import edu.snu.nemo.common.dag.Edge;
+import edu.snu.nemo.common.dag.Vertex;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A pass for assigning each IR vertex to a schedule group.
+ *
+ * <h3>Rules</h3>
+ * <ul>
+ *   <li>Vertices connected with push edges must be assigned same ScheduleGroup.</li>
+ *   <li>For pull edges,
+ *     <ul>
+ *       <li>if the destination of the edge depends on multiple ScheduleGroups, split ScheduleGroup by the edge.</li>
+ *       <li>if the edge is broadcast type and {@code allowBroadcastWithinScheduleGroup} is {@code false},
+ *       split ScheduleGroup by the edge.</li>
+ *       <li>if the edge is shuffle type and {@code allowShuffleWithinScheduleGroup} is {@code false},
+ *       split ScheduleGroup by the edge.</li>
+ *       <li>if multiple inEdges are disallowed and the destination has several, split ScheduleGroup by the edge.</li>
+ *       <li>Otherwise, the source and the destination of the edge should be assigned same ScheduleGroup.</li>
+ *     </ul>
+ *   </li>
+ * </ul>
+ */
+public final class DefaultScheduleGroupPass extends AnnotatingPass {
+
+  private final boolean allowBroadcastWithinScheduleGroup;
+  private final boolean allowShuffleWithinScheduleGroup;
+  private final boolean allowMultipleInEdgesWithinScheduleGroup;
+
+  /**
+   * Default constructor: disallows Broadcast and Shuffle edges, allows multiple inEdges within a ScheduleGroup.
+   */
+  public DefaultScheduleGroupPass() {
+    this(false, false, true);
+  }
+
+  /**
+   * Constructor.
+   * @param allowBroadcastWithinScheduleGroup whether to allow Broadcast edges within a ScheduleGroup or not
+   * @param allowShuffleWithinScheduleGroup whether to allow Shuffle edges within a ScheduleGroup or not
+   * @param allowMultipleInEdgesWithinScheduleGroup whether to allow vertices with multiple dependencies or not
+   */
+  public DefaultScheduleGroupPass(final boolean allowBroadcastWithinScheduleGroup,
+                                  final boolean allowShuffleWithinScheduleGroup,
+                                  final boolean allowMultipleInEdgesWithinScheduleGroup) {
+    super(ScheduleGroupIndexProperty.class, Stream.of(
+        DataCommunicationPatternProperty.class,
+        DataFlowModelProperty.class
+    ).collect(Collectors.toSet()));
+    this.allowBroadcastWithinScheduleGroup = allowBroadcastWithinScheduleGroup;
+    this.allowShuffleWithinScheduleGroup = allowShuffleWithinScheduleGroup;
+    this.allowMultipleInEdgesWithinScheduleGroup = allowMultipleInEdgesWithinScheduleGroup;
+  }
+
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    final Map<IRVertex, ScheduleGroup> irVertexToScheduleGroupMap = new HashMap<>();
+    final Set<ScheduleGroup> scheduleGroups = new HashSet<>();
+    dag.topologicalDo(irVertex -> {
+      // Base case: a vertex without an assigned ScheduleGroup (e.g. a root vertex) starts a new one
+      if (!irVertexToScheduleGroupMap.containsKey(irVertex)) {
+        final ScheduleGroup newScheduleGroup = new ScheduleGroup();
+        scheduleGroups.add(newScheduleGroup);
+        newScheduleGroup.vertices.add(irVertex);
+        irVertexToScheduleGroupMap.put(irVertex, newScheduleGroup);
+      }
+      // Get the ScheduleGroup that the current vertex belongs to
+      final ScheduleGroup scheduleGroup = irVertexToScheduleGroupMap.get(irVertex);
+      if (scheduleGroup == null) {
+        throw new RuntimeException(String.format("ScheduleGroup must be set for %s", irVertex));
+      }
+      // Step case: inductively assign a ScheduleGroup to the destination of each outgoing edge
+      for (final IREdge edge : dag.getOutgoingEdgesOf(irVertex)) {
+        final IRVertex connectedIRVertex = edge.getDst();
+        // Skip if some vertices that connectedIRVertex depends on have not been assigned a ScheduleGroup yet
+        boolean skip = false;
+        for (final IREdge edgeToConnectedIRVertex : dag.getIncomingEdgesOf(connectedIRVertex)) {
+          if (!irVertexToScheduleGroupMap.containsKey(edgeToConnectedIRVertex.getSrc())) {
+            // connectedIRVertex will be covered when edgeToConnectedIRVertex.getSrc() is visited
+            skip = true;
+            break;
+          }
+        }
+        if (skip) {
+          continue;
+        }
+        // Now all vertices that connectedIRVertex depends on are guaranteed to have a ScheduleGroup assigned
+
+        // Get ScheduleGroup(s) that push data to the connectedIRVertex
+        final Set<ScheduleGroup> pushScheduleGroups = new HashSet<>();
+        for (final IREdge edgeToConnectedIRVertex : dag.getIncomingEdgesOf(connectedIRVertex)) {
+          if (edgeToConnectedIRVertex.getPropertyValue(DataFlowModelProperty.class)
+              .orElseThrow(() -> new RuntimeException(String.format("DataFlowModelProperty for %s must be set",
+                  edgeToConnectedIRVertex.getId()))) == DataFlowModelProperty.Value.Push) {
+            pushScheduleGroups.add(irVertexToScheduleGroupMap.get(edgeToConnectedIRVertex.getSrc()));
+          }
+        }
+        if (pushScheduleGroups.isEmpty()) {
+          // If allowMultipleInEdgesWithinScheduleGroup is false and connectedIRVertex depends on multiple vertices,
+          // it should be a member of a new ScheduleGroup
+          boolean mergability = allowMultipleInEdgesWithinScheduleGroup
+              || dag.getIncomingEdgesOf(connectedIRVertex).size() <= 1;
+          for (final IREdge edgeToConnectedIRVertex : dag.getIncomingEdgesOf(connectedIRVertex)) {
+            if (!mergability) {
+              break;
+            }
+            final ScheduleGroup anotherDependency = irVertexToScheduleGroupMap.get(edgeToConnectedIRVertex.getSrc());
+            if (!scheduleGroup.equals(anotherDependency)) {
+              // Since connectedIRVertex depends on multiple ScheduleGroups, connectedIRVertex must be a member of
+              // a new ScheduleGroup
+              mergability = false;
+            }
+            final DataCommunicationPatternProperty.Value communicationPattern = edgeToConnectedIRVertex
+                .getPropertyValue(DataCommunicationPatternProperty.class).orElseThrow(
+                    () -> new RuntimeException(String.format("DataCommunicationPatternProperty for %s must be set",
+                        edgeToConnectedIRVertex.getId())));
+            if (!allowBroadcastWithinScheduleGroup
+                && communicationPattern == DataCommunicationPatternProperty.Value.BroadCast) {
+              mergability = false;
+            }
+            if (!allowShuffleWithinScheduleGroup
+                && communicationPattern == DataCommunicationPatternProperty.Value.Shuffle) {
+              mergability = false;
+            }
+          }
+
+          if (mergability) {
+            // Merge into the existing scheduleGroup
+            scheduleGroup.vertices.add(connectedIRVertex);
+            irVertexToScheduleGroupMap.put(connectedIRVertex, scheduleGroup);
+          } else {
+            // Create a new ScheduleGroup
+            final ScheduleGroup newScheduleGroup = new ScheduleGroup();
+            scheduleGroups.add(newScheduleGroup);
+            newScheduleGroup.vertices.add(connectedIRVertex);
+            irVertexToScheduleGroupMap.put(connectedIRVertex, newScheduleGroup);
+            for (final IREdge edgeToConnectedIRVertex : dag.getIncomingEdgesOf(connectedIRVertex)) {
+              final ScheduleGroup src = irVertexToScheduleGroupMap.get(edgeToConnectedIRVertex.getSrc());
+              final ScheduleGroup dst = newScheduleGroup;
+              src.scheduleGroupsTo.add(dst);
+              dst.scheduleGroupsFrom.add(src);
+            }
+          }
+        } else {
+          // Merge ScheduleGroups pushing to connectedIRVertex. NOTE(review): the vertex map is not remapped for them.
+          final Iterator<ScheduleGroup> pushScheduleGroupIterator = pushScheduleGroups.iterator();
+          final ScheduleGroup pushScheduleGroup = pushScheduleGroupIterator.next();
+          while (pushScheduleGroupIterator.hasNext()) {
+            final ScheduleGroup anotherPushScheduleGroup = pushScheduleGroupIterator.next();
+            anotherPushScheduleGroup.vertices.forEach(pushScheduleGroup.vertices::add);
+            scheduleGroups.remove(anotherPushScheduleGroup);
+            for (final ScheduleGroup src : anotherPushScheduleGroup.scheduleGroupsFrom) {
+              final ScheduleGroup dst = anotherPushScheduleGroup;
+              final ScheduleGroup newDst = pushScheduleGroup;
+              src.scheduleGroupsTo.remove(dst);
+              src.scheduleGroupsTo.add(newDst);
+              newDst.scheduleGroupsFrom.add(src);
+            }
+            for (final ScheduleGroup dst : anotherPushScheduleGroup.scheduleGroupsTo) {
+              final ScheduleGroup src = anotherPushScheduleGroup;
+              final ScheduleGroup newSrc = pushScheduleGroup;
+              dst.scheduleGroupsFrom.remove(src);
+              dst.scheduleGroupsFrom.add(newSrc);
+              newSrc.scheduleGroupsTo.add(dst);
+            }
+          }
+          // Add connectedIRVertex into the merged pushScheduleGroup
+          pushScheduleGroup.vertices.add(connectedIRVertex);
+          irVertexToScheduleGroupMap.put(connectedIRVertex, pushScheduleGroup);
+        }
+      }
+    });
+
+    // Assign ScheduleGroupIndex property in topological order of the ScheduleGroup DAG
+    final MutableInt currentScheduleGroupIndex = new MutableInt(getNextScheudleGroupIndex(dag.getVertices()));
+    final DAGBuilder<ScheduleGroup, ScheduleGroupEdge> scheduleGroupDAGBuilder = new DAGBuilder<>();
+    scheduleGroups.forEach(scheduleGroupDAGBuilder::addVertex);
+    scheduleGroups.forEach(src -> src.scheduleGroupsTo
+        .forEach(dst -> scheduleGroupDAGBuilder.connectVertices(new ScheduleGroupEdge(src, dst))));
+    scheduleGroupDAGBuilder.build().topologicalDo(scheduleGroup -> {
+      boolean usedCurrentIndex = false;
+      for (final IRVertex irVertex : scheduleGroup.vertices) {
+        if (!irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).isPresent()) {
+          irVertex.getExecutionProperties().put(ScheduleGroupIndexProperty.of(currentScheduleGroupIndex.getValue()));
+          usedCurrentIndex = true;
+        }
+      }
+      if (usedCurrentIndex) {
+        currentScheduleGroupIndex.increment();
+      }
+    });
+    return dag;
+  }
+
+  /**
+   * Determines the range of {@link ScheduleGroupIndexProperty} value that will prevent collision
+   * with the existing {@link ScheduleGroupIndexProperty}.
+   * @param irVertexCollection collection of {@link IRVertex}
+   * @return the minimum value for the {@link ScheduleGroupIndexProperty} that won't collide with the existing values
+   */
+  private int getNextScheudleGroupIndex(final Collection<IRVertex> irVertexCollection) {
+    int nextScheduleGroupIndex = 0;
+    for (final IRVertex irVertex : irVertexCollection) {
+      final Optional<Integer> scheduleGroupIndex = irVertex.getPropertyValue(ScheduleGroupIndexProperty.class);
+      if (scheduleGroupIndex.isPresent()) {
+        nextScheduleGroupIndex = Math.max(scheduleGroupIndex.get() + 1, nextScheduleGroupIndex);
+      }
+    }
+    return nextScheduleGroupIndex;
+  }
+
+  /**
+   * Vertex in ScheduleGroup DAG, holding the IR vertices of the group and its neighboring ScheduleGroups.
+   */
+  private static final class ScheduleGroup extends Vertex {
+    private static int nextScheduleGroupId = 0;
+    private final Set<IRVertex> vertices = new HashSet<>();
+    private final Set<ScheduleGroup> scheduleGroupsTo = new HashSet<>();
+    private final Set<ScheduleGroup> scheduleGroupsFrom = new HashSet<>();
+
+    /**
+     * Constructor; the id is derived from a class-wide counter.
+     */
+    ScheduleGroup() {
+      super(String.format("ScheduleGroup-%d", nextScheduleGroupId++));
+    }
+  }
+
+  /**
+   * Edge in ScheduleGroup DAG; two edges are equal when their sources and destinations are equal.
+   */
+  private static final class ScheduleGroupEdge extends Edge<ScheduleGroup> {
+    private static int nextScheduleGroupEdgeId = 0;
+
+    /**
+     * Constructor.
+     *
+     * @param src source vertex.
+     * @param dst destination vertex.
+     */
+    ScheduleGroupEdge(final ScheduleGroup src, final ScheduleGroup dst) {
+      super(String.format("ScheduleGroupEdge-%d", nextScheduleGroupEdgeId++), src, dst);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      final ScheduleGroupEdge that = (ScheduleGroupEdge) o;
+      return this.getSrc().equals(that.getSrc()) && this.getDst().equals(that.getDst());
+    }
+
+    @Override
+    public int hashCode() {
+      return getSrc().hashCode() + 31 * getDst().hashCode();
+    }
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java
deleted file mode 100644
index 7638e5d7..00000000
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * Default method of partitioning an IR DAG into stages.
- * We traverse the DAG topologically to observe each vertex if it can be added to a stage or if it should be assigned
- * to a new stage. We filter out the candidate incoming edges to connect to an existing stage, and if it exists, we
- * connect it to the stage, and otherwise we don't.
- */
-public final class DefaultStagePartitioningPass extends AnnotatingPass {
-  /**
-   * Default constructor.
-   */
-  public DefaultStagePartitioningPass() {
-    super(StageIdProperty.class, Stream.of(
-        DataCommunicationPatternProperty.class,
-        ExecutorPlacementProperty.class,
-        DataFlowModelProperty.class,
-        PartitionerProperty.class,
-        ParallelismProperty.class
-    ).collect(Collectors.toSet()));
-  }
-
-  @Override
-  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> irDAG) {
-    final AtomicInteger stageNum = new AtomicInteger(0);
-    final List<List<IRVertex>> vertexListForEachStage = groupVerticesByStage(irDAG);
-    vertexListForEachStage.forEach(stageVertices -> {
-      stageVertices.forEach(irVertex -> irVertex.setProperty(StageIdProperty.of(stageNum.get())));
-      stageNum.getAndIncrement();
-    });
-    return irDAG;
-  }
-
-  /**
-   * This method traverses the IR DAG to group each of the vertices by stages.
-   * @param irDAG to traverse.
-   * @return List of groups of vertices that are each divided by stages.
-   */
-  private List<List<IRVertex>> groupVerticesByStage(final DAG<IRVertex, IREdge> irDAG) {
-    // Data structures used for stage partitioning.
-    final HashMap<IRVertex, Integer> vertexStageNumHashMap = new HashMap<>();
-    final List<List<IRVertex>> vertexListForEachStage = new ArrayList<>();
-    final AtomicInteger stageNumber = new AtomicInteger(1);
-
-    // First, traverse the DAG topologically to add each vertices to a list associated with each of the stage number.
-    irDAG.topologicalDo(vertex -> {
-      final List<IREdge> inEdges = irDAG.getIncomingEdgesOf(vertex);
-      final Optional<List<IREdge>> inEdgeList = (inEdges == null || inEdges.isEmpty())
-              ? Optional.empty() : Optional.of(inEdges);
-
-      if (!inEdgeList.isPresent()) { // If Source vertex
-        createNewStage(vertex, vertexStageNumHashMap, stageNumber, vertexListForEachStage);
-      } else {
-        // Filter candidate incoming edges that can be included in a stage with the vertex.
-        final Optional<List<IREdge>> inEdgesForStage = inEdgeList.map(e -> e.stream()
-            // One to one edges
-            .filter(edge -> edge.getPropertyValue(DataCommunicationPatternProperty.class).get()
-                              .equals(DataCommunicationPatternProperty.Value.OneToOne))
-            // MemoryStore placement
-            .filter(edge -> edge.getPropertyValue(DataStoreProperty.class).get()
-                              .equals(DataStoreProperty.Value.MemoryStore))
-            // if src and dst are placed on same container types
-            .filter(edge -> edge.getSrc().getPropertyValue(ExecutorPlacementProperty.class).get()
-                .equals(edge.getDst().getPropertyValue(ExecutorPlacementProperty.class).get()))
-            // if src and dst have same parallelism
-            .filter(edge -> edge.getSrc().getPropertyValue(ParallelismProperty.class).get()
-                .equals(edge.getDst().getPropertyValue(ParallelismProperty.class).get()))
-            // Src that is already included in a stage
-            .filter(edge -> vertexStageNumHashMap.containsKey(edge.getSrc()))
-            .collect(Collectors.toList()));
-        // Choose one to connect out of the candidates. We want to connect the vertex to a single stage.
-        final Optional<IREdge> edgeToConnect = inEdgesForStage.map(edges -> edges.stream().findAny())
-            .orElse(Optional.empty());
-
-        if (!inEdgesForStage.isPresent() || inEdgesForStage.get().isEmpty() || !edgeToConnect.isPresent()) {
-          // when we cannot connect vertex in other stages
-          createNewStage(vertex, vertexStageNumHashMap, stageNumber, vertexListForEachStage);
-        } else {
-          // otherwise connect with a stage.
-          final IRVertex irVertexToConnect = edgeToConnect.get().getSrc();
-          vertexStageNumHashMap.put(vertex, vertexStageNumHashMap.get(irVertexToConnect));
-          final Optional<List<IRVertex>> listOfIRVerticesOfTheStage =
-              vertexListForEachStage.stream().filter(l -> l.contains(irVertexToConnect)).findFirst();
-          listOfIRVerticesOfTheStage.ifPresent(lst -> {
-            vertexListForEachStage.remove(lst);
-            lst.add(vertex);
-            vertexListForEachStage.add(lst);
-          });
-        }
-      }
-    });
-    return vertexListForEachStage;
-  }
-
-  /**
-   * Creates a new stage.
-   * @param irVertex the vertex which begins the stage.
-   * @param vertexStageNumHashMap to keep track of vertex and its stage number.
-   * @param stageNumber to atomically number stages.
-   * @param vertexListForEachStage to group each vertex lists for each stages.
-   */
-  private static void createNewStage(final IRVertex irVertex, final HashMap<IRVertex, Integer> vertexStageNumHashMap,
-                                     final AtomicInteger stageNumber,
-                                     final List<List<IRVertex>> vertexListForEachStage) {
-    vertexStageNumHashMap.put(irVertex, stageNumber.getAndIncrement());
-    final List<IRVertex> newList = new ArrayList<>();
-    newList.add(irVertex);
-    vertexListForEachStage.add(newList);
-  }
-}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
index 719de834..76983fd0 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
@@ -16,9 +16,9 @@
 package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
 
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 
 import java.util.Collections;
 import java.util.List;
@@ -32,7 +32,7 @@
    * Default constructor.
    */
   public DisaggregationEdgeDataStorePass() {
-    super(DataStoreProperty.class, Collections.singleton(DataStoreProperty.class));
+    super(InterTaskDataStoreProperty.class, Collections.singleton(InterTaskDataStoreProperty.class));
   }
 
   @Override
@@ -40,10 +40,7 @@ public DisaggregationEdgeDataStorePass() {
     dag.getVertices().forEach(vertex -> { // Initialize the DataStore of the DAG with GlusterFileStore.
       final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
       inEdges.forEach(edge -> {
-        if (DataStoreProperty.Value.LocalFileStore
-              .equals(edge.getPropertyValue(DataStoreProperty.class).get())) {
-          edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.GlusterFileStore));
-        }
+        edge.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.GlusterFileStore));
       });
     });
     return dag;
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataStorePass.java
index 0777c1c9..8e4023cb 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataStorePass.java
@@ -17,9 +17,9 @@
 
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 
 import java.util.Collections;
@@ -33,7 +33,7 @@
    * Default constructor.
    */
   public PadoEdgeDataStorePass() {
-    super(DataStoreProperty.class, Collections.singleton(ExecutorPlacementProperty.class));
+    super(InterTaskDataStoreProperty.class, Collections.singleton(ExecutorPlacementProperty.class));
   }
 
   @Override
@@ -43,12 +43,12 @@ public PadoEdgeDataStorePass() {
       if (!inEdges.isEmpty()) {
         inEdges.forEach(edge -> {
           if (fromTransientToReserved(edge) || fromReservedToTransient(edge)) {
-            edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+            edge.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
           } else if (DataCommunicationPatternProperty.Value.OneToOne
               .equals(edge.getPropertyValue(DataCommunicationPatternProperty.class).get())) {
-            edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+            edge.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
           } else {
-            edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+            edge.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
           }
         });
       }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataStorePass.java
index 6fae5efb..fe5d9963 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataStorePass.java
@@ -18,8 +18,8 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 
 import java.util.Collections;
 
@@ -32,7 +32,7 @@
    * Default constructor.
    */
   public SailfishEdgeDataStorePass() {
-    super(DataStoreProperty.class, Collections.singleton(DataCommunicationPatternProperty.class));
+    super(InterTaskDataStoreProperty.class, Collections.singleton(DataCommunicationPatternProperty.class));
   }
 
   @Override
@@ -46,12 +46,13 @@ public SailfishEdgeDataStorePass() {
           if (DataCommunicationPatternProperty.Value.Shuffle
           .equals(edgeToMerger.getPropertyValue(DataCommunicationPatternProperty.class).get())) {
             // Pass data through memory to the merger vertex.
-            edgeToMerger.setProperty(DataStoreProperty.of(DataStoreProperty.Value.SerializedMemoryStore));
+            edgeToMerger.setProperty(InterTaskDataStoreProperty
+                .of(InterTaskDataStoreProperty.Value.SerializedMemoryStore));
           }
         });
         dag.getOutgoingEdgesOf(vertex).forEach(edgeFromMerger ->
             // Merge the input data and write it immediately to the remote disk.
-            edgeFromMerger.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore)));
+            edgeFromMerger.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore)));
       }
     });
     return dag;
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPass.java
deleted file mode 100644
index e76f4ba1..00000000
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPass.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import com.google.common.collect.Lists;
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
-
-import java.util.*;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * A pass for assigning each stages in schedule groups.
- * We traverse the DAG topologically to find the dependency information between stages and number them appropriately
- * to give correct order or schedule groups.
- */
-public final class ScheduleGroupPass extends AnnotatingPass {
-  /**
-   * Default constructor.
-   */
-  public ScheduleGroupPass() {
-    super(ScheduleGroupIndexProperty.class, Stream.of(
-        StageIdProperty.class,
-        DataCommunicationPatternProperty.class,
-        ExecutorPlacementProperty.class,
-        DataFlowModelProperty.class,
-        PartitionerProperty.class,
-        ParallelismProperty.class
-    ).collect(Collectors.toSet()));
-  }
-
-  private static final int INITIAL_SCHEDULE_GROUP = 0;
-
-  @Override
-  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
-    // We assume that the input dag is tagged with stage ids.
-    if (dag.getVertices().stream()
-        .anyMatch(irVertex -> !irVertex.getPropertyValue(StageIdProperty.class).isPresent())) {
-      throw new RuntimeException("There exists an IR vertex going through ScheduleGroupPass "
-          + "without stage id tagged.");
-    }
-
-    // Map of stage id to the stage ids that it depends on.
-    final Map<Integer, Set<Integer>> dependentStagesMap = new HashMap<>();
-    dag.topologicalDo(irVertex -> {
-      final Integer currentStageId = irVertex.getPropertyValue(StageIdProperty.class).get();
-      dependentStagesMap.putIfAbsent(currentStageId, new HashSet<>());
-      // while traversing, we find the stages that point to the current stage and add them to the list.
-      dag.getIncomingEdgesOf(irVertex).stream()
-          .map(IREdge::getSrc)
-          .mapToInt(vertex -> vertex.getPropertyValue(StageIdProperty.class).get())
-          .filter(n -> n != currentStageId)
-          .forEach(n -> dependentStagesMap.get(currentStageId).add(n));
-    });
-
-    // Map to put our results in.
-    final Map<Integer, Integer> stageIdToScheduleGroupIndexMap = new HashMap<>();
-
-    // Calculate schedule group number of each stages step by step
-    while (stageIdToScheduleGroupIndexMap.size() < dependentStagesMap.size()) {
-      // This is to ensure that each iteration is making progress.
-      // We ensure that the stageIdToScheduleGroupIdMap is increasing in size in each iteration.
-      final Integer previousSize = stageIdToScheduleGroupIndexMap.size();
-      dependentStagesMap.forEach((stageId, dependentStages) -> {
-        if (!stageIdToScheduleGroupIndexMap.keySet().contains(stageId)
-            && dependentStages.isEmpty()) { // initial source stages
-          // initial source stages are indexed with schedule group 0.
-          stageIdToScheduleGroupIndexMap.put(stageId, INITIAL_SCHEDULE_GROUP);
-        } else if (!stageIdToScheduleGroupIndexMap.keySet().contains(stageId)
-            && dependentStages.stream().allMatch(stageIdToScheduleGroupIndexMap::containsKey)) { // next stages
-          // We find the maximum schedule group index from previous stages, and index current stage with that number +1.
-          final Integer maxDependentSchedulerGroupIndex =
-              dependentStages.stream()
-                  .mapToInt(stageIdToScheduleGroupIndexMap::get)
-                  .max().orElseThrow(() ->
-                    new RuntimeException("A stage that is not a source stage much have dependent stages"));
-          stageIdToScheduleGroupIndexMap.put(stageId, maxDependentSchedulerGroupIndex + 1);
-        }
-      });
-      if (previousSize == stageIdToScheduleGroupIndexMap.size()) {
-        throw new RuntimeException("Iteration for indexing schedule groups in "
-            + ScheduleGroupPass.class.getSimpleName() + " is not making progress");
-      }
-    }
-
-    // Reverse topologically traverse and match schedule group ids for those that have push edges in between
-    Lists.reverse(dag.getTopologicalSort()).forEach(v -> {
-      // get the destination vertices of the edges that are marked as push
-      final List<IRVertex> pushConnectedVertices = dag.getOutgoingEdgesOf(v).stream()
-          .filter(e -> DataFlowModelProperty.Value.Push.equals(e.getPropertyValue(DataFlowModelProperty.class).get()))
-          .map(IREdge::getDst)
-          .collect(Collectors.toList());
-      if (!pushConnectedVertices.isEmpty()) { // if we need to do something,
-        // we find the min value of the destination schedule groups.
-        final Integer newSchedulerGroupIndex = pushConnectedVertices.stream()
-            .mapToInt(irVertex -> stageIdToScheduleGroupIndexMap
-                .get(irVertex.getPropertyValue(StageIdProperty.class).get()))
-            .min().orElseThrow(() -> new RuntimeException("a list was not empty, but produced an empty result"));
-        // overwrite
-        final Integer originalScheduleGroupIndex = stageIdToScheduleGroupIndexMap
-            .get(v.getPropertyValue(StageIdProperty.class).get());
-        stageIdToScheduleGroupIndexMap.replace(v.getPropertyValue(StageIdProperty.class).get(), newSchedulerGroupIndex);
-        // shift those if it came too far
-        if (stageIdToScheduleGroupIndexMap.values().stream()
-            .noneMatch(stageIndex -> stageIndex.equals(originalScheduleGroupIndex))) { // if it doesn't exist
-          stageIdToScheduleGroupIndexMap.replaceAll((stageId, scheduleGroupIndex) -> {
-            if (scheduleGroupIndex > originalScheduleGroupIndex) {
-              return scheduleGroupIndex - 1; // we shift schedule group indexes by one.
-            } else {
-              return scheduleGroupIndex;
-            }
-          });
-        }
-      }
-    });
-
-    // do the tagging
-    dag.topologicalDo(irVertex -> irVertex.setProperty(ScheduleGroupIndexProperty.of(
-        stageIdToScheduleGroupIndexMap.get(irVertex.getPropertyValue(StageIdProperty.class).get()))));
-
-    return dag;
-  }
-}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
index b0ab2b17..3a5c80c9 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
@@ -30,13 +30,12 @@
    */
   public PrimitiveCompositePass() {
     super(Arrays.asList(
-        new DefaultParallelismPass(), // annotating after reshaping passes, before stage partitioning
+        new DefaultParallelismPass(),
         new DefaultEdgeEncoderPass(),
         new DefaultEdgeDecoderPass(),
-        new DefaultStagePartitioningPass(),
-        new ReviseInterStageEdgeDataStorePass(), // after stage partitioning
+        new DefaultInterTaskDataStorePass(),
         new DefaultEdgeUsedDataHandlingPass(),
-        new ScheduleGroupPass(),
+        new DefaultScheduleGroupPass(),
         new CompressionPass(),
         new DecompressionPass()
     ));
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
index 9bf434a7..1abe0927 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
@@ -23,6 +23,7 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.SkipSerDesProperty;
 import edu.snu.nemo.common.ir.vertex.transform.RelayTransform;
 
 import java.util.Collections;
@@ -58,6 +59,7 @@ public SailfishRelayReshapingPass() {
             // Insert a merger vertex having transform that write received data immediately
             // before the vertex receiving shuffled data.
             final OperatorVertex iFileMergerVertex = new OperatorVertex(new RelayTransform());
+            iFileMergerVertex.getExecutionProperties().put(SkipSerDesProperty.of());
             builder.addVertex(iFileMergerVertex);
             final IREdge newEdgeToMerger = new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
                 edge.getSrc(), iFileMergerVertex, edge.isSideInput());
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
index 124aa166..7fc4fba5 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
@@ -16,8 +16,7 @@
 package edu.snu.nemo.compiler.optimizer.policy;
 
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
 import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
 
 import java.util.ArrayList;
@@ -30,8 +29,7 @@
   @Override
   public List<CompileTimePass> getCompileTimePasses() {
     List<CompileTimePass> policy = new ArrayList<>();
-    policy.add(new DefaultStagePartitioningPass());
-    policy.add(new ScheduleGroupPass());
+    policy.add(new DefaultScheduleGroupPass());
     return policy;
   }
 
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
index 595079eb..53ff914d 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
@@ -16,8 +16,7 @@
 package edu.snu.nemo.compiler.optimizer.policy;
 
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ShuffleEdgePushPass;
 import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
 
@@ -31,9 +30,8 @@
   @Override
   public List<CompileTimePass> getCompileTimePasses() {
     List<CompileTimePass> policy = new ArrayList<>();
-    policy.add(new DefaultStagePartitioningPass());
     policy.add(new ShuffleEdgePushPass());
-    policy.add(new ScheduleGroupPass());
+    policy.add(new DefaultScheduleGroupPass());
     return policy;
   }
 
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
index 62193a69..d23d5bda 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
@@ -16,9 +16,8 @@
 package edu.snu.nemo.compiler.optimizer.policy;
 
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultParallelismPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ReviseInterStageEdgeDataStorePass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultInterTaskDataStorePass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.CompositePass;
 import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
@@ -63,9 +62,8 @@ public DefaultPolicyWithSeparatePass() {
      */
     RefactoredPass() {
       super(Arrays.asList(
-          new DefaultStagePartitioningPass(),
-          new ReviseInterStageEdgeDataStorePass(), // after stage partitioning
-          new ScheduleGroupPass()
+          new DefaultInterTaskDataStorePass(),
+          new DefaultScheduleGroupPass()
       ));
     }
   }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
index 7b40556c..5268c007 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
@@ -18,7 +18,7 @@
 import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
@@ -67,7 +67,7 @@ public PolicyBuilder(final Boolean strictPrerequisiteCheckMode) {
     annotatedExecutionProperties.add(ExecutorPlacementProperty.class);
     annotatedExecutionProperties.add(ParallelismProperty.class);
     annotatedExecutionProperties.add(DataFlowModelProperty.class);
-    annotatedExecutionProperties.add(DataStoreProperty.class);
+    annotatedExecutionProperties.add(InterTaskDataStoreProperty.class);
     annotatedExecutionProperties.add(PartitionerProperty.class);
   }
 
diff --git a/examples/resources/beam_sample_executor_resources.json b/examples/resources/beam_sample_executor_resources.json
index ced110c3..0b40af97 100644
--- a/examples/resources/beam_sample_executor_resources.json
+++ b/examples/resources/beam_sample_executor_resources.json
@@ -2,11 +2,11 @@
   {
     "type": "Transient",
     "memory_mb": 512,
-    "capacity": 5
+    "capacity": 15
   },
   {
     "type": "Reserved",
     "memory_mb": 512,
-    "capacity": 5
+    "capacity": 15
   }
 ]
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 86939183..d8999b4d 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -89,7 +89,7 @@ public PhysicalPlan apply(final PhysicalPlan originalPlan,
       optimizationEdge.setTaskIdxToKeyRange(taskIdxToHashRange);
     });
 
-    return new PhysicalPlan(originalPlan.getId(), physicalDAGBuilder.build(), originalPlan.getIdToIRVertex());
+    return new PhysicalPlan(originalPlan.getId(), physicalDAGBuilder.build());
   }
 
   /**
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
index fdd7edf3..30f71110 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
@@ -19,6 +19,7 @@
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -34,14 +35,18 @@
    *
    * @param id              ID of the plan.
    * @param stageDAG        the DAG of stages.
-   * @param idToIRVertex map from task to IR vertex.
    */
   public PhysicalPlan(final String id,
-                      final DAG<Stage, StageEdge> stageDAG,
-                      final Map<String, IRVertex> idToIRVertex) {
+                      final DAG<Stage, StageEdge> stageDAG) {
     this.id = id;
     this.stageDAG = stageDAG;
-    this.idToIRVertex = idToIRVertex;
+
+    idToIRVertex = new HashMap<>();
+    for (final Stage stage : stageDAG.getVertices()) {
+      for (final IRVertex irVertex : stage.getIRDAG().getVertices()) {
+        idToIRVertex.put(irVertex.getId(), irVertex);
+      }
+    }
   }
 
   /**
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index a11997e3..d6793876 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -18,17 +18,19 @@
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.*;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.exception.IllegalVertexOperationException;
 import edu.snu.nemo.common.exception.PhysicalPlanGenerationException;
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import org.apache.reef.tang.annotations.Parameter;
 
 import javax.inject.Inject;
@@ -39,18 +41,21 @@
  * A function that converts an IR DAG to physical DAG.
  */
 public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdge>, DAG<Stage, StageEdge>> {
-  private final Map<String, IRVertex> idToIRVertex;
   private final String dagDirectory;
+  private final StagePartitioner stagePartitioner;
 
   /**
    * Private constructor.
    *
+   * @param stagePartitioner provides stage partitioning
    * @param dagDirectory the directory in which to store DAG data.
    */
   @Inject
-  private PhysicalPlanGenerator(@Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
-    this.idToIRVertex = new HashMap<>();
+  private PhysicalPlanGenerator(final StagePartitioner stagePartitioner,
+                                @Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
     this.dagDirectory = dagDirectory;
+    this.stagePartitioner = stagePartitioner;
+    stagePartitioner.addIgnoredPropertyKey(DynamicOptimizationProperty.class);
   }
 
   /**
@@ -64,6 +69,9 @@ private PhysicalPlanGenerator(@Parameter(JobConf.DAGDirectory.class) final Strin
     // first, stage-partition the IR DAG.
     final DAG<Stage, StageEdge> dagOfStages = stagePartitionIrDAG(irDAG);
 
+    // Sanity check
+    dagOfStages.getVertices().forEach(this::integrityCheck);
+
     // this is needed because of DuplicateEdgeGroupProperty.
     handleDuplicateEdgeGroupProperty(dagOfStages);
 
@@ -73,10 +81,6 @@ private PhysicalPlanGenerator(@Parameter(JobConf.DAGDirectory.class) final Strin
     return dagOfStages;
   }
 
-  public Map<String, IRVertex> getIdToIRVertex() {
-    return idToIRVertex;
-  }
-
   /**
    * Convert the edge id of DuplicateEdgeGroupProperty to physical edge id.
    *
@@ -115,41 +119,36 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
    */
   public DAG<Stage, StageEdge> stagePartitionIrDAG(final DAG<IRVertex, IREdge> irDAG) {
     final DAGBuilder<Stage, StageEdge> dagOfStagesBuilder = new DAGBuilder<>();
+    final Set<IREdge> interStageEdges = new HashSet<>();
+    final Map<Integer, Stage> stageIdToStageMap = new HashMap<>();
+    final Map<IRVertex, Integer> vertexToStageIdMap = stagePartitioner.apply(irDAG);
 
-    final Map<Integer, List<IRVertex>> vertexListForEachStage = new LinkedHashMap<>();
+    final Map<Integer, Set<IRVertex>> vertexSetForEachStage = new LinkedHashMap<>();
     irDAG.topologicalDo(irVertex -> {
-      final Integer stageNum = irVertex.getPropertyValue(StageIdProperty.class).get();
-      if (!vertexListForEachStage.containsKey(stageNum)) {
-        vertexListForEachStage.put(stageNum, new ArrayList<>());
+      final int stageId = vertexToStageIdMap.get(irVertex);
+      if (!vertexSetForEachStage.containsKey(stageId)) {
+        vertexSetForEachStage.put(stageId, new HashSet<>());
       }
-      vertexListForEachStage.get(stageNum).add(irVertex);
+      vertexSetForEachStage.get(stageId).add(irVertex);
     });
 
-    final Map<IRVertex, Stage> vertexStageMap = new HashMap<>();
-
-    for (final List<IRVertex> stageVertices : vertexListForEachStage.values()) {
-      integrityCheck(stageVertices);
-
-      final Set<IRVertex> currentStageVertices = new HashSet<>();
-      final Set<StageEdgeBuilder> currentStageIncomingEdges = new HashSet<>();
+    for (final int stageId : vertexSetForEachStage.keySet()) {
+      final Set<IRVertex> stageVertices = vertexSetForEachStage.get(stageId);
+      final String stageIdentifier = RuntimeIdGenerator.generateStageId(stageId);
+      final ExecutionPropertyMap<VertexExecutionProperty> stageProperties = new ExecutionPropertyMap<>(stageIdentifier);
+      stagePartitioner.getStageProperties(stageVertices.iterator().next()).forEach(stageProperties::put);
+      final int stageParallelism = stageProperties.get(ParallelismProperty.class)
+          .orElseThrow(() -> new RuntimeException("Parallelism property must be set for Stage"));
 
-      // Create a new stage builder.
-      final IRVertex irVertexOfNewStage = stageVertices.stream().findAny()
-          .orElseThrow(() -> new RuntimeException("Error: List " + stageVertices.getClass() + " is Empty"));
-      final StageBuilder stageBuilder = new StageBuilder(
-          irVertexOfNewStage.getPropertyValue(StageIdProperty.class).get(),
-          irVertexOfNewStage.getPropertyValue(ParallelismProperty.class).get(),
-          irVertexOfNewStage.getPropertyValue(ScheduleGroupIndexProperty.class).get(),
-          irVertexOfNewStage.getPropertyValue(ExecutorPlacementProperty.class).get());
+      final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder = new DAGBuilder<>();
 
-      // Prepare useful variables.
-      final int stageParallelism = irVertexOfNewStage.getPropertyValue(ParallelismProperty.class).get();
+      // Prepare vertexIdToReadables
       final List<Map<String, Readable>> vertexIdToReadables = new ArrayList<>(stageParallelism);
       for (int i = 0; i < stageParallelism; i++) {
         vertexIdToReadables.add(new HashMap<>());
       }
 
-      // For each vertex in the stage,
+      // For each IRVertex,
       for (final IRVertex irVertex : stageVertices) {
         // Take care of the readables of a source vertex.
         if (irVertex instanceof SourceVertex) {
@@ -159,108 +158,78 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
             for (int i = 0; i < stageParallelism; i++) {
               vertexIdToReadables.get(i).put(irVertex.getId(), readables.get(i));
             }
-          } catch (Exception e) {
+          } catch (final Exception e) {
             throw new PhysicalPlanGenerationException(e);
           }
-
           // Clear internal metadata.
           sourceVertex.clearInternalStates();
         }
 
         // Add vertex to the stage.
-        stageBuilder.addVertex(irVertex);
-        currentStageVertices.add(irVertex);
+        stageInternalDAGBuilder.addVertex(irVertex);
+      }
 
+      for (final IRVertex dstVertex : stageVertices) {
         // Connect all the incoming edges for the vertex.
-        final List<IREdge> inEdges = irDAG.getIncomingEdgesOf(irVertex);
-        final Optional<List<IREdge>> inEdgeList = (inEdges == null) ? Optional.empty() : Optional.of(inEdges);
-        inEdgeList.ifPresent(edges -> edges.forEach(irEdge -> {
+        irDAG.getIncomingEdgesOf(dstVertex).forEach(irEdge -> {
           final IRVertex srcVertex = irEdge.getSrc();
-          final IRVertex dstVertex = irEdge.getDst();
 
-          if (srcVertex == null) {
-            throw new IllegalVertexOperationException("Unable to locate srcVertex for IREdge " + irEdge);
-          } else if (dstVertex == null) {
-            throw new IllegalVertexOperationException("Unable to locate dstVertex for IREdge " + irEdge);
-          }
-
-          // both vertices are in the stage.
-          if (currentStageVertices.contains(srcVertex) && currentStageVertices.contains(dstVertex)) {
-            stageBuilder.connectInternalVertices(new RuntimeEdge<IRVertex>(
+          // both vertices are in the same stage.
+          if (vertexToStageIdMap.get(srcVertex).equals(vertexToStageIdMap.get(dstVertex))) {
+            stageInternalDAGBuilder.connectVertices(new RuntimeEdge<>(
                 irEdge.getId(),
                 irEdge.getExecutionProperties(),
                 irEdge.getSrc(),
                 irEdge.getDst(),
                 irEdge.isSideInput()));
           } else { // edge comes from another stage
-            final Stage srcStage = vertexStageMap.get(srcVertex);
-
-            if (srcStage == null) {
-              throw new IllegalVertexOperationException("srcVertex " + srcVertex.getId()
-                  + " not yet added to the builder");
-            }
-
-            final StageEdgeBuilder newEdgeBuilder = new StageEdgeBuilder(irEdge.getId())
-                .setEdgeProperties(irEdge.getExecutionProperties())
-                .setSrcVertex(srcVertex)
-                .setDstVertex(dstVertex)
-                .setSrcStage(srcStage)
-                .setSideInputFlag(irEdge.isSideInput());
-            currentStageIncomingEdges.add(newEdgeBuilder);
+            interStageEdges.add(irEdge);
           }
-        }));
-
-        // Track id to irVertex.
-        idToIRVertex.put(irVertex.getId(), irVertex);
+        });
       }
-      stageBuilder.addReadables(vertexIdToReadables);
       // If this runtime stage contains at least one vertex, build it!
-      if (!stageBuilder.isEmpty()) {
-        final Stage currentStage = stageBuilder.build();
-        dagOfStagesBuilder.addVertex(currentStage);
-
-        // Add this stage as the destination stage for all the incoming edges.
-        currentStageIncomingEdges.forEach(stageEdgeBuilder -> {
-          stageEdgeBuilder.setDstStage(currentStage);
-          final StageEdge stageEdge = stageEdgeBuilder.build();
-          dagOfStagesBuilder.connectVertices(stageEdge);
-        });
-        currentStageIncomingEdges.clear();
+      if (!stageInternalDAGBuilder.isEmpty()) {
+        final DAG<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAG
+            = stageInternalDAGBuilder.buildWithoutSourceSinkCheck();
+        final Stage stage = new Stage(stageIdentifier, stageInternalDAG, stageProperties, vertexIdToReadables);
+        dagOfStagesBuilder.addVertex(stage);
+        stageIdToStageMap.put(stageId, stage);
+      }
+    }
 
-        currentStageVertices.forEach(irVertex -> vertexStageMap.put(irVertex, currentStage));
-        currentStageVertices.clear();
+    // Add StageEdges
+    for (final IREdge interStageEdge : interStageEdges) {
+      final Stage srcStage = stageIdToStageMap.get(vertexToStageIdMap.get(interStageEdge.getSrc()));
+      final Stage dstStage = stageIdToStageMap.get(vertexToStageIdMap.get(interStageEdge.getDst()));
+      if (srcStage == null || dstStage == null) {
+        throw new IllegalVertexOperationException(String.format("Stage not added to the builder:%s%s",
+            srcStage == null ? String.format(" source stage for %s", interStageEdge.getSrc()) : "",
+            dstStage == null ? String.format(" destination stage for %s", interStageEdge.getDst()) : ""));
       }
+      dagOfStagesBuilder.connectVertices(new StageEdge(interStageEdge.getId(), interStageEdge.getExecutionProperties(),
+          interStageEdge.getSrc(), interStageEdge.getDst(), srcStage, dstStage, interStageEdge.isSideInput()));
     }
 
     return dagOfStagesBuilder.build();
   }
 
   /**
-   * Integrity check for a stage's vertices.
-   * @param stageVertices to check for
+   * Integrity check for Stage.
+   * @param stage to check for
    */
-  private void integrityCheck(final List<IRVertex> stageVertices) {
-    final IRVertex firstVertex = stageVertices.get(0);
-    final String placement = firstVertex.getPropertyValue(ExecutorPlacementProperty.class).get();
-    final int scheduleGroup = firstVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get();
-    final int parallelism = firstVertex.getPropertyValue(ParallelismProperty.class).get();
+  private void integrityCheck(final Stage stage) {
+    stage.getPropertyValue(ParallelismProperty.class)
+        .orElseThrow(() -> new RuntimeException("Parallelism property must be set for Stage"));
+    stage.getPropertyValue(ScheduleGroupIndexProperty.class)
+        .orElseThrow(() -> new RuntimeException("ScheduleGroupIndex property must be set for Stage"));
 
-    stageVertices.forEach(irVertex -> {
+    stage.getIRDAG().getVertices().forEach(irVertex -> {
       // Check vertex type.
       if (!(irVertex instanceof  SourceVertex
           || irVertex instanceof OperatorVertex
           || irVertex instanceof MetricCollectionBarrierVertex)) {
         throw new UnsupportedOperationException(irVertex.toString());
       }
-
-      // Check execution properties.
-      if ((placement != null
-          && !placement.equals(irVertex.getPropertyValue(ExecutorPlacementProperty.class).get()))
-          || scheduleGroup != irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get()
-          || parallelism != irVertex.getPropertyValue(ParallelismProperty.class).get()) {
-        throw new RuntimeException("Vertices of the same stage have different execution properties: "
-            + irVertex.getId());
-      }
     });
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
index 1280f5b0..ee0a337e 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
@@ -18,48 +18,52 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.Vertex;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import org.apache.commons.lang3.SerializationUtils;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Stage.
  */
 public final class Stage extends Vertex {
   private final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag;
-  private final int parallelism;
-  private final int scheduleGroupIndex;
-  private final String containerType;
   private final byte[] serializedIRDag;
+  private final ExecutionPropertyMap<VertexExecutionProperty> executionProperties;
   private final List<Map<String, Readable>> vertexIdToReadables;
+  private final int parallelism;
+  private final int scheduleGroupIndex;
 
   /**
    * Constructor.
    *
    * @param stageId             ID of the stage.
    * @param irDag               the DAG of the task in this stage.
-   * @param parallelism         how many tasks will be executed in this stage.
-   * @param scheduleGroupIndex  the schedule group index.
-   * @param containerType       the type of container to execute the task on.
+   * @param executionProperties set of {@link VertexExecutionProperty} for this stage
    * @param vertexIdToReadables the list of maps between vertex ID and {@link Readable}.
    */
   public Stage(final String stageId,
                final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag,
-               final int parallelism,
-               final int scheduleGroupIndex,
-               final String containerType,
+               final ExecutionPropertyMap<VertexExecutionProperty> executionProperties,
                final List<Map<String, Readable>> vertexIdToReadables) {
     super(stageId);
     this.irDag = irDag;
-    this.parallelism = parallelism;
-    this.scheduleGroupIndex = scheduleGroupIndex;
-    this.containerType = containerType;
     this.serializedIRDag = SerializationUtils.serialize(irDag);
+    this.executionProperties = executionProperties;
     this.vertexIdToReadables = vertexIdToReadables;
+    this.parallelism = executionProperties.get(ParallelismProperty.class)
+        .orElseThrow(() -> new RuntimeException("Parallelism property must be set for Stage"));
+    this.scheduleGroupIndex = executionProperties.get(ScheduleGroupIndexProperty.class)
+        .orElseThrow(() -> new RuntimeException("ScheduleGroupIndex property must be set for Stage"));
   }
 
   /**
@@ -95,10 +99,22 @@ public int getScheduleGroupIndex() {
   }
 
   /**
-   * @return the type of container to execute the task on.
+   * @return {@link VertexExecutionProperty} map for this stage
+   */
+  public ExecutionPropertyMap<VertexExecutionProperty> getExecutionProperties() {
+    return executionProperties;
+  }
+
+  /**
+   * Get the executionProperty of this stage.
+   *
+   * @param <T>                  Type of the return value.
+   * @param executionPropertyKey key of the execution property.
+   * @return the execution property.
    */
-  public String getContainerType() {
-    return containerType;
+  public <T extends Serializable> Optional<T> getPropertyValue(
+      final Class<? extends VertexExecutionProperty<T>> executionPropertyKey) {
+    return executionProperties.get(executionPropertyKey);
   }
 
   /**
@@ -114,7 +130,7 @@ public String propertiesToJSON() {
     sb.append("{\"scheduleGroupIndex\": ").append(scheduleGroupIndex);
     sb.append(", \"irDag\": ").append(irDag);
     sb.append(", \"parallelism\": ").append(parallelism);
-    sb.append(", \"containerType\": \"").append(containerType).append("\"");
+    sb.append(", \"executionProperties\": ").append(executionProperties);
     sb.append('}');
     return sb.toString();
   }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageBuilder.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageBuilder.java
deleted file mode 100644
index 04475d21..00000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageBuilder.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan;
-
-import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.common.dag.DAGBuilder;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Stage Builder.
- */
-final class StageBuilder {
-  private final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder;
-  private final Integer stageId;
-  private final int parallelism;
-  private final int scheduleGroupIndex;
-  private final String containerType;
-  private List<Map<String, Readable>> vertexIdToReadables;
-
-  /**
-   * Builds a {@link Stage}.
-   * @param stageId stage ID in numeric form.
-   * @param scheduleGroupIndex indicating its scheduling order.
-   */
-  StageBuilder(final Integer stageId,
-               final int parallelism,
-               final int scheduleGroupIndex,
-               final String containerType) {
-    this.stageId = stageId;
-    this.parallelism = parallelism;
-    this.scheduleGroupIndex = scheduleGroupIndex;
-    this.containerType = containerType;
-    this.stageInternalDAGBuilder = new DAGBuilder<>();
-    this.vertexIdToReadables = new ArrayList<>(1);
-  }
-
-  /**
-   * Adds a {@link IRVertex} to this stage.
-   * @param vertex to add.
-   * @return the stageBuilder.
-   */
-  StageBuilder addVertex(final IRVertex vertex) {
-    stageInternalDAGBuilder.addVertex(vertex);
-    return this; }
-
-  /**
-   * Connects two {@link IRVertex} in this stage.
-   * @param edge the IREdge that connects vertices.
-   * @return the stageBuilder.
-   */
-  StageBuilder connectInternalVertices(final RuntimeEdge<IRVertex> edge) {
-    stageInternalDAGBuilder.connectVertices(edge);
-    return this;
-  }
-
-  StageBuilder addReadables(final List<Map<String, Readable>> vertexIdToReadable) {
-    this.vertexIdToReadables = vertexIdToReadable;
-    return this;
-  }
-
-  /**
-   * @return true if this builder doesn't contain any valid {@link IRVertex}.
-   */
-  boolean isEmpty() {
-    return stageInternalDAGBuilder.isEmpty();
-  }
-
-  /**
-   * Builds and returns the {@link Stage}.
-   * @return the runtime stage.
-   */
-  Stage build() {
-    final Stage stage = new Stage(
-        RuntimeIdGenerator.generateStageId(stageId),
-        stageInternalDAGBuilder.buildWithoutSourceSinkCheck(),
-        parallelism,
-        scheduleGroupIndex,
-        containerType,
-        vertexIdToReadables);
-    return stage;
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index 608da4db..d2697f4a 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.runtime.common.plan;
 
+import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.runtime.common.data.KeyRange;
@@ -56,7 +57,8 @@
    * @param dstStage       destination stage.
    * @param isSideInput    whether or not the edge is a sideInput edge.
    */
-  StageEdge(final String runtimeEdgeId,
+  @VisibleForTesting
+  public StageEdge(final String runtimeEdgeId,
             final ExecutionPropertyMap edgeProperties,
             final IRVertex srcVertex,
             final IRVertex dstVertex,
@@ -91,7 +93,7 @@ public IRVertex getDstVertex() {
   public String propertiesToJSON() {
     final StringBuilder sb = new StringBuilder();
     sb.append("{\"runtimeEdgeId\": \"").append(getId());
-    sb.append("\", \"edgeProperties\": ").append(getExecutionProperties());
+    sb.append("\", \"executionProperties\": ").append(getExecutionProperties());
     sb.append(", \"externalSrcVertexId\": \"").append(srcVertex.getId());
     sb.append("\", \"externalDstVertexId\": \"").append(dstVertex.getId());
     sb.append("\"}");
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
deleted file mode 100644
index f94feca7..00000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan;
-
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
-
-/**
- * Stage Edge Builder.
- */
-public final class StageEdgeBuilder {
-  private final String stageEdgeId;
-  private ExecutionPropertyMap edgeProperties;
-  private Stage srcStage;
-  private Stage dstStage;
-  private IRVertex srcVertex;
-  private IRVertex dstVertex;
-  private Boolean isSideInput;
-
-  /**
-   * Represents the edge between vertices in a logical plan.
-   *
-   * @param irEdgeId id of this edge.
-   */
-  public StageEdgeBuilder(final String irEdgeId) {
-    this.stageEdgeId = irEdgeId;
-  }
-
-  /**
-   * Setter for edge properties.
-   *
-   * @param ea the edge properties.
-   * @return the updated StageEdgeBuilder.
-   */
-  public StageEdgeBuilder setEdgeProperties(final ExecutionPropertyMap ea) {
-    this.edgeProperties = ea;
-    return this;
-  }
-
-  /**
-   * Setter for the source stage.
-   *
-   * @param ss the source stage.
-   * @return the updated StageEdgeBuilder.
-   */
-  public StageEdgeBuilder setSrcStage(final Stage ss) {
-    this.srcStage = ss;
-    return this;
-  }
-
-  /**
-   * Setter for the destination stage.
-   *
-   * @param ds the destination stage.
-   * @return the updated StageEdgeBuilder.
-   */
-  public StageEdgeBuilder setDstStage(final Stage ds) {
-    this.dstStage = ds;
-    return this;
-  }
-
-  /**
-   * Setter for the source vertex.
-   *
-   * @param sv the source vertex.
-   * @return the updated StageEdgeBuilder.
-   */
-  public StageEdgeBuilder setSrcVertex(final IRVertex sv) {
-    this.srcVertex = sv;
-    return this;
-  }
-
-  /**
-   * Setter for the destination vertex.
-   *
-   * @param dv the destination vertex.
-   * @return the updated StageEdgeBuilder.
-   */
-  public StageEdgeBuilder setDstVertex(final IRVertex dv) {
-    this.dstVertex = dv;
-    return this;
-  }
-
-  /**
-   * Setter for side input flag.
-   *
-   * @param sideInputFlag the side input flag.
-   * @return the updated StageEdgeBuilder.
-   */
-  public StageEdgeBuilder setSideInputFlag(final Boolean sideInputFlag) {
-    this.isSideInput = sideInputFlag;
-    return this;
-  }
-
-  /**
-   * @return the built stage edge.
-   */
-  public StageEdge build() {
-    return new StageEdge(stageEdgeId, edgeProperties, srcVertex, dstVertex, srcStage, dstStage, isSideInput);
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
new file mode 100644
index 00000000..49553dca
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.plan;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import net.jcip.annotations.ThreadSafe;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.reef.annotations.audience.DriverSide;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A function that is responsible for stage partitioning on IR DAG.
+ * Each stage becomes a maximal set of {@link IRVertex} such that
+ * <ul>
+ *   <li>branches and non-OneToOne edges are not allowed within a stage, and</li>
+ *   <li>all vertices in a stage should have the same {@link VertexExecutionProperty} map,
+ *   except for the ignored properties.</li>
+ * </ul>
+ */
+@DriverSide
+@ThreadSafe
+public final class StagePartitioner implements Function<DAG<IRVertex, IREdge>, Map<IRVertex, Integer>> {
+  private final Set<Class<? extends VertexExecutionProperty>> ignoredPropertyKeys = ConcurrentHashMap.newKeySet();
+
+  @Inject
+  private StagePartitioner() {
+  }
+
+  /**
+   * By default, the stage partitioner merges two vertices into one stage if and only if the two vertices have
+   * the same set of {@link VertexExecutionProperty}.
+   * Invoking this method will make the stage partitioner ignore a specific property when comparing
+   * the execution property maps.
+   * @param ignoredPropertyKey a property that will be ignored during the stage partitioning.
+   */
+  public void addIgnoredPropertyKey(final Class<? extends VertexExecutionProperty> ignoredPropertyKey) {
+    ignoredPropertyKeys.add(ignoredPropertyKey);
+  }
+
+  /**
+   * @param irDAG IR DAG to perform stage partition on.
+   * @return a map between IR vertex and the corresponding stage id
+   */
+  @Override
+  public Map<IRVertex, Integer> apply(final DAG<IRVertex, IREdge> irDAG) {
+    final MutableInt nextStageIndex = new MutableInt(0);
+    final Map<IRVertex, Integer> vertexToStageIdMap = new HashMap<>();
+    irDAG.topologicalDo(irVertex -> {
+      // Base case: for root vertices
+      if (vertexToStageIdMap.get(irVertex) == null) {
+        vertexToStageIdMap.put(irVertex, nextStageIndex.getValue());
+        nextStageIndex.increment();
+      }
+      // Get stage id of irVertex
+      final int stageId = vertexToStageIdMap.get(irVertex);
+      // Step case: inductively assign stage ids based on mergability with irVertex
+      for (final IREdge edge : irDAG.getOutgoingEdgesOf(irVertex)) {
+        final IRVertex connectedIRVertex = edge.getDst();
+        // Skip if it already has been assigned stageId
+        if (vertexToStageIdMap.containsKey(connectedIRVertex)) {
+          continue;
+        }
+        // Assign stageId
+        if (testMergability(edge, irDAG)) {
+          vertexToStageIdMap.put(connectedIRVertex, stageId);
+        } else {
+          vertexToStageIdMap.put(connectedIRVertex, nextStageIndex.getValue());
+          nextStageIndex.increment();
+        }
+      }
+    });
+    return vertexToStageIdMap;
+  }
+
+  /**
+   * @param edge an {@link IREdge}.
+   * @param dag IR DAG which contains {@code edge}
+   * @return {@code true} if and only if the source and the destination vertex of the edge can be merged into one stage.
+   */
+  private boolean testMergability(final IREdge edge, final DAG<IRVertex, IREdge> dag) {
+    // If the destination vertex has multiple inEdges, return false
+    if (dag.getIncomingEdgesOf(edge.getDst()).size() > 1) {
+      return false;
+    }
+    // If the edge is not OneToOne, return false
+    if (edge.getPropertyValue(DataCommunicationPatternProperty.class).get()
+        != DataCommunicationPatternProperty.Value.OneToOne) {
+      return false;
+    }
+    // Return true if and only if the execution properties of the two vertices are compatible
+    return getStageProperties(edge.getSrc()).equals(getStageProperties(edge.getDst()));
+  }
+
+  /**
+   * @param vertex a vertex in a stage
+   * @return set of stage-level properties for the stage
+   */
+  public Set<VertexExecutionProperty> getStageProperties(final IRVertex vertex) {
+    final Stream<VertexExecutionProperty> stream = vertex.getExecutionProperties().stream();
+    return stream.filter(p -> !ignoredPropertyKeys.contains(p.getClass())).collect(Collectors.toSet());
+  }
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
index 7f2a203e..21ea23a0 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
@@ -16,10 +16,13 @@
 package edu.snu.nemo.runtime.common.plan;
 
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * A Task is a self-contained executable that can be executed on a machine.
@@ -30,7 +33,7 @@
   private final List<StageEdge> taskIncomingEdges;
   private final List<StageEdge> taskOutgoingEdges;
   private final int attemptIdx;
-  private final String containerType;
+  private final ExecutionPropertyMap<VertexExecutionProperty> executionProperties;
   private final byte[] serializedIRDag;
   private final Map<String, Readable> irVertexIdToReadable;
 
@@ -40,7 +43,7 @@
    * @param jobId                the id of the job.
    * @param taskId               the ID of the task.
    * @param attemptIdx           the attempt index.
-   * @param containerType        the type of container to execute the task on.
+   * @param executionProperties  {@link VertexExecutionProperty} map for the corresponding stage
    * @param serializedIRDag      the serialized DAG of the task.
    * @param taskIncomingEdges    the incoming edges of the task.
    * @param taskOutgoingEdges    the outgoing edges of the task.
@@ -49,7 +52,7 @@
   public Task(final String jobId,
               final String taskId,
               final int attemptIdx,
-              final String containerType,
+              final ExecutionPropertyMap<VertexExecutionProperty> executionProperties,
               final byte[] serializedIRDag,
               final List<StageEdge> taskIncomingEdges,
               final List<StageEdge> taskOutgoingEdges,
@@ -57,7 +60,7 @@ public Task(final String jobId,
     this.jobId = jobId;
     this.taskId = taskId;
     this.attemptIdx = attemptIdx;
-    this.containerType = containerType;
+    this.executionProperties = executionProperties;
     this.serializedIRDag = serializedIRDag;
     this.taskIncomingEdges = taskIncomingEdges;
     this.taskOutgoingEdges = taskOutgoingEdges;
@@ -107,10 +110,22 @@ public int getAttemptIdx() {
   }
 
   /**
-   * @return the type of container to execute the task on.
+   * @return {@link VertexExecutionProperty} map for the corresponding stage
    */
-  public String getContainerType() {
-    return containerType;
+  public ExecutionPropertyMap<VertexExecutionProperty> getExecutionProperties() {
+    return executionProperties;
+  }
+
+  /**
+   * Get the executionProperty of this task.
+   *
+   * @param <T>                  Type of the return value.
+   * @param executionPropertyKey key of the execution property.
+   * @return the execution property.
+   */
+  public <T extends Serializable> Optional<T> getPropertyValue(
+      final Class<? extends VertexExecutionProperty<T>> executionPropertyKey) {
+    return executionProperties.get(executionPropertyKey);
   }
 
   /**
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index 86a18cbb..52c55fdd 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -21,7 +21,7 @@
 import edu.snu.nemo.common.exception.BlockWriteException;
 import edu.snu.nemo.common.exception.UnsupportedBlockStoreException;
 import edu.snu.nemo.common.exception.UnsupportedExecutionPropertyException;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.UsedDataHandlingProperty;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
@@ -122,7 +122,7 @@ private BlockManagerWorker(@Parameter(JobConf.ExecutorId.class) final String exe
    * @throws BlockWriteException for any error occurred while trying to create a block.
    */
   public Block createBlock(final String blockId,
-                           final DataStoreProperty.Value blockStore) throws BlockWriteException {
+                           final InterTaskDataStoreProperty.Value blockStore) throws BlockWriteException {
     final BlockStore store = getBlockStore(blockStore);
     return store.createBlock(blockId);
   }
@@ -137,7 +137,7 @@ public Block createBlock(final String blockId,
    */
   private CompletableFuture<DataUtil.IteratorWithNumBytes> retrieveDataFromBlock(
       final String blockId,
-      final DataStoreProperty.Value blockStore,
+      final InterTaskDataStoreProperty.Value blockStore,
       final KeyRange keyRange) {
     LOG.info("RetrieveDataFromBlock: {}", blockId);
     final BlockStore store = getBlockStore(blockStore);
@@ -188,7 +188,7 @@ public Block createBlock(final String blockId,
   public CompletableFuture<DataUtil.IteratorWithNumBytes> queryBlock(
       final String blockId,
       final String runtimeEdgeId,
-      final DataStoreProperty.Value blockStore,
+      final InterTaskDataStoreProperty.Value blockStore,
       final KeyRange keyRange) {
     // Let's see if a remote worker has it
     final CompletableFuture<ControlMessage.Message> blockLocationFuture =
@@ -261,7 +261,7 @@ public Block createBlock(final String blockId,
    * @param usedDataHandling     how to handle the used block.
    */
   public void writeBlock(final Block block,
-                         final DataStoreProperty.Value blockStore,
+                         final InterTaskDataStoreProperty.Value blockStore,
                          final boolean reportPartitionSizes,
                          final Map<Integer, Long> partitionSizeMap,
                          final String srcIRVertexId,
@@ -289,7 +289,7 @@ public void writeBlock(final Block block,
             .setBlockId(blockId)
             .setState(ControlMessage.BlockStateFromExecutor.AVAILABLE);
 
-    if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
+    if (InterTaskDataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
       blockStateChangedMsgBuilder.setLocation(REMOTE_FILE_STORE);
     } else {
       blockStateChangedMsgBuilder.setLocation(executorId);
@@ -335,7 +335,7 @@ public void writeBlock(final Block block,
    * @param blockStore the store which contains the block.
    */
   public void removeBlock(final String blockId,
-                          final DataStoreProperty.Value blockStore) {
+                          final InterTaskDataStoreProperty.Value blockStore) {
     LOG.info("RemoveBlock: {}", blockId);
     final BlockStore store = getBlockStore(blockStore);
     final boolean deleted = store.deleteBlock(blockId);
@@ -347,7 +347,7 @@ public void removeBlock(final String blockId,
               .setBlockId(blockId)
               .setState(ControlMessage.BlockStateFromExecutor.NOT_AVAILABLE);
 
-      if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
+      if (InterTaskDataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
         blockStateChangedMsgBuilder.setLocation(REMOTE_FILE_STORE);
       } else {
         blockStateChangedMsgBuilder.setLocation(executorId);
@@ -371,7 +371,7 @@ public void removeBlock(final String blockId,
    * @param blockStore the store which contains the block.
    * @param blockId    the ID of the block.
    */
-  private void handleUsedData(final DataStoreProperty.Value blockStore,
+  private void handleUsedData(final InterTaskDataStoreProperty.Value blockStore,
                               final String blockId) {
     final AtomicInteger remainingExpectedRead = blockToRemainingRead.get(blockId);
     if (remainingExpectedRead != null) {
@@ -389,11 +389,11 @@ public void run() {
   }
 
   /**
-   * Gets the {@link BlockStore} from annotated value of {@link DataStoreProperty}.
-   * @param blockStore the annotated value of {@link DataStoreProperty}.
+   * Gets the {@link BlockStore} from annotated value of {@link InterTaskDataStoreProperty}.
+   * @param blockStore the annotated value of {@link InterTaskDataStoreProperty}.
    * @return the block store.
    */
-  private BlockStore getBlockStore(final DataStoreProperty.Value blockStore) {
+  private BlockStore getBlockStore(final InterTaskDataStoreProperty.Value blockStore) {
     switch (blockStore) {
       case MemoryStore:
         return memoryStore;
@@ -420,7 +420,7 @@ private BlockStore getBlockStore(final DataStoreProperty.Value blockStore) {
   public void onOutputContext(final ByteOutputContext outputContext) throws InvalidProtocolBufferException {
     final ByteTransferContextDescriptor descriptor = ByteTransferContextDescriptor.PARSER
         .parseFrom(outputContext.getContextDescriptor());
-    final DataStoreProperty.Value blockStore = convertBlockStore(descriptor.getBlockStore());
+    final InterTaskDataStoreProperty.Value blockStore = convertBlockStore(descriptor.getBlockStore());
     final String blockId = descriptor.getBlockId();
     final KeyRange keyRange = SerializationUtils.deserialize(descriptor.getKeyRange().toByteArray());
 
@@ -430,8 +430,8 @@ public void run() {
         try {
           final Optional<Block> optionalBlock = getBlockStore(blockStore).readBlock(blockId);
           if (optionalBlock.isPresent()) {
-            if (DataStoreProperty.Value.LocalFileStore.equals(blockStore)
-                || DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
+            if (InterTaskDataStoreProperty.Value.LocalFileStore.equals(blockStore)
+                || InterTaskDataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
               final List<FileArea> fileAreas = ((FileBlock) optionalBlock.get()).asFileAreas(keyRange);
               for (final FileArea fileArea : fileAreas) {
                 try (ByteOutputContext.ByteOutputStream os = outputContext.newOutputStream()) {
@@ -475,10 +475,10 @@ public void onInputContext(final ByteInputContext inputContext) {
   /**
    * Decodes BlockStore property from protocol buffer.
    * @param blockStore property from protocol buffer
-   * @return the corresponding {@link DataStoreProperty} value
+   * @return the corresponding {@link InterTaskDataStoreProperty} value
    */
   private static ControlMessage.BlockStore convertBlockStore(
-      final DataStoreProperty.Value blockStore) {
+      final InterTaskDataStoreProperty.Value blockStore) {
     switch (blockStore) {
       case MemoryStore:
         return ControlMessage.BlockStore.MEMORY;
@@ -495,21 +495,21 @@ public void onInputContext(final ByteInputContext inputContext) {
 
 
   /**
-   * Encodes {@link DataStoreProperty} value into protocol buffer property.
-   * @param blockStoreType {@link DataStoreProperty} value
+   * Encodes {@link InterTaskDataStoreProperty} value into protocol buffer property.
+   * @param blockStoreType {@link InterTaskDataStoreProperty} value
    * @return the corresponding {@link ControlMessage.BlockStore} value
    */
-  private static DataStoreProperty.Value convertBlockStore(
+  private static InterTaskDataStoreProperty.Value convertBlockStore(
       final ControlMessage.BlockStore blockStoreType) {
     switch (blockStoreType) {
       case MEMORY:
-        return DataStoreProperty.Value.MemoryStore;
+        return InterTaskDataStoreProperty.Value.MemoryStore;
       case SER_MEMORY:
-        return DataStoreProperty.Value.SerializedMemoryStore;
+        return InterTaskDataStoreProperty.Value.SerializedMemoryStore;
       case LOCAL_FILE:
-        return DataStoreProperty.Value.LocalFileStore;
+        return InterTaskDataStoreProperty.Value.LocalFileStore;
       case REMOTE_FILE:
-        return DataStoreProperty.Value.GlusterFileStore;
+        return InterTaskDataStoreProperty.Value.GlusterFileStore;
       default:
         throw new UnsupportedBlockStoreException(new Exception("This block store is not yet supported"));
     }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index fdaecd54..2b29b5f4 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -17,7 +17,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
@@ -87,13 +87,15 @@ public InputReader(final int dstTaskIndex,
 
   private CompletableFuture<DataUtil.IteratorWithNumBytes> readOneToOne() {
     final String blockId = getBlockId(dstTaskIndex);
-    final Optional<DataStoreProperty.Value> dataStoreProperty = runtimeEdge.getPropertyValue(DataStoreProperty.class);
+    final Optional<InterTaskDataStoreProperty.Value> dataStoreProperty
+        = runtimeEdge.getPropertyValue(InterTaskDataStoreProperty.class);
     return blockManagerWorker.queryBlock(blockId, getId(), dataStoreProperty.get(), HashRange.all());
   }
 
   private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readBroadcast() {
     final int numSrcTasks = this.getSourceParallelism();
-    final Optional<DataStoreProperty.Value> dataStoreProperty = runtimeEdge.getPropertyValue(DataStoreProperty.class);
+    final Optional<InterTaskDataStoreProperty.Value> dataStoreProperty
+        = runtimeEdge.getPropertyValue(InterTaskDataStoreProperty.class);
 
     final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new ArrayList<>();
     for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
@@ -111,7 +113,8 @@ public InputReader(final int dstTaskIndex,
    */
   private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readDataInRange() {
     assert (runtimeEdge instanceof StageEdge);
-    final Optional<DataStoreProperty.Value> dataStoreProperty = runtimeEdge.getPropertyValue(DataStoreProperty.class);
+    final Optional<InterTaskDataStoreProperty.Value> dataStoreProperty
+        = runtimeEdge.getPropertyValue(InterTaskDataStoreProperty.class);
     final KeyRange hashRangeToRead =
         ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
     if (hashRangeToRead == null) {
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index b0dd0800..bb594f62 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -36,7 +36,7 @@
   private final RuntimeEdge<?> runtimeEdge;
   private final String srcVertexId;
   private final IRVertex dstIrVertex;
-  private final DataStoreProperty.Value blockStoreValue;
+  private final InterTaskDataStoreProperty.Value blockStoreValue;
   private final BlockManagerWorker blockManagerWorker;
   private final boolean nonDummyBlock;
   private final Block blockToWrite;
@@ -65,7 +65,7 @@ public OutputWriter(final int hashRangeMultiplier,
     this.srcVertexId = srcRuntimeVertexId;
     this.dstIrVertex = dstIrVertex;
     this.blockManagerWorker = blockManagerWorker;
-    this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class).get();
+    this.blockStoreValue = runtimeEdge.getPropertyValue(InterTaskDataStoreProperty.class).get();
 
     // Setup partitioner
     final int dstParallelism = getDstParallelism();
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 0b119d86..88dda9dd 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -19,9 +19,11 @@
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.*;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.SourceVertex;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
 import edu.snu.nemo.common.test.EmptyComponents;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.Pair;
@@ -36,7 +38,7 @@
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.plan.StageEdgeBuilder;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.executor.Executor;
 import edu.snu.nemo.runtime.executor.MetricManagerWorker;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
@@ -91,10 +93,10 @@
     SourceVertex.class})
 public final class DataTransferTest {
   private static final String EXECUTOR_ID_PREFIX = "Executor";
-  private static final DataStoreProperty.Value MEMORY_STORE = DataStoreProperty.Value.MemoryStore;
-  private static final DataStoreProperty.Value SER_MEMORY_STORE = DataStoreProperty.Value.SerializedMemoryStore;
-  private static final DataStoreProperty.Value LOCAL_FILE_STORE = DataStoreProperty.Value.LocalFileStore;
-  private static final DataStoreProperty.Value REMOTE_FILE_STORE = DataStoreProperty.Value.GlusterFileStore;
+  private static final InterTaskDataStoreProperty.Value MEMORY_STORE = InterTaskDataStoreProperty.Value.MemoryStore;
+  private static final InterTaskDataStoreProperty.Value SER_MEMORY_STORE = InterTaskDataStoreProperty.Value.SerializedMemoryStore;
+  private static final InterTaskDataStoreProperty.Value LOCAL_FILE_STORE = InterTaskDataStoreProperty.Value.LocalFileStore;
+  private static final InterTaskDataStoreProperty.Value REMOTE_FILE_STORE = InterTaskDataStoreProperty.Value.GlusterFileStore;
   private static final String TMP_LOCAL_FILE_DIRECTORY = "./tmpLocalFiles";
   private static final String TMP_REMOTE_FILE_DIRECTORY = "./tmpRemoteFiles";
   private static final int PARALLELISM_TEN = 10;
@@ -295,7 +297,7 @@ public void testWriteAndRead() throws Exception {
   private void writeAndRead(final BlockManagerWorker sender,
                             final BlockManagerWorker receiver,
                             final DataCommunicationPatternProperty.Value commPattern,
-                            final DataStoreProperty.Value store) throws RuntimeException {
+                            final InterTaskDataStoreProperty.Value store) throws RuntimeException {
     final int testIndex = TEST_INDEX.getAndIncrement();
     final String edgeId = String.format(EDGE_PREFIX_TEMPLATE, testIndex);
     final Pair<IRVertex, IRVertex> verticesPair = setupVertices(edgeId, sender, receiver);
@@ -307,7 +309,7 @@ private void writeAndRead(final BlockManagerWorker sender,
     dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element)));
     dummyIREdge.setProperty(DataCommunicationPatternProperty.of(commPattern));
     dummyIREdge.setProperty(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
-    dummyIREdge.setProperty(DataStoreProperty.of(store));
+    dummyIREdge.setProperty(InterTaskDataStoreProperty.of(store));
     dummyIREdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
     dummyIREdge.setProperty(EncoderProperty.of(ENCODER_FACTORY));
     dummyIREdge.setProperty(DecoderProperty.of(DECODER_FACTORY));
@@ -318,14 +320,8 @@ private void writeAndRead(final BlockManagerWorker sender,
     final IRVertex dstMockVertex = mock(IRVertex.class);
     final Stage srcStage = setupStages("srcStage-" + testIndex);
     final Stage dstStage = setupStages("dstStage-" + testIndex);
-    dummyEdge = new StageEdgeBuilder(edgeId)
-        .setEdgeProperties(edgeProperties)
-        .setSrcVertex(srcMockVertex)
-        .setDstVertex(dstMockVertex)
-        .setSrcStage(srcStage)
-        .setDstStage(dstStage)
-        .setSideInputFlag(false)
-        .build();
+    dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
+        srcStage, dstStage, false);
 
     // Initialize states in Master
     srcStage.getTaskIds().forEach(srcTaskId -> {
@@ -384,7 +380,7 @@ private void writeAndRead(final BlockManagerWorker sender,
   private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
                                              final BlockManagerWorker receiver,
                                              final DataCommunicationPatternProperty.Value commPattern,
-                                             final DataStoreProperty.Value store) throws RuntimeException {
+                                             final InterTaskDataStoreProperty.Value store) throws RuntimeException {
     final int testIndex = TEST_INDEX.getAndIncrement();
     final int testIndex2 = TEST_INDEX.getAndIncrement();
     final String edgeId = String.format(EDGE_PREFIX_TEMPLATE, testIndex);
@@ -405,7 +401,7 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
         = dummyIREdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
     duplicateDataProperty.get().setRepresentativeEdgeId(edgeId);
     duplicateDataProperty.get().setGroupSize(2);
-    dummyIREdge.setProperty(DataStoreProperty.of(store));
+    dummyIREdge.setProperty(InterTaskDataStoreProperty.of(store));
     dummyIREdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
     final RuntimeEdge dummyEdge, dummyEdge2;
     final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
@@ -414,24 +410,12 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
     final IRVertex dstMockVertex = mock(IRVertex.class);
     final Stage srcStage = setupStages("srcStage-" + testIndex);
     final Stage dstStage = setupStages("dstStage-" + testIndex);
-    dummyEdge = new StageEdgeBuilder(edgeId)
-        .setEdgeProperties(edgeProperties)
-        .setSrcVertex(srcMockVertex)
-        .setDstVertex(dstMockVertex)
-        .setSrcStage(srcStage)
-        .setDstStage(dstStage)
-        .setSideInputFlag(false)
-        .build();
+    dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
+        srcStage, dstStage, false);
     final IRVertex dstMockVertex2 = mock(IRVertex.class);
     final Stage dstStage2 = setupStages("dstStage-" + testIndex2);
-    dummyEdge2 = new StageEdgeBuilder(edgeId2)
-        .setEdgeProperties(edgeProperties)
-        .setSrcVertex(srcMockVertex)
-        .setDstVertex(dstMockVertex2)
-        .setSrcStage(srcStage)
-        .setDstStage(dstStage2)
-        .setSideInputFlag(false)
-        .build();
+    dummyEdge2 = new StageEdge(edgeId2, edgeProperties, srcMockVertex, dstMockVertex2,
+        srcStage, dstStage2, false); // the duplicate edge targets its own destination stage
     // Initialize states in Master
     srcStage.getTaskIds().forEach(srcTaskId -> {
       final String blockId = RuntimeIdGenerator.generateBlockId(
@@ -561,6 +545,9 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
   private Stage setupStages(final String stageId) {
     final DAG<IRVertex, RuntimeEdge<IRVertex>> emptyDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>().build();
 
-    return new Stage(stageId, emptyDag, PARALLELISM_TEN, 0, "Not_used", Collections.emptyList());
+    final ExecutionPropertyMap<VertexExecutionProperty> stageExecutionProperty = new ExecutionPropertyMap<>(stageId);
+    stageExecutionProperty.put(ParallelismProperty.of(PARALLELISM_TEN));
+    stageExecutionProperty.put(ScheduleGroupIndexProperty.of(0));
+    return new Stage(stageId, emptyDag, stageExecutionProperty, Collections.emptyList());
   }
 }
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
index 998f3aaa..5e0a267d 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -20,10 +20,11 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.InMemorySourceVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
@@ -67,7 +68,8 @@
     TaskStateManager.class, StageEdge.class})
 public final class TaskExecutorTest {
   private static final int DATA_SIZE = 100;
-  private static final String CONTAINER_TYPE = "CONTAINER_TYPE";
+  private static final ExecutionPropertyMap<VertexExecutionProperty> TASK_EXECUTION_PROPERTY_MAP
+      = new ExecutionPropertyMap<>("TASK_EXECUTION_PROPERTY_MAP");
   private static final int SOURCE_PARALLELISM = 5;
   private List<Integer> elements;
   private Map<String, List> vertexIdToOutputData;
@@ -137,7 +139,7 @@ public Iterable read() throws IOException {
             "testSourceVertexDataFetching",
             generateTaskId(),
             0,
-            CONTAINER_TYPE,
+            TASK_EXECUTION_PROPERTY_MAP,
             new byte[0],
             Collections.emptyList(),
             Collections.singletonList(mockStageEdgeFrom(sourceIRVertex)),
@@ -167,7 +169,7 @@ public void testParentTaskDataFetching() throws Exception {
         "testSourceVertexDataFetching",
         generateTaskId(),
         0,
-        CONTAINER_TYPE,
+        TASK_EXECUTION_PROPERTY_MAP,
         new byte[0],
         Collections.singletonList(mockStageEdgeTo(vertex)),
         Collections.singletonList(mockStageEdgeFrom(vertex)),
@@ -205,7 +207,7 @@ public void testTwoOperators() throws Exception {
         "testSourceVertexDataFetching",
         generateTaskId(),
         0,
-        CONTAINER_TYPE,
+        TASK_EXECUTION_PROPERTY_MAP,
         new byte[0],
         Collections.singletonList(mockStageEdgeTo(operatorIRVertex1)),
         Collections.singletonList(mockStageEdgeFrom(operatorIRVertex2)),
@@ -237,7 +239,7 @@ public void testTwoOperatorsWithSideInput() throws Exception {
         "testSourceVertexDataFetching",
         generateTaskId(),
         0,
-        CONTAINER_TYPE,
+        TASK_EXECUTION_PROPERTY_MAP,
         new byte[0],
         Arrays.asList(mockStageEdgeTo(operatorIRVertex1), mockStageEdgeTo(operatorIRVertex2)),
         Collections.singletonList(mockStageEdgeFrom(operatorIRVertex2)),
@@ -260,7 +262,7 @@ public void testTwoOperatorsWithSideInput() throws Exception {
                                            final boolean isSideInput) {
     final String runtimeIREdgeId = "Runtime edge between operator tasks";
     ExecutionPropertyMap edgeProperties = new ExecutionPropertyMap(runtimeIREdgeId);
-    edgeProperties.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+    edgeProperties.put(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
     return new RuntimeEdge<>(runtimeIREdgeId, edgeProperties, src, dst, isSideInput);
 
   }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index 273f1084..d06e1b5b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -404,7 +404,7 @@ private void scheduleStage(final Stage stageToSchedule) {
           physicalPlan.getId(),
           taskId,
           attemptIdx,
-          stageToSchedule.getContainerType(),
+          stageToSchedule.getExecutionProperties(),
           stageToSchedule.getSerializedIRDAG(),
           stageIncomingEdges,
           stageOutgoingEdges,
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
index b8f7d745..5f64e9c3 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
@@ -44,13 +44,15 @@ public ContainerTypeAwareSchedulingPolicy() {
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
                                                              final Task task) {
 
-    if (task.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
+    final String executorPlacementPropertyValue = task.getPropertyValue(ExecutorPlacementProperty.class)
+        .orElse(ExecutorPlacementProperty.NONE);
+    if (executorPlacementPropertyValue.equals(ExecutorPlacementProperty.NONE)) {
       return executorRepresenterSet;
     }
 
     final Set<ExecutorRepresenter> candidateExecutors =
         executorRepresenterSet.stream()
-            .filter(executor -> executor.getContainerType().equals(task.getContainerType()))
+            .filter(executor -> executor.getContainerType().equals(executorPlacementPropertyValue))
             .collect(Collectors.toSet());
 
     return candidateExecutors;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
index b507a0b4..ddcfbf55 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
@@ -62,7 +62,7 @@ public synchronized void add(final Task task) {
       if (taskIdToTask == null) {
         final Map<String, Task> taskIdToTaskMap = new HashMap<>();
         taskIdToTaskMap.put(task.getTaskId(), task);
-        updateSchedulableStages(stageId, task.getContainerType());
+        updateSchedulableStages(stageId);
         return taskIdToTaskMap;
       } else {
         taskIdToTask.put(task.getTaskId(), task);
@@ -102,7 +102,7 @@ public synchronized Task remove(final String taskId) throws NoSuchElementExcepti
       }
       stageIdToPendingTasks.remove(stageId);
       stageIdToPendingTasks.forEach((scheduledStageId, tasks) ->
-          updateSchedulableStages(scheduledStageId, tasks.values().iterator().next().getContainerType()));
+          updateSchedulableStages(scheduledStageId));
     }
 
     return taskToSchedule;
@@ -157,18 +157,14 @@ private synchronized void removeStageAndChildren(final String stageId) {
    * NOTE: This method provides the "line up" between stages, by assigning priorities,
    * serving as the key to the "priority" implementation of this class.
    * @param candidateStageId for the stage that can potentially be scheduled.
-   * @param candidateStageContainerType for the stage that can potentially be scheduled.
    */
-  private synchronized void updateSchedulableStages(
-      final String candidateStageId, final String candidateStageContainerType) {
+  private synchronized void updateSchedulableStages(final String candidateStageId) {
     final DAG<Stage, StageEdge> jobDAG = physicalPlan.getStageDAG();
 
-    if (isSchedulable(candidateStageId, candidateStageContainerType)) {
+    if (isSchedulable(candidateStageId)) {
       // Check for ancestor stages that became schedulable due to candidateStage's absence from the queue.
       jobDAG.getAncestors(candidateStageId).forEach(ancestorStage -> {
-        // Remove the ancestor stage if it is of the same container type.
-        if (schedulableStages.contains(ancestorStage.getId())
-            && candidateStageContainerType.equals(ancestorStage.getContainerType())) {
+        if (schedulableStages.contains(ancestorStage.getId())) {
           if (!schedulableStages.remove(ancestorStage.getId())) {
             throw new RuntimeException(String.format("No such stage: %s", ancestorStage.getId()));
           }
@@ -183,16 +179,13 @@ private synchronized void updateSchedulableStages(
   /**
    * Determines whether the given candidate stage is schedulable immediately or not.
    * @param candidateStageId for the stage that can potentially be scheduled.
-   * @param candidateStageContainerType for the stage that can potentially be scheduled.
    * @return true if schedulable, false otherwise.
    */
-  private synchronized boolean isSchedulable(final String candidateStageId, final String candidateStageContainerType) {
+  private synchronized boolean isSchedulable(final String candidateStageId) {
     final DAG<Stage, StageEdge> jobDAG = physicalPlan.getStageDAG();
     for (final Stage descendantStage : jobDAG.getDescendants(candidateStageId)) {
       if (schedulableStages.contains(descendantStage.getId())) {
-        if (candidateStageContainerType.equals(descendantStage.getContainerType())) {
-          return false;
-        }
+        return false;
       }
     }
     return true;
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
index b8953fb1..db5b1d44 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
@@ -122,7 +122,7 @@ public void testWaitUntilFinish() {
     final DAG<IRVertex, IREdge> irDAG = irDAGBuilder.build();
     final DAG<Stage, StageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
     final JobStateManager jobStateManager = new JobStateManager(
-        new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()),
+        new PhysicalPlan("TestPlan", physicalDAG),
         blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
 
     assertFalse(jobStateManager.checkJobTermination());
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
index 55808c9b..7a2d1490 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
@@ -49,7 +49,8 @@ public void testContainerTypeAware() {
     final ExecutorRepresenter a2 = mockExecutorRepresenter(ExecutorPlacementProperty.NONE);
 
     final Task task1 = mock(Task.class);
-    when(task1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
+    when(task1.getPropertyValue(ExecutorPlacementProperty.class))
+        .thenReturn(Optional.of(ExecutorPlacementProperty.RESERVED));
 
     final Set<ExecutorRepresenter> executorRepresenterList1 = new HashSet<>(Arrays.asList(a0, a1, a2));
 
@@ -60,7 +61,8 @@ public void testContainerTypeAware() {
     assertEquals(expectedExecutors1, candidateExecutors1);
 
     final Task task2 = mock(Task.class);
-    when(task2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
+    when(task2.getPropertyValue(ExecutorPlacementProperty.class))
+        .thenReturn(Optional.of(ExecutorPlacementProperty.NONE));
 
     final Set<ExecutorRepresenter> executorRepresenterList2 = new HashSet<>(Arrays.asList(a0, a1, a2));
 
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
index bb9fc2c4..96c65eb2 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
@@ -136,18 +136,18 @@ public void testContainerRemoval() throws Exception {
     final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
 
     for (final Stage stage : dagOf4Stages) {
-      if (stage.getScheduleGroupIndex() == 0) {
+      if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {
 
-        // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
+        // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1.
         SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
         assertTrue(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId ->
             SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
                 taskId, TaskState.State.COMPLETE, 1));
-      } else if (stage.getScheduleGroupIndex() == 1) {
+      } else if (stage.getScheduleGroupIndex() == 2) {
         scheduler.onExecutorRemoved("a3");
-        // There are 2 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 1.
+        // There are 2 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
         SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
 
@@ -165,11 +165,14 @@ public void testContainerRemoval() throws Exception {
         stage.getTaskIds().forEach(taskId ->
             SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
                 taskId, TaskState.State.COMPLETE, 1));
-      } else {
-        // There are 1 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
+      } else if (stage.getScheduleGroupIndex() == 3) {
+        // There is 1 executor of capacity 2, and there are 2 Tasks in ScheduleGroup 3.
         // Schedule only the first Task
         SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, true);
+      } else {
+        throw new RuntimeException(String.format("Unexpected ScheduleGroupIndex: %d",
+            stage.getScheduleGroupIndex()));
       }
     }
   }
@@ -207,17 +210,17 @@ public void testOutputFailure() throws Exception {
     final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
 
     for (final Stage stage : dagOf4Stages) {
-      if (stage.getScheduleGroupIndex() == 0) {
+      if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {
 
-        // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
+        // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1.
         SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
         assertTrue(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId ->
             SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
                 taskId, TaskState.State.COMPLETE, 1));
-      } else if (stage.getScheduleGroupIndex() == 1) {
-        // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 1.
+      } else if (stage.getScheduleGroupIndex() == 2) {
+        // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
         SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
         assertTrue(pendingTaskCollection.isEmpty());
@@ -230,7 +233,7 @@ public void testOutputFailure() throws Exception {
 
         }
 
-        assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 3);
+        assertEquals(3, jobStateManager.getAttemptCountForStage(stage.getId()));
         assertFalse(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId -> {
           assertEquals(jobStateManager.getTaskState(taskId).getStateMachine().getCurrentState(),
@@ -273,17 +276,17 @@ public void testInputReadFailure() throws Exception {
     final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
 
     for (final Stage stage : dagOf4Stages) {
-      if (stage.getScheduleGroupIndex() == 0) {
+      if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {
 
-        // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
+        // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1.
         SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
         assertTrue(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId ->
             SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
                 taskId, TaskState.State.COMPLETE, 1));
-      } else if (stage.getScheduleGroupIndex() == 1) {
-        // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 1.
+      } else if (stage.getScheduleGroupIndex() == 2) {
+        // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
         SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
 
@@ -296,7 +299,7 @@ public void testInputReadFailure() throws Exception {
 
         }
 
-        assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 2);
+        assertEquals(2, jobStateManager.getAttemptCountForStage(stage.getId()));
         stage.getTaskIds().forEach(taskId -> {
           assertEquals(jobStateManager.getTaskState(taskId).getStateMachine().getCurrentState(),
               TaskState.State.READY);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
index 23c5ee74..bb48746a 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
@@ -217,7 +217,7 @@ private void scheduleStage(final Stage stage) {
             "TestPlan",
             taskId,
             0,
-            stage.getContainerType(),
+            stage.getExecutionProperties(),
             stage.getSerializedIRDAG(),
             Collections.emptyList(),
             Collections.emptyList(),
diff --git a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
index 65f10b2b..0af8c8d8 100644
--- a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
+++ b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
@@ -100,7 +100,7 @@ private static PhysicalPlan convertIRToPhysical(final DAG<IRVertex, IREdge> irDA
                                                   final Policy policy) throws Exception {
     final DAG<IRVertex, IREdge> optimized = CompiletimeOptimizer.optimize(irDAG, policy, EMPTY_DAG_DIRECTORY);
     final DAG<Stage, StageEdge> physicalDAG = optimized.convert(PLAN_GENERATOR);
-    return new PhysicalPlan("TestPlan", physicalDAG, PLAN_GENERATOR.getIdToIRVertex());
+    return new PhysicalPlan("TestPlan", physicalDAG);
   }
 
   /**
diff --git a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
new file mode 100644
index 00000000..a60cf880
--- /dev/null
+++ b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.plan;
+
+import edu.snu.nemo.common.dag.DAGBuilder;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import edu.snu.nemo.common.test.EmptyComponents;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Tests {@link StagePartitioner}.
+ */
+public final class StagePartitionerTest {
+  private static final Transform EMPTY_TRANSFORM = new EmptyComponents.EmptyTransform("empty");
+
+  private StagePartitioner stagePartitioner;
+
+  @Before
+  public void setup() throws InjectionException {
+    stagePartitioner = Tang.Factory.getTang().newInjector().getInstance(StagePartitioner.class);
+    stagePartitioner.addIgnoredPropertyKey(DynamicOptimizationProperty.class);
+  }
+
+  /**
+   * @param parallelism {@link ParallelismProperty} value for the new vertex
+   * @param scheduleGroupIndex {@link ScheduleGroupIndexProperty} value for the new vertex
+   * @param otherProperties other {@link VertexExecutionProperty} for the new vertex
+   * @return new {@link IRVertex}
+   */
+  private static IRVertex newVertex(final int parallelism, final int scheduleGroupIndex,
+                                    final List<VertexExecutionProperty> otherProperties) {
+    final IRVertex vertex = new OperatorVertex(EMPTY_TRANSFORM);
+    vertex.getExecutionProperties().put(ParallelismProperty.of(parallelism));
+    vertex.getExecutionProperties().put(ScheduleGroupIndexProperty.of(scheduleGroupIndex));
+    otherProperties.forEach(property -> vertex.getExecutionProperties().put(property));
+    return vertex;
+  }
+
+  /**
+   * A simple case where two vertices share parallelism and ScheduleGroupIndex, so they are merged into one stage.
+   */
+  @Test
+  public void testLinear() {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+    final IRVertex v0 = newVertex(5, 0, Collections.emptyList());
+    final IRVertex v1 = newVertex(5, 0, Collections.emptyList());
+    dagBuilder.addVertex(v0);
+    dagBuilder.addVertex(v1);
+    dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v0, v1));
+    final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+    assertEquals(0, (int) partitioning.get(v0));
+    assertEquals(0, (int) partitioning.get(v1));
+  }
+
+  /**
+   * A simple case where two vertices have different parallelism.
+   */
+  @Test
+  public void testSplitByParallelism() {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+    final IRVertex v0 = newVertex(5, 0, Collections.emptyList());
+    final IRVertex v1 = newVertex(1, 0, Collections.emptyList());
+    dagBuilder.addVertex(v0);
+    dagBuilder.addVertex(v1);
+    dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v0, v1));
+    final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+    assertNotEquals(partitioning.get(v0), partitioning.get(v1));
+  }
+
+  /**
+   * A simple case where two vertices have different ScheduleGroupIndex.
+   */
+  @Test
+  public void testSplitByScheduleGroupIndex() {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+    final IRVertex v0 = newVertex(1, 0, Collections.emptyList());
+    final IRVertex v1 = newVertex(1, 1, Collections.emptyList());
+    dagBuilder.addVertex(v0);
+    dagBuilder.addVertex(v1);
+    dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v0, v1));
+    final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+    assertNotEquals(partitioning.get(v0), partitioning.get(v1));
+  }
+
+  /**
+   * A simple case where two vertices are connected with Shuffle edge.
+   */
+  @Test
+  public void testSplitByShuffle() {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+    final IRVertex v0 = newVertex(1, 0, Collections.emptyList());
+    final IRVertex v1 = newVertex(1, 0, Collections.emptyList());
+    dagBuilder.addVertex(v0);
+    dagBuilder.addVertex(v1);
+    dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v0, v1));
+    final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+    assertNotEquals(partitioning.get(v0), partitioning.get(v1));
+  }
+
+  /**
+   * A simple case where one of the two vertices has an additional property.
+   */
+  @Test
+  public void testSplitByOtherProperty() {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+    final IRVertex v0 = newVertex(1, 0,
+        Arrays.asList(ExecutorPlacementProperty.of(ExecutorPlacementProperty.RESERVED)));
+    final IRVertex v1 = newVertex(1, 0, Collections.emptyList());
+    dagBuilder.addVertex(v0);
+    dagBuilder.addVertex(v1);
+    dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v0, v1));
+    final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+    assertNotEquals(partitioning.get(v0), partitioning.get(v1));
+  }
+
+  /**
+   * A simple case where one of the two vertices has an ignored property.
+   */
+  @Test
+  public void testNotSplitByIgnoredProperty() {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+    final IRVertex v0 = newVertex(1, 0,
+        Arrays.asList(DynamicOptimizationProperty.of(DynamicOptimizationProperty.Value.DataSkewRuntimePass)));
+    final IRVertex v1 = newVertex(1, 0, Collections.emptyList());
+    dagBuilder.addVertex(v0);
+    dagBuilder.addVertex(v1);
+    dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v0, v1));
+    final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+    assertEquals(0, (int) partitioning.get(v0));
+    assertEquals(0, (int) partitioning.get(v1));
+  }
+
+  /**
+   * Test scenario when there is a join.
+   */
+  @Test
+  public void testJoin() {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+    final IRVertex v0 = newVertex(5, 0, Collections.emptyList());
+    final IRVertex v1 = newVertex(5, 0, Collections.emptyList());
+    final IRVertex v2 = newVertex(5, 0, Collections.emptyList());
+    dagBuilder.addVertex(v0);
+    dagBuilder.addVertex(v1);
+    dagBuilder.addVertex(v2);
+    dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v0, v2));
+    dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v2));
+    final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+    assertNotEquals(partitioning.get(v0), partitioning.get(v1));
+    assertNotEquals(partitioning.get(v1), partitioning.get(v2));
+    assertNotEquals(partitioning.get(v2), partitioning.get(v0));
+  }
+}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
index 03952fe9..0d60a5d4 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
@@ -82,7 +82,7 @@ public void testState() throws Exception {
     injector.bindVolatileInstance(MessageEnvironment.class, messageEnvironment);
     final BlockManagerMaster pmm = injector.getInstance(BlockManagerMaster.class);
     final JobStateManager jobStateManager = new JobStateManager(
-        new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()),
+        new PhysicalPlan("TestPlan", physicalDAG),
         pmm, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
 
     final DriverEndpoint driverEndpoint = new DriverEndpoint(jobStateManager, clientEndpoint);
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
new file mode 100644
index 00000000..5cf95b5d
--- /dev/null
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.tests.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.client.JobLauncher;
+import edu.snu.nemo.common.Pair;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.dag.DAGBuilder;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import edu.snu.nemo.common.test.EmptyComponents;
+import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
+import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
+import edu.snu.nemo.tests.compiler.CompilerTestUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test {@link DefaultScheduleGroupPass}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public final class DefaultScheduleGroupPassTest {
+  private static final Transform EMPTY_TRANSFORM = new EmptyComponents.EmptyTransform("empty");
+
+  @Test
+  public void testAnnotatingPass() {
+    final AnnotatingPass scheduleGroupPass = new DefaultScheduleGroupPass();
+    assertEquals(ScheduleGroupIndexProperty.class, scheduleGroupPass.getExecutionPropertyToModify());
+  }
+
+  /**
+   * This test ensures that each vertex's schedule group index is no smaller than those of its parents.
+   */
+  @Test
+  public void testTopologicalOrdering() throws Exception {
+    final DAG<IRVertex, IREdge> compiledDAG = CompilerTestUtil.compileALSDAG();
+    final DAG<IRVertex, IREdge> processedDAG = CompiletimeOptimizer.optimize(compiledDAG,
+        new TestPolicy(), "");
+
+    for (final IRVertex irVertex : processedDAG.getTopologicalSort()) {
+      final Integer currentScheduleGroupIndex = irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get();
+      final Integer largestScheduleGroupIndexOfParent = processedDAG.getParents(irVertex.getId()).stream()
+          .mapToInt(v -> v.getPropertyValue(ScheduleGroupIndexProperty.class).get())
+          .max().orElse(0);
+      assertTrue(currentScheduleGroupIndex >= largestScheduleGroupIndexOfParent);
+    }
+  }
+
+  /**
+   * Return a DAG that has a branch.
+   * {@literal
+   *           /-- v3 --- v4
+   * v0 --- v1 --- v2 --/
+   * }
+   *
+   * @param communicationPattern {@link DataCommunicationPatternProperty.Value} for the edges
+   * @param dataFlowModel {@link DataFlowModelProperty.Value} for the edges
+   * @return a {@link Pair} of {@link DAG} and {@link List} of {@link IRVertex}
+   */
+  private static Pair<DAG<IRVertex, IREdge>, List<IRVertex>> generateBranchDAG(
+      final DataCommunicationPatternProperty.Value communicationPattern,
+      final DataFlowModelProperty.Value dataFlowModel) {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+
+    final IRVertex v0 = new OperatorVertex(EMPTY_TRANSFORM);
+    final IRVertex v1 = new OperatorVertex(EMPTY_TRANSFORM);
+    final IRVertex v2 = new OperatorVertex(EMPTY_TRANSFORM);
+    final IRVertex v3 = new OperatorVertex(EMPTY_TRANSFORM);
+    final IRVertex v4 = new OperatorVertex(EMPTY_TRANSFORM);
+
+    final IREdge e0 = new IREdge(communicationPattern, v0, v1);
+    final IREdge e1 = new IREdge(communicationPattern, v1, v2);
+    final IREdge e2 = new IREdge(communicationPattern, v1, v3);
+    final IREdge e3 = new IREdge(communicationPattern, v2, v4);
+    final IREdge e4 = new IREdge(communicationPattern, v3, v4);
+
+    final List<IRVertex> vertices = Arrays.asList(v0, v1, v2, v3, v4);
+    for (final IRVertex vertex : vertices) {
+      dagBuilder.addVertex(vertex);
+    }
+    for (final IREdge edge : Arrays.asList(e0, e1, e2, e3, e4)) {
+      edge.getExecutionProperties().put(DataFlowModelProperty.of(dataFlowModel));
+      dagBuilder.connectVertices(edge);
+    }
+    return Pair.of(dagBuilder.buildWithoutSourceSinkCheck(), vertices);
+  }
+
+  /**
+   * Return a DAG that has a join.
+   * {@literal
+   * v0 --- v1 --- v4 -- v5
+   * v2 --- v3 --/
+   * }
+   *
+   * @param communicationPattern {@link DataCommunicationPatternProperty.Value} for the edges
+   * @param dataFlowModel {@link DataFlowModelProperty.Value} for the edges
+   * @return a {@link Pair} of {@link DAG} and {@link List} of {@link IRVertex}
+   */
+  private static Pair<DAG<IRVertex, IREdge>, List<IRVertex>> generateJoinDAG(
+      final DataCommunicationPatternProperty.Value communicationPattern,
+      final DataFlowModelProperty.Value dataFlowModel) {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+
+    final IRVertex v0 = new OperatorVertex(EMPTY_TRANSFORM);
+    final IRVertex v1 = new OperatorVertex(EMPTY_TRANSFORM);
+    final IRVertex v2 = new OperatorVertex(EMPTY_TRANSFORM);
+    final IRVertex v3 = new OperatorVertex(EMPTY_TRANSFORM);
+    final IRVertex v4 = new OperatorVertex(EMPTY_TRANSFORM);
+    final IRVertex v5 = new OperatorVertex(EMPTY_TRANSFORM);
+
+    final IREdge e0 = new IREdge(communicationPattern, v0, v1);
+    final IREdge e1 = new IREdge(communicationPattern, v2, v3);
+    final IREdge e2 = new IREdge(communicationPattern, v1, v4);
+    final IREdge e3 = new IREdge(communicationPattern, v3, v4);
+    final IREdge e4 = new IREdge(communicationPattern, v4, v5);
+
+    final List<IRVertex> vertices = Arrays.asList(v0, v1, v2, v3, v4, v5);
+    for (final IRVertex vertex : vertices) {
+      dagBuilder.addVertex(vertex);
+    }
+    for (final IREdge edge : Arrays.asList(e0, e1, e2, e3, e4)) {
+      edge.getExecutionProperties().put(DataFlowModelProperty.of(dataFlowModel));
+      dagBuilder.connectVertices(edge);
+    }
+    return Pair.of(dagBuilder.buildWithoutSourceSinkCheck(), vertices);
+  }
+
+  /**
+   * Asserts that the {@link ScheduleGroupIndexProperty} is equal to {@code expected}.
+   * @param expected the expected property value
+   * @param vertex the vertex to test
+   */
+  private static void assertScheduleGroupIndex(final int expected, final IRVertex vertex) {
+    assertEquals(expected, getScheduleGroupIndex(vertex));
+  }
+
+  /**
+   * @param vertex a vertex
+   * @return {@link ScheduleGroupIndexProperty} of {@code vertex}
+   */
+  private static int getScheduleGroupIndex(final IRVertex vertex) {
+    return vertex.getPropertyValue(ScheduleGroupIndexProperty.class)
+        .orElseThrow(() -> new RuntimeException(String.format("ScheduleGroup not set for %s", vertex.getId())));
+  }
+
+  /**
+   * Ensures that all vertices in {@code vertices} have distinct {@link ScheduleGroupIndexProperty} values.
+   * @param vertices vertices to test
+   */
+  private static void assertDifferentScheduleGroupIndex(final Collection<IRVertex> vertices) {
+    final Set<Integer> indices = new HashSet<>();
+    vertices.forEach(v -> {
+      final int idx = getScheduleGroupIndex(v);
+      assertFalse(indices.contains(idx));
+      indices.add(idx);
+    });
+  }
+
+  /**
+   * Test scenario when {@code allowMultipleInEdgesWithinScheduleGroup} is {@code true} and the DAG contains a branch.
+   */
+  @Test
+  public void testBranch() {
+    final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass();
+    final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+        = generateBranchDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Pull);
+    pass.apply(dag.left());
+    dag.right().forEach(v -> assertScheduleGroupIndex(0, v));
+  }
+
+  /**
+   * Test scenario when {@code allowMultipleInEdgesWithinScheduleGroup} is {@code false} and the DAG contains a branch.
+   */
+  @Test
+  public void testBranchWhenMultipleInEdgeNotAllowed() {
+    final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(false, false, false);
+    final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+        = generateBranchDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Pull);
+    pass.apply(dag.left());
+    dag.right().subList(0, 4).forEach(v -> assertScheduleGroupIndex(0, v));
+    dag.right().subList(4, 5).forEach(v -> assertScheduleGroupIndex(1, v));
+  }
+
+  /**
+   * Test scenario to determine whether push edges properly enforce the same scheduleGroupIndex.
+   */
+  @Test
+  public void testBranchWithPush() {
+    final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(false, false, false);
+    final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+        = generateBranchDAG(DataCommunicationPatternProperty.Value.Shuffle, DataFlowModelProperty.Value.Push);
+    pass.apply(dag.left());
+    dag.right().forEach(v -> assertScheduleGroupIndex(0, v));
+  }
+
+  /**
+   * Test scenario when {@code allowBroadcastWithinScheduleGroup} is {@code false} and DAG contains Broadcast edges.
+   */
+  @Test
+  public void testBranchWithBroadcast() {
+    final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(false, true, true);
+    final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+        = generateBranchDAG(DataCommunicationPatternProperty.Value.BroadCast, DataFlowModelProperty.Value.Pull);
+    assertDifferentScheduleGroupIndex(pass.apply(dag.left()).getVertices());
+  }
+
+  /**
+   * Test scenario when {@code allowShuffleWithinScheduleGroup} is {@code false} and DAG contains Shuffle edges.
+   */
+  @Test
+  public void testBranchWithShuffle() {
+    final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(true, false, true);
+    final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+        = generateBranchDAG(DataCommunicationPatternProperty.Value.Shuffle, DataFlowModelProperty.Value.Pull);
+    assertDifferentScheduleGroupIndex(pass.apply(dag.left()).getVertices());
+  }
+
+  /**
+   * Test scenario when {@code allowMultipleInEdgesWithinScheduleGroup} is {@code true} and the DAG contains a join.
+   */
+  @Test
+  public void testJoin() {
+    final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass();
+    final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+        = generateJoinDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Pull);
+    pass.apply(dag.left());
+    final int idxForFirstScheduleGroup = getScheduleGroupIndex(dag.right().get(0));
+    final int idxForSecondScheduleGroup = getScheduleGroupIndex(dag.right().get(2));
+    dag.right().subList(0, 2).forEach(v -> assertScheduleGroupIndex(idxForFirstScheduleGroup, v));
+    dag.right().subList(2, 4).forEach(v -> assertScheduleGroupIndex(idxForSecondScheduleGroup, v));
+    dag.right().subList(4, 6).forEach(v -> assertScheduleGroupIndex(2, v));
+  }
+
+  /**
+   * Test scenario with multiple push inEdges.
+   */
+  @Test
+  public void testJoinWithPush() {
+    final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass();
+    final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+        = generateJoinDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Push);
+    pass.apply(dag.left());
+    dag.right().forEach(v -> assertScheduleGroupIndex(0, v));
+  }
+
+  /**
+   * Test scenario where the join vertex has a single push inEdge (its other inEdge is pull).
+   */
+  @Test
+  public void testJoinWithSinglePush() {
+    final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass();
+    final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+        = generateJoinDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Push);
+    dag.left().getOutgoingEdgesOf(dag.right().get(1)).iterator().next()
+        .getExecutionProperties().put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
+    pass.apply(dag.left());
+    final int idxForFirstScheduleGroup = getScheduleGroupIndex(dag.right().get(0));
+    final int idxForSecondScheduleGroup = getScheduleGroupIndex(dag.right().get(2));
+    dag.right().subList(0, 2).forEach(v -> assertScheduleGroupIndex(idxForFirstScheduleGroup, v));
+    dag.right().subList(2, 6).forEach(v -> assertScheduleGroupIndex(idxForSecondScheduleGroup, v));
+  }
+}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPassTest.java
deleted file mode 100644
index 9e222fcc..00000000
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPassTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.tests.compiler.optimizer.pass.compiletime.annotating;
-
-import edu.snu.nemo.client.JobLauncher;
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
-import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
-import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
-import edu.snu.nemo.tests.compiler.CompilerTestUtil;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test {@link ScheduleGroupPass}.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(JobLauncher.class)
-public final class ScheduleGroupPassTest {
-  @Before
-  public void setUp() throws Exception {
-  }
-
-  @Test
-  public void testAnnotatingPass() {
-    final AnnotatingPass scheduleGroupPass = new ScheduleGroupPass();
-    assertEquals(ScheduleGroupIndexProperty.class, scheduleGroupPass.getExecutionPropertyToModify());
-  }
-
-  /**
-   * This test ensures that a topologically sorted DAG has an increasing sequence of schedule group indexes.
-   */
-  @Test
-  public void testScheduleGroupPass() throws Exception {
-    final DAG<IRVertex, IREdge> compiledDAG = CompilerTestUtil.compileALSDAG();
-    final DAG<IRVertex, IREdge> processedDAG = CompiletimeOptimizer.optimize(compiledDAG,
-        new TestPolicy(), "");
-
-    for (final IRVertex irVertex : processedDAG.getTopologicalSort()) {
-      final Integer currentScheduleGroupIndex = irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get();
-      final Integer largestScheduleGroupIndexOfParent = processedDAG.getParents(irVertex.getId()).stream()
-          .mapToInt(v -> v.getPropertyValue(ScheduleGroupIndexProperty.class).get())
-          .max().orElse(0);
-      assertTrue(currentScheduleGroupIndex >= largestScheduleGroupIndexOfParent);
-    }
-  }
-}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DisaggregationPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DisaggregationPassTest.java
index 898f74ec..d4cc616b 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DisaggregationPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DisaggregationPassTest.java
@@ -19,13 +19,11 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultParallelismPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DisaggregationEdgeDataStorePass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ReviseInterStageEdgeDataStorePass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultInterTaskDataStorePass;
 import edu.snu.nemo.tests.compiler.CompilerTestUtil;
 import org.junit.Before;
 import org.junit.Test;
@@ -52,22 +50,12 @@ public void setUp() throws Exception {
   public void testDisaggregation() throws Exception {
     final DAG<IRVertex, IREdge> processedDAG =
         new DisaggregationEdgeDataStorePass().apply(
-            new ReviseInterStageEdgeDataStorePass().apply(
-                new DefaultStagePartitioningPass().apply(
-                    new DefaultParallelismPass().apply(compiledDAG))));
+            new DefaultInterTaskDataStorePass().apply(
+                  new DefaultParallelismPass().apply(compiledDAG)));
 
-    processedDAG.getTopologicalSort().forEach(irVertex -> {
-      processedDAG.getIncomingEdgesOf(irVertex).forEach(edgeToMerger -> {
-        if (DataCommunicationPatternProperty.Value.OneToOne
-            .equals(edgeToMerger.getPropertyValue(DataCommunicationPatternProperty.class).get())
-            && edgeToMerger.getSrc().getPropertyValue(StageIdProperty.class).get()
-            .equals(edgeToMerger.getDst().getPropertyValue(StageIdProperty.class).get())) {
-          assertEquals(DataStoreProperty.Value.MemoryStore, edgeToMerger.getPropertyValue(DataStoreProperty.class).get());
-        } else {
-          assertEquals(DataStoreProperty.Value.GlusterFileStore,
-              edgeToMerger.getPropertyValue(DataStoreProperty.class).get());
-        }
-      });
-    });
+    processedDAG.getTopologicalSort().forEach(irVertex ->
+      processedDAG.getIncomingEdgesOf(irVertex).forEach(edgeToMerger ->
+          assertEquals(InterTaskDataStoreProperty.Value.GlusterFileStore,
+              edgeToMerger.getPropertyValue(InterTaskDataStoreProperty.class).get())));
   }
 }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/PadoCompositePassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/PadoCompositePassTest.java
index 73014e28..f983698d 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/PadoCompositePassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/PadoCompositePassTest.java
@@ -19,7 +19,7 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.PadoEdgeDataStorePass;
@@ -57,35 +57,35 @@ public void testPadoPass() throws Exception {
     final IRVertex vertex5 = processedDAG.getTopologicalSort().get(1);
     assertEquals(ExecutorPlacementProperty.TRANSIENT, vertex5.getPropertyValue(ExecutorPlacementProperty.class).get());
     processedDAG.getIncomingEdgesOf(vertex5).forEach(irEdge -> {
-      assertEquals(DataStoreProperty.Value.MemoryStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+      assertEquals(InterTaskDataStoreProperty.Value.MemoryStore, irEdge.getPropertyValue(InterTaskDataStoreProperty.class).get());
       assertEquals(DataFlowModelProperty.Value.Pull, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
     });
 
     final IRVertex vertex6 = processedDAG.getTopologicalSort().get(2);
     assertEquals(ExecutorPlacementProperty.RESERVED, vertex6.getPropertyValue(ExecutorPlacementProperty.class).get());
     processedDAG.getIncomingEdgesOf(vertex6).forEach(irEdge -> {
-      assertEquals(DataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+      assertEquals(InterTaskDataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(InterTaskDataStoreProperty.class).get());
       assertEquals(DataFlowModelProperty.Value.Push, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
     });
 
     final IRVertex vertex4 = processedDAG.getTopologicalSort().get(6);
     assertEquals(ExecutorPlacementProperty.RESERVED, vertex4.getPropertyValue(ExecutorPlacementProperty.class).get());
     processedDAG.getIncomingEdgesOf(vertex4).forEach(irEdge -> {
-      assertEquals(DataStoreProperty.Value.MemoryStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+      assertEquals(InterTaskDataStoreProperty.Value.MemoryStore, irEdge.getPropertyValue(InterTaskDataStoreProperty.class).get());
       assertEquals(DataFlowModelProperty.Value.Pull, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
     });
 
     final IRVertex vertex12 = processedDAG.getTopologicalSort().get(10);
     assertEquals(ExecutorPlacementProperty.TRANSIENT, vertex12.getPropertyValue(ExecutorPlacementProperty.class).get());
     processedDAG.getIncomingEdgesOf(vertex12).forEach(irEdge -> {
-      assertEquals(DataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+      assertEquals(InterTaskDataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(InterTaskDataStoreProperty.class).get());
       assertEquals(DataFlowModelProperty.Value.Pull, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
     });
 
     final IRVertex vertex14 = processedDAG.getTopologicalSort().get(12);
     assertEquals(ExecutorPlacementProperty.RESERVED, vertex14.getPropertyValue(ExecutorPlacementProperty.class).get());
     processedDAG.getIncomingEdgesOf(vertex14).forEach(irEdge -> {
-      assertEquals(DataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+      assertEquals(InterTaskDataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(InterTaskDataStoreProperty.class).get());
       assertEquals(DataFlowModelProperty.Value.Push, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
     });
   }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
index e779f581..ab1beb7b 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
@@ -61,8 +61,8 @@ public void testSailfish() {
                 edgeToMerger.getPropertyValue(DataFlowModelProperty.class).get());
             assertEquals(UsedDataHandlingProperty.Value.Discard,
                 edgeToMerger.getPropertyValue(UsedDataHandlingProperty.class).get());
-            assertEquals(DataStoreProperty.Value.SerializedMemoryStore,
-                edgeToMerger.getPropertyValue(DataStoreProperty.class).get());
+            assertEquals(InterTaskDataStoreProperty.Value.SerializedMemoryStore,
+                edgeToMerger.getPropertyValue(InterTaskDataStoreProperty.class).get());
             assertEquals(BytesDecoderFactory.of(),
                 edgeToMerger.getPropertyValue(DecoderProperty.class).get());
           } else {
@@ -75,8 +75,8 @@ public void testSailfish() {
               edgeFromMerger.getPropertyValue(DataFlowModelProperty.class).get());
           assertEquals(DataCommunicationPatternProperty.Value.OneToOne,
               edgeFromMerger.getPropertyValue(DataCommunicationPatternProperty.class).get());
-          assertEquals(DataStoreProperty.Value.LocalFileStore,
-              edgeFromMerger.getPropertyValue(DataStoreProperty.class).get());
+          assertEquals(InterTaskDataStoreProperty.Value.LocalFileStore,
+              edgeFromMerger.getPropertyValue(InterTaskDataStoreProperty.class).get());
           assertEquals(BytesEncoderFactory.of(),
               edgeFromMerger.getPropertyValue(EncoderProperty.class).get());
         });
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
index 8d13374b..b1ea8f83 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -16,8 +16,7 @@
 package edu.snu.nemo.tests.compiler.optimizer.policy;
 
 import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.PadoCompositePass;
 import edu.snu.nemo.compiler.optimizer.policy.*;
 import org.junit.Test;
@@ -29,21 +28,21 @@
   @Test
   public void testDisaggregationPolicy() {
     final Policy disaggregationPolicy = new DisaggregationPolicy();
-    assertEquals(15, disaggregationPolicy.getCompileTimePasses().size());
+    assertEquals(14, disaggregationPolicy.getCompileTimePasses().size());
     assertEquals(0, disaggregationPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testPadoPolicy() {
     final Policy padoPolicy = new PadoPolicy();
-    assertEquals(17, padoPolicy.getCompileTimePasses().size());
+    assertEquals(16, padoPolicy.getCompileTimePasses().size());
     assertEquals(0, padoPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testDataSkewPolicy() {
     final Policy dataSkewPolicy = new DataSkewPolicy();
-    assertEquals(19, dataSkewPolicy.getCompileTimePasses().size());
+    assertEquals(18, dataSkewPolicy.getCompileTimePasses().size());
     assertEquals(1, dataSkewPolicy.getRuntimePasses().size());
   }
 
@@ -52,8 +51,7 @@ public void testShouldFailPolicy() {
     try {
       final Policy failPolicy = new PolicyBuilder()
           .registerCompileTimePass(new PadoCompositePass())
-          .registerCompileTimePass(new DefaultStagePartitioningPass())
-          .registerCompileTimePass(new ScheduleGroupPass())
+          .registerCompileTimePass(new DefaultScheduleGroupPass())
           .build();
     } catch (Exception e) { // throw an exception if default execution properties are not set.
       assertTrue(e instanceof CompileTimeOptimizationException);
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/TestPolicy.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/TestPolicy.java
index 51ca1aed..59dbd4eb 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/TestPolicy.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/TestPolicy.java
@@ -39,13 +39,12 @@ public TestPolicy(final boolean testPushPolicy) {
   @Override
   public List<CompileTimePass> getCompileTimePasses() {
     List<CompileTimePass> policy = new ArrayList<>();
-    policy.add(new DefaultStagePartitioningPass());
 
     if (testPushPolicy) {
       policy.add(new ShuffleEdgePushPass());
     }
 
-    policy.add(new ScheduleGroupPass());
+    policy.add(new DefaultScheduleGroupPass());
     return policy;
   }
 
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
index 671fdb09..51750456 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
@@ -20,7 +20,7 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.SourceVertex;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
@@ -154,35 +154,35 @@ public void testComplexPlan() throws Exception {
 //    irDAGBuilder.addVertex(v7);
 
     final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v2);
-    e1.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+    e1.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
     e1.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
 
     final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v3);
-    e2.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+    e2.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
     e2.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
 
     final IREdge e3 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4);
-    e3.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+    e3.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
     e3.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Push));
 
     final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v5);
-    e4.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+    e4.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
     e4.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Push));
 
     final IREdge e5 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v6);
-    e5.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+    e5.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
     e5.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
 
     final IREdge e6 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v8);
-    e6.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+    e6.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
     e6.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
 
 //    final IREdge e7 = new IREdge(OneToOne, v7, v5);
-//    e7.setProperty(DataStoreProperty.of(MemoryStore));
+//    e7.setProperty(InterTaskDataStoreProperty.of(MemoryStore));
 //    e7.setProperty(Attribute.Key.PullOrPush, DataFlowModelProperty.Value.Push));
 //
 //    final IREdge e8 = new IREdge(OneToOne, v5, v8);
-//    e8.setProperty(DataStoreProperty.of(MemoryStore));
+//    e8.setProperty(InterTaskDataStoreProperty.of(MemoryStore));
 //    e8.setProperty(Attribute.Key.PullOrPush, DataFlowModelProperty.Value.Pull));
 
     // Stage 1 = {v1, v2, v3}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services