You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/06/05 08:02:58 UTC

[GitHub] jeongyooneo closed pull request #26: [NEMO-78] Rename PhysicalStage to Stage

jeongyooneo closed pull request #26: [NEMO-78] Rename PhysicalStage to Stage
URL: https://github.com/apache/incubator-nemo/pull/26
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub will not show its diff
after the merge, so the complete diff is reproduced below:

diff --git a/bin/json2dot.py b/bin/json2dot.py
index e4c77bf9..85538e70 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -48,18 +48,18 @@ class JobState:
     def __init__(self, data):
         self.id = data['jobId']
         self.stages = {}
-        for stage in data['physicalStages']:
-            self.stages[stage['id']] = PhysicalStageState(stage)
+        for stage in data['stages']:
+            self.stages[stage['id']] = StageState(stage)
     @classmethod
     def empty(cls):
-        return cls({'jobId': None, 'physicalStages': []})
+        return cls({'jobId': None, 'stages': []})
     def get(self, id):
         try:
             return self.stages[id]
         except:
-            return PhysicalStageState.empty()
+            return StageState.empty()
 
-class PhysicalStageState:
+class StageState:
     def __init__(self, data):
         self.id = data['id']
         self.state = data['state']
@@ -114,7 +114,7 @@ def dot(self):
 
 def Vertex(id, properties, state):
     try:
-        return PhysicalStage(id, properties, state)
+        return Stage(id, properties, state)
     except:
         pass
     try:
@@ -249,7 +249,7 @@ def internalDstFor(self, edgeWithLoopId):
         vertexId = list(filter(lambda v: edgeId in self.incoming[v], self.incoming))[0]
         return self.dag.vertices[vertexId]
 
-class PhysicalStage:
+class Stage:
     def __init__(self, id, properties, state):
         self.id = id
         self.irVertex = DAG(properties['irDag'], JobState.empty())
@@ -276,7 +276,7 @@ def logicalEnd(self):
 
 def Edge(src, dst, properties):
     try:
-        return PhysicalStageEdge(src, dst, properties)
+        return StageEdge(src, dst, properties)
     except:
         pass
     try:
@@ -325,7 +325,7 @@ def dot(self):
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(src.oneVertex.idx,
                 dst.oneVertex.idx, src.logicalEnd, dst.logicalEnd, label)
 
-class PhysicalStageEdge:
+class StageEdge:
     def __init__(self, src, dst, properties):
         self.src = src
         self.dst = dst
diff --git a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 4ef453c9..27b98ea5 100644
--- a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -20,10 +20,10 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import org.apache.reef.tang.Tang;
 
 /**
@@ -57,9 +57,9 @@ public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG) throws Exception
    */
   public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG,
                               final PhysicalPlanGenerator physicalPlanGenerator) {
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalStageDAG = irDAG.convert(physicalPlanGenerator);
+    final DAG<Stage, StageEdge> stageDAG = irDAG.convert(physicalPlanGenerator);
     final PhysicalPlan physicalPlan = new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(),
-        physicalStageDAG, physicalPlanGenerator.getIdToIRVertex());
+        stageDAG, physicalPlanGenerator.getIdToIRVertex());
     return physicalPlan;
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
index 34c66fcc..7fd0d828 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
@@ -21,7 +21,6 @@
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -93,7 +92,11 @@ public String toString() {
 
     @Override
     public List<Readable<T>> getReadables(final int desirednumOfSplits) {
-      return Arrays.asList(new EmptyReadable<>());
+      final List list = new ArrayList(desirednumOfSplits);
+      for (int i = 0; i < desirednumOfSplits; i++) {
+        list.add(new EmptyReadable<>());
+      }
+      return list;
     }
 
     @Override
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
index 44b0504e..d4f44321 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
@@ -76,14 +76,6 @@ public void testSailfish() throws Exception {
         .build());
   }
 
-  @Test (timeout = TIMEOUT)
-  public void testDisagg() throws Exception {
-    JobLauncher.main(builder
-        .addJobId(MapReduceITCase.class.getSimpleName() + "_disagg")
-        .addOptimizationPolicy(DisaggregationPolicyParallelismFive.class.getCanonicalName())
-        .build());
-  }
-
   @Test (timeout = TIMEOUT)
   public void testPado() throws Exception {
     JobLauncher.main(builder
diff --git a/examples/resources/beam_sample_executor_resources.json b/examples/resources/beam_sample_executor_resources.json
index 1f7f9737..ced110c3 100644
--- a/examples/resources/beam_sample_executor_resources.json
+++ b/examples/resources/beam_sample_executor_resources.json
@@ -1,17 +1,12 @@
 [
   {
     "type": "Transient",
-    "memory_mb": 300,
-    "capacity": 3
+    "memory_mb": 512,
+    "capacity": 5
   },
   {
     "type": "Reserved",
-    "memory_mb": 300,
+    "memory_mb": 512,
     "capacity": 5
-  },
-  {
-    "type": "Compute",
-    "memory_mb": 300,
-    "capacity": 3
   }
 ]
diff --git a/examples/resources/spark_sample_executor_resources.json b/examples/resources/spark_sample_executor_resources.json
index 187bd452..ced110c3 100644
--- a/examples/resources/spark_sample_executor_resources.json
+++ b/examples/resources/spark_sample_executor_resources.json
@@ -2,16 +2,11 @@
   {
     "type": "Transient",
     "memory_mb": 512,
-    "capacity": 2
+    "capacity": 5
   },
   {
     "type": "Reserved",
     "memory_mb": 512,
-    "capacity": 2
-  },
-  {
-    "type": "Compute",
-    "memory_mb": 512,
-    "capacity": 2
+    "capacity": 5
   }
 ]
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
index b53ff344..e4fef9c3 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
@@ -40,7 +40,7 @@ private RuntimeIdGenerator() {
   //////////////////////////////////////////////////////////////// Generate IDs
 
   /**
-   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan}.
+   * Generates the ID for physical plan.
    *
    * @return the generated ID
    */
@@ -49,7 +49,7 @@ public static String generatePhysicalPlanId() {
   }
 
   /**
-   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.stage.StageEdge}.
+   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.StageEdge}.
    *
    * @param irEdgeId .
    * @return the generated ID
@@ -59,17 +59,7 @@ public static String generateStageEdgeId(final String irEdgeId) {
   }
 
   /**
-   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.RuntimeEdge}.
-   *
-   * @param irEdgeId .
-   * @return the generated ID
-   */
-  public static String generateRuntimeEdgeId(final String irEdgeId) {
-    return "REdge-" + irEdgeId;
-  }
-
-  /**
-   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.stage.Stage}.
+   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.Stage}.
    * @param stageId stage ID in numeric form.
    * @return the generated ID
    */
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
index e63f1d0d..e9bd6eb0 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
@@ -21,7 +21,7 @@
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.eventhandler.RuntimeEvent;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 
 /**
  * An event for triggering dynamic optimization.
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
index 6fa6bc9b..63cd56ed 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
@@ -23,7 +23,7 @@
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
 import edu.snu.nemo.runtime.common.optimizer.RuntimeOptimizer;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import org.apache.reef.wake.impl.PubSubEventHandler;
 
 import javax.inject.Inject;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java
index ed576bde..fe91fd73 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java
@@ -17,7 +17,7 @@
 
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.eventhandler.CompilerEvent;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 
 /**
  * An event for updating the physical plan in the scheduler.
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java
index 8e002806..ab03331a 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java
@@ -20,7 +20,7 @@
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
 import edu.snu.nemo.runtime.common.optimizer.pass.runtime.DataSkewRuntimePass;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 
 import java.util.*;
 
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 7d3e7f4e..ed2f92cb 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -24,9 +24,9 @@
 
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.data.KeyRange;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.data.HashRange;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEventHandler;
 import org.slf4j.Logger;
@@ -60,16 +60,16 @@ public DataSkewRuntimePass() {
   @Override
   public PhysicalPlan apply(final PhysicalPlan originalPlan, final Map<String, List<Pair<Integer, Long>>> metricData) {
     // Builder to create new stages.
-    final DAGBuilder<PhysicalStage, PhysicalStageEdge> physicalDAGBuilder =
+    final DAGBuilder<Stage, StageEdge> physicalDAGBuilder =
         new DAGBuilder<>(originalPlan.getStageDAG());
 
     // get edges to optimize
     final List<String> optimizationEdgeIds = metricData.keySet().stream().map(blockId ->
         RuntimeIdGenerator.getRuntimeEdgeIdFromBlockId(blockId)).collect(Collectors.toList());
-    final DAG<PhysicalStage, PhysicalStageEdge> stageDAG = originalPlan.getStageDAG();
-    final List<PhysicalStageEdge> optimizationEdges = stageDAG.getVertices().stream()
-        .flatMap(physicalStage -> stageDAG.getIncomingEdgesOf(physicalStage).stream())
-        .filter(physicalStageEdge -> optimizationEdgeIds.contains(physicalStageEdge.getId()))
+    final DAG<Stage, StageEdge> stageDAG = originalPlan.getStageDAG();
+    final List<StageEdge> optimizationEdges = stageDAG.getVertices().stream()
+        .flatMap(stage -> stageDAG.getIncomingEdgesOf(stage).stream())
+        .filter(stageEdge -> optimizationEdgeIds.contains(stageEdge.getId()))
         .collect(Collectors.toList());
 
     // Get number of evaluators of the next stage (number of blocks).
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
index 57c929cb..99401fb6 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
 
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 
 import java.io.Serializable;
 import java.util.Set;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ExecutableTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/ExecutableTask.java
similarity index 89%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ExecutableTask.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/ExecutableTask.java
index e7e37efa..1da8f978 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ExecutableTask.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/ExecutableTask.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.plan.physical;
+package edu.snu.nemo.runtime.common.plan;
 
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
@@ -29,8 +29,8 @@
   private final String jobId;
   private final String taskId;
   private final int taskIdx;
-  private final List<PhysicalStageEdge> taskIncomingEdges;
-  private final List<PhysicalStageEdge> taskOutgoingEdges;
+  private final List<StageEdge> taskIncomingEdges;
+  private final List<StageEdge> taskOutgoingEdges;
   private final int attemptIdx;
   private final String containerType;
   private final byte[] serializedTaskDag;
@@ -53,8 +53,8 @@ public ExecutableTask(final String jobId,
                         final int attemptIdx,
                         final String containerType,
                         final byte[] serializedIRDag,
-                        final List<PhysicalStageEdge> taskIncomingEdges,
-                        final List<PhysicalStageEdge> taskOutgoingEdges,
+                        final List<StageEdge> taskIncomingEdges,
+                        final List<StageEdge> taskOutgoingEdges,
                         final Map<String, Readable> irVertexIdToReadable) {
     this.jobId = jobId;
     this.taskId = taskId;
@@ -98,14 +98,14 @@ public int getTaskIdx() {
   /**
    * @return the incoming edges of the task.
    */
-  public List<PhysicalStageEdge> getTaskIncomingEdges() {
+  public List<StageEdge> getTaskIncomingEdges() {
     return taskIncomingEdges;
   }
 
   /**
    * @return the outgoing edges of the task.
    */
-  public List<PhysicalStageEdge> getTaskOutgoingEdges() {
+  public List<StageEdge> getTaskOutgoingEdges() {
     return taskOutgoingEdges;
   }
 
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
similarity index 87%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
index b25a0bad..8809bd3d 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.plan.physical;
+package edu.snu.nemo.runtime.common.plan;
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
@@ -26,7 +26,7 @@
  */
 public final class PhysicalPlan implements Serializable {
   private final String id;
-  private final DAG<PhysicalStage, PhysicalStageEdge> stageDAG;
+  private final DAG<Stage, StageEdge> stageDAG;
   private final Map<String, IRVertex> idToIRVertex;
 
   /**
@@ -37,7 +37,7 @@
    * @param idToIRVertex map from task to IR vertex.
    */
   public PhysicalPlan(final String id,
-                      final DAG<PhysicalStage, PhysicalStageEdge> stageDAG,
+                      final DAG<Stage, StageEdge> stageDAG,
                       final Map<String, IRVertex> idToIRVertex) {
     this.id = id;
     this.stageDAG = stageDAG;
@@ -54,7 +54,7 @@ public String getId() {
   /**
    * @return the DAG of stages.
    */
-  public DAG<PhysicalStage, PhysicalStageEdge> getStageDAG() {
+  public DAG<Stage, StageEdge> getStageDAG() {
     return stageDAG;
   }
 
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
similarity index 64%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index 27991e3a..adaf7528 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.plan.physical;
+package edu.snu.nemo.runtime.common.plan;
 
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
@@ -22,10 +22,7 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.stage.*;
 import edu.snu.nemo.common.exception.IllegalVertexOperationException;
 import edu.snu.nemo.common.exception.PhysicalPlanGenerationException;
 import org.apache.reef.tang.annotations.Parameter;
@@ -37,10 +34,9 @@
 /**
  * A function that converts an IR DAG to physical DAG.
  */
-public final class PhysicalPlanGenerator
-    implements Function<DAG<IRVertex, IREdge>, DAG<PhysicalStage, PhysicalStageEdge>> {
-
-  final String dagDirectory;
+public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdge>, DAG<Stage, StageEdge>> {
+  private final Map<String, IRVertex> idToIRVertex;
+  private final String dagDirectory;
 
   /**
    * Private constructor.
@@ -49,6 +45,7 @@
    */
   @Inject
   private PhysicalPlanGenerator(@Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
+    this.idToIRVertex = new HashMap<>();
     this.dagDirectory = dagDirectory;
   }
 
@@ -59,7 +56,7 @@ private PhysicalPlanGenerator(@Parameter(JobConf.DAGDirectory.class) final Strin
    * @return {@link PhysicalPlan} to execute.
    */
   @Override
-  public DAG<PhysicalStage, PhysicalStageEdge> apply(final DAG<IRVertex, IREdge> irDAG) {
+  public DAG<Stage, StageEdge> apply(final DAG<IRVertex, IREdge> irDAG) {
     // first, stage-partition the IR DAG.
     final DAG<Stage, StageEdge> dagOfStages = stagePartitionIrDAG(irDAG);
 
@@ -68,9 +65,12 @@ private PhysicalPlanGenerator(@Parameter(JobConf.DAGDirectory.class) final Strin
 
     // for debugging purposes.
     dagOfStages.storeJSON(dagDirectory, "plan-logical", "logical execution plan");
-    // then create tasks and make it into a physical execution plan.
 
-    return stagesIntoPlan(dagOfStages);
+    return dagOfStages;
+  }
+
+  public Map<String, IRVertex> getIdToIRVertex() {
+    return idToIRVertex;
   }
 
   /**
@@ -124,24 +124,50 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
     final Map<IRVertex, Stage> vertexStageMap = new HashMap<>();
 
     for (final List<IRVertex> stageVertices : vertexListForEachStage.values()) {
+      integrityCheck(stageVertices);
+
       final Set<IRVertex> currentStageVertices = new HashSet<>();
       final Set<StageEdgeBuilder> currentStageIncomingEdges = new HashSet<>();
 
       // Create a new stage builder.
       final IRVertex irVertexOfNewStage = stageVertices.stream().findAny()
           .orElseThrow(() -> new RuntimeException("Error: List " + stageVertices.getClass() + " is Empty"));
-      final StageBuilder stageBuilder =
-          new StageBuilder(irVertexOfNewStage.getProperty(ExecutionProperty.Key.StageId),
-              irVertexOfNewStage.getProperty(ExecutionProperty.Key.ScheduleGroupIndex));
+      final StageBuilder stageBuilder = new StageBuilder(
+          irVertexOfNewStage.getProperty(ExecutionProperty.Key.StageId),
+          irVertexOfNewStage.getProperty(ExecutionProperty.Key.Parallelism),
+          irVertexOfNewStage.getProperty(ExecutionProperty.Key.ScheduleGroupIndex),
+          irVertexOfNewStage.getProperty(ExecutionProperty.Key.ExecutorPlacement));
+
+      // Prepare useful variables.
+      final int stageParallelism = irVertexOfNewStage.getProperty(ExecutionProperty.Key.Parallelism);
+      final List<Map<String, Readable>> vertexIdToReadables = new ArrayList<>(stageParallelism);
+      for (int i = 0; i < stageParallelism; i++) {
+        vertexIdToReadables.add(new HashMap<>());
+      }
 
       // For each vertex in the stage,
       for (final IRVertex irVertex : stageVertices) {
+        // Take care of the readables of a source vertex.
+        if (irVertex instanceof SourceVertex) {
+          final SourceVertex sourceVertex = (SourceVertex) irVertex;
+          try {
+            final List<Readable> readables = sourceVertex.getReadables(stageParallelism);
+            for (int i = 0; i < stageParallelism; i++) {
+              vertexIdToReadables.get(i).put(irVertex.getId(), readables.get(i));
+            }
+          } catch (Exception e) {
+            throw new PhysicalPlanGenerationException(e);
+          }
+
+          // Clear internal metadata.
+          sourceVertex.clearInternalStates();
+        }
 
         // Add vertex to the stage.
         stageBuilder.addVertex(irVertex);
         currentStageVertices.add(irVertex);
 
-        // Connect all the incoming edges for the vertex
+        // Connect all the incoming edges for the vertex.
         final List<IREdge> inEdges = irDAG.getIncomingEdgesOf(irVertex);
         final Optional<List<IREdge>> inEdgeList = (inEdges == null) ? Optional.empty() : Optional.of(inEdges);
         inEdgeList.ifPresent(edges -> edges.forEach(irEdge -> {
@@ -156,7 +182,12 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
 
           // both vertices are in the stage.
           if (currentStageVertices.contains(srcVertex) && currentStageVertices.contains(dstVertex)) {
-            stageBuilder.connectInternalVertices(irEdge);
+            stageBuilder.connectInternalVertices(new RuntimeEdge<IRVertex>(
+                irEdge.getId(),
+                irEdge.getExecutionProperties(),
+                irEdge.getSrc(),
+                irEdge.getDst(),
+                irEdge.getCoder()));
           } else { // edge comes from another stage
             final Stage srcStage = vertexStageMap.get(srcVertex);
 
@@ -167,15 +198,19 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
 
             final StageEdgeBuilder newEdgeBuilder = new StageEdgeBuilder(irEdge.getId())
                 .setEdgeProperties(irEdge.getExecutionProperties())
-                .setSrcVertex(srcVertex).setDstVertex(dstVertex)
+                .setSrcVertex(srcVertex)
+                .setDstVertex(dstVertex)
                 .setSrcStage(srcStage)
                 .setCoder(irEdge.getCoder())
                 .setSideInputFlag(irEdge.isSideInput());
             currentStageIncomingEdges.add(newEdgeBuilder);
           }
         }));
-      }
 
+        // Track id to irVertex.
+        idToIRVertex.put(irVertex.getId(), irVertex);
+      }
+      stageBuilder.addReadables(vertexIdToReadables);
       // If this runtime stage contains at least one vertex, build it!
       if (!stageBuilder.isEmpty()) {
         final Stage currentStage = stageBuilder.build();
@@ -197,110 +232,32 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
     return dagOfStagesBuilder.build();
   }
 
-
-  private final Map<String, IRVertex> idToIRVertex = new HashMap<>();
-
-  /**
-   * @return the idToIRVertex map.
-   */
-  public Map<String, IRVertex> getIdToIRVertex() {
-    return idToIRVertex;
-  }
-
   /**
-   * Converts the given DAG of stages to a physical DAG for execution.
-   *
-   * @param dagOfStages IR DAG partitioned into stages.
-   * @return the converted physical DAG to execute,
-   * which consists of {@link PhysicalStage} and their relationship represented by {@link PhysicalStageEdge}.
+   * Integrity check for a stage's vertices.
+   * @param stageVertices to check for
    */
-  private DAG<PhysicalStage, PhysicalStageEdge> stagesIntoPlan(final DAG<Stage, StageEdge> dagOfStages) {
-    final Map<String, PhysicalStage> runtimeStageIdToPhysicalStageMap = new HashMap<>();
-    final DAGBuilder<PhysicalStage, PhysicalStageEdge> physicalDAGBuilder = new DAGBuilder<>();
-
-    for (final Stage stage : dagOfStages.getVertices()) {
-      final List<IRVertex> stageVertices = stage.getStageInternalDAG().getVertices();
-
-      final ExecutionPropertyMap firstVertexProperties = stageVertices.iterator().next().getExecutionProperties();
-      final int stageParallelism = firstVertexProperties.get(ExecutionProperty.Key.Parallelism);
-      final String containerType = firstVertexProperties.get(ExecutionProperty.Key.ExecutorPlacement);
-
-      // Only one DAG will be created and reused.
-      final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder = new DAGBuilder<>();
-      // Collect split source readables in advance and bind to each irvertex to avoid extra source split.
-      final List<Map<String, Readable>> vertexIdToReadables = new ArrayList<>(stageParallelism);
-      for (int i = 0; i < stageParallelism; i++) {
-        vertexIdToReadables.add(new HashMap<>());
+  private void integrityCheck(final List<IRVertex> stageVertices) {
+    final IRVertex firstVertex = stageVertices.get(0);
+    final String placement = firstVertex.getProperty(ExecutionProperty.Key.ExecutorPlacement);
+    final int scheduleGroup = firstVertex.<Integer>getProperty(ExecutionProperty.Key.ScheduleGroupIndex);
+    final int parallelism = firstVertex.<Integer>getProperty(ExecutionProperty.Key.Parallelism);
+
+    stageVertices.forEach(irVertex -> {
+      // Check vertex type.
+      if (!(irVertex instanceof  SourceVertex
+          || irVertex instanceof OperatorVertex
+          || irVertex instanceof MetricCollectionBarrierVertex)) {
+        throw new UnsupportedOperationException(irVertex.toString());
       }
 
-      // Iterate over the vertices contained in this stage
-      stageVertices.forEach(irVertex -> {
-        if (!(irVertex instanceof  SourceVertex
-            || irVertex instanceof OperatorVertex
-            || irVertex instanceof MetricCollectionBarrierVertex)) {
-          // Sanity check
-          throw new IllegalStateException(irVertex.toString());
-        }
-
-        if (irVertex instanceof SourceVertex) {
-          final SourceVertex sourceVertex = (SourceVertex) irVertex;
-
-          // Take care of the Readables
-          try {
-            final List<Readable> readables = sourceVertex.getReadables(stageParallelism);
-            for (int i = 0; i < stageParallelism; i++) {
-              vertexIdToReadables.get(i).put(irVertex.getId(), readables.get(i));
-            }
-          } catch (Exception e) {
-            throw new PhysicalPlanGenerationException(e);
-          }
-
-          // Clear internal metadata
-          sourceVertex.clearInternalStates();
-        }
-
-        stageInternalDAGBuilder.addVertex(irVertex);
-        idToIRVertex.put(irVertex.getId(), irVertex);
-      });
-
-      // connect internal edges in the stage. It suffices to iterate over only the stage internal inEdges.
-      final DAG<IRVertex, IREdge> stageInternalDAG = stage.getStageInternalDAG();
-      stageInternalDAG.getVertices().forEach(irVertex -> {
-        final List<IREdge> inEdges = stageInternalDAG.getIncomingEdgesOf(irVertex);
-        inEdges.forEach(edge ->
-            stageInternalDAGBuilder.connectVertices(new RuntimeEdge<>(
-                edge.getId(),
-                edge.getExecutionProperties(),
-                edge.getSrc(),
-                edge.getDst(),
-                edge.getCoder(),
-                edge.isSideInput())));
-      });
-
-      // Create the physical stage.
-      final PhysicalStage physicalStage =
-          new PhysicalStage(stage.getId(), stageInternalDAGBuilder.buildWithoutSourceSinkCheck(),
-              stageParallelism, stage.getScheduleGroupIndex(), containerType, vertexIdToReadables);
-
-      physicalDAGBuilder.addVertex(physicalStage);
-      runtimeStageIdToPhysicalStageMap.put(stage.getId(), physicalStage);
-    }
-
-    // Connect Physical stages
-    dagOfStages.getVertices().forEach(stage ->
-        dagOfStages.getIncomingEdgesOf(stage).forEach(stageEdge -> {
-          final PhysicalStage srcStage = runtimeStageIdToPhysicalStageMap.get(stageEdge.getSrc().getId());
-          final PhysicalStage dstStage = runtimeStageIdToPhysicalStageMap.get(stageEdge.getDst().getId());
-
-          physicalDAGBuilder.connectVertices(new PhysicalStageEdge(stageEdge.getId(),
-              stageEdge.getExecutionProperties(),
-              stageEdge.getSrcVertex(),
-              stageEdge.getDstVertex(),
-              srcStage, dstStage,
-              stageEdge.getCoder(),
-              stageEdge.isSideInput()));
-        }));
-
-    return physicalDAGBuilder.build();
+      // Check execution properties.
+      if ((placement != null
+          && !placement.equals(irVertex.<String>getProperty(ExecutionProperty.Key.ExecutorPlacement)))
+          || scheduleGroup != irVertex.<Integer>getProperty(ExecutionProperty.Key.ScheduleGroupIndex)
+          || parallelism != irVertex.<Integer>getProperty(ExecutionProperty.Key.Parallelism)) {
+        throw new RuntimeException("Vertices of the same stage have different execution properties: "
+            + irVertex.getId());
+      }
+    });
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
similarity index 86%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
index a58cfba4..138dbb94 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
@@ -13,14 +13,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.plan.physical;
+package edu.snu.nemo.runtime.common.plan;
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.Vertex;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.commons.lang3.SerializationUtils;
 
 import java.util.ArrayList;
@@ -28,9 +27,9 @@
 import java.util.Map;
 
 /**
- * PhysicalStage.
+ * Stage.
  */
-public final class PhysicalStage extends Vertex {
+public final class Stage extends Vertex {
   private final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag;
   private final int parallelism;
   private final int scheduleGroupIndex;
@@ -48,12 +47,12 @@
    * @param containerType       the type of container to execute the task on.
    * @param vertexIdToReadables the list of maps between vertex ID and {@link Readable}.
    */
-  public PhysicalStage(final String stageId,
-                       final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag,
-                       final int parallelism,
-                       final int scheduleGroupIndex,
-                       final String containerType,
-                       final List<Map<String, Readable>> vertexIdToReadables) {
+  public Stage(final String stageId,
+               final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag,
+               final int parallelism,
+               final int scheduleGroupIndex,
+               final String containerType,
+               final List<Map<String, Readable>> vertexIdToReadables) {
     super(stageId);
     this.irDag = irDag;
     this.parallelism = parallelism;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageBuilder.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageBuilder.java
similarity index 52%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageBuilder.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageBuilder.java
index 3566948f..9913fa9e 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageBuilder.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageBuilder.java
@@ -13,93 +13,88 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.plan.stage;
+package edu.snu.nemo.runtime.common.plan;
 
-import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.common.dag.DAGBuilder;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Stage Builder.
  */
-public final class StageBuilder {
-  private final DAGBuilder<IRVertex, IREdge> stageInternalDAGBuilder;
+final class StageBuilder {
+  private final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder;
   private final Integer stageId;
+  private final int parallelism;
   private final int scheduleGroupIndex;
+  private final String containerType;
+  private List<Map<String, Readable>> vertexIdToReadables;
 
   /**
    * Builds a {@link Stage}.
    * @param stageId stage ID in numeric form.
    * @param scheduleGroupIndex indicating its scheduling order.
    */
-  public StageBuilder(final Integer stageId,
-                      final int scheduleGroupIndex) {
+  StageBuilder(final Integer stageId,
+               final int parallelism,
+               final int scheduleGroupIndex,
+               final String containerType) {
     this.stageId = stageId;
+    this.parallelism = parallelism;
     this.scheduleGroupIndex = scheduleGroupIndex;
+    this.containerType = containerType;
     this.stageInternalDAGBuilder = new DAGBuilder<>();
+    this.vertexIdToReadables = new ArrayList<>(1);
   }
 
-  /**
-   */
   /**
    * Adds a {@link IRVertex} to this stage.
    * @param vertex to add.
    * @return the stageBuilder.
    */
-  public StageBuilder addVertex(final IRVertex vertex) {
+  StageBuilder addVertex(final IRVertex vertex) {
     stageInternalDAGBuilder.addVertex(vertex);
-    return this;
-  }
+    return this; }
 
   /**
    * Connects two {@link IRVertex} in this stage.
    * @param edge the IREdge that connects vertices.
    * @return the stageBuilder.
    */
-  public StageBuilder connectInternalVertices(final IREdge edge) {
+  StageBuilder connectInternalVertices(final RuntimeEdge<IRVertex> edge) {
     stageInternalDAGBuilder.connectVertices(edge);
     return this;
   }
 
-  /**
-   * @return true if this builder doesn't contain any valid {@link IRVertex}.
-   */
-  public boolean isEmpty() {
-    return stageInternalDAGBuilder.isEmpty();
+  StageBuilder addReadables(final List<Map<String, Readable>> vertexIdToReadable) {
+    this.vertexIdToReadables = vertexIdToReadable;
+    return this;
   }
 
   /**
-   * Integrity check for stages.
-   * @param stage stage to check for.
+   * @return true if this builder doesn't contain any valid {@link IRVertex}.
    */
-  private void integrityCheck(final Stage stage) {
-    final List<IRVertex> vertices = stage.getStageInternalDAG().getVertices();
-
-    final String firstPlacement = vertices.iterator().next().getProperty(ExecutionProperty.Key.ExecutorPlacement);
-    final int scheduleGroupIdx =
-        vertices.iterator().next().<Integer>getProperty(ExecutionProperty.Key.ScheduleGroupIndex);
-    vertices.forEach(irVertex -> {
-      if ((firstPlacement != null
-          && !firstPlacement.equals(irVertex.<String>getProperty(ExecutionProperty.Key.ExecutorPlacement)))
-          || scheduleGroupIdx != irVertex.<Integer>getProperty(ExecutionProperty.Key.ScheduleGroupIndex)) {
-        throw new RuntimeException("Vertices of the same stage have different execution properties: "
-            + irVertex.getId());
-      }
-    });
+  boolean isEmpty() {
+    return stageInternalDAGBuilder.isEmpty();
   }
 
   /**
    * Builds and returns the {@link Stage}.
    * @return the runtime stage.
    */
-  public Stage build() {
-    final Stage stage = new Stage(RuntimeIdGenerator.generateStageId(stageId),
-        stageInternalDAGBuilder.buildWithoutSourceSinkCheck(), scheduleGroupIndex);
-    integrityCheck(stage);
+  Stage build() {
+    final Stage stage = new Stage(
+        RuntimeIdGenerator.generateStageId(stageId),
+        stageInternalDAGBuilder.buildWithoutSourceSinkCheck(),
+        parallelism,
+        scheduleGroupIndex,
+        containerType,
+        vertexIdToReadables);
     return stage;
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
similarity index 79%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStageEdge.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index 13c83e85..3b8cdf01 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStageEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -13,22 +13,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.plan.physical;
+package edu.snu.nemo.runtime.common.plan;
 
 import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.runtime.common.data.KeyRange;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.data.HashRange;
 
 import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Contains information stage boundary {@link edu.snu.nemo.runtime.common.plan.stage.StageEdge}.
+ * Edge of a stage that connects an IRVertex of the source stage to an IRVertex of the destination stage.
+ * This means that there can be multiple StageEdges between two Stages.
  */
-public final class PhysicalStageEdge extends RuntimeEdge<PhysicalStage> {
+public final class StageEdge extends RuntimeEdge<Stage> {
   /**
    * The source {@link IRVertex}.
    * This belongs to the srcStage.
@@ -50,21 +50,21 @@
    * Constructor.
    * @param runtimeEdgeId id of the runtime edge.
    * @param edgeProperties edge execution properties.
-   * @param srcVertex source vertex.
-   * @param dstVertex destination vertex.
+   * @param srcVertex source IRVertex in the srcStage of this edge.
+   * @param dstVertex destination IRVertex in the dstStage of this edge.
    * @param srcStage source stage.
    * @param dstStage destination stage.
    * @param coder the coder for enconding and deconding.
    * @param isSideInput whether or not the edge is a sideInput edge.
    */
-  public PhysicalStageEdge(final String runtimeEdgeId,
-                           final ExecutionPropertyMap edgeProperties,
-                           final IRVertex srcVertex,
-                           final IRVertex dstVertex,
-                           final PhysicalStage srcStage,
-                           final PhysicalStage dstStage,
-                           final Coder coder,
-                           final Boolean isSideInput) {
+  public StageEdge(final String runtimeEdgeId,
+                   final ExecutionPropertyMap edgeProperties,
+                   final IRVertex srcVertex,
+                   final IRVertex dstVertex,
+                   final Stage srcStage,
+                   final Stage dstStage,
+                   final Coder coder,
+                   final Boolean isSideInput) {
     super(runtimeEdgeId, edgeProperties, srcStage, dstStage, coder, isSideInput);
     this.srcVertex = srcVertex;
     this.dstVertex = dstVertex;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageEdgeBuilder.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
similarity index 92%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageEdgeBuilder.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
index cf5d3cc7..b7448a1c 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageEdgeBuilder.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.plan.stage;
+package edu.snu.nemo.runtime.common.plan;
 
 
 import edu.snu.nemo.common.coder.Coder;
@@ -37,7 +37,7 @@
    * Represents the edge between vertices in a logical plan.
    * @param irEdgeId id of this edge.
    */
-  public StageEdgeBuilder(final String irEdgeId) {
+  StageEdgeBuilder(final String irEdgeId) {
     this.stageEdgeId = irEdgeId;
   }
 
@@ -112,7 +112,6 @@ public StageEdgeBuilder setSideInputFlag(final Boolean sideInputFlag) {
   }
 
   public StageEdge build() {
-    return new StageEdge(stageEdgeId,
-        edgeProperties, srcStage, dstStage, coder, isSideInput, srcVertex, dstVertex);
+    return new StageEdge(stageEdgeId, edgeProperties, srcVertex, dstVertex, srcStage, dstStage, coder, isSideInput);
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/Stage.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/Stage.java
deleted file mode 100644
index f56bdef1..00000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/Stage.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.stage;
-
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.dag.Vertex;
-
-/**
- * Represents a stage in Runtime's execution of a job.
- * Each stage contains a part of a whole execution plan.
- * Stage partitioning is determined by {edu.snu.nemo.compiler.backend.nemo.NemoBackend}.
- */
-public final class Stage extends Vertex {
-  private final DAG<IRVertex, IREdge> stageInternalDAG;
-  private final int scheduleGroupIndex;
-
-  /**
-   * Constructor.
-   * @param stageId id of the stage.
-   * @param stageInternalDAG the internal DAG of the stage.
-   * @param scheduleGroupIndex the schedule group index.
-   */
-  public Stage(final String stageId,
-               final DAG<IRVertex, IREdge> stageInternalDAG,
-               final int scheduleGroupIndex) {
-    super(stageId);
-    this.stageInternalDAG = stageInternalDAG;
-    this.scheduleGroupIndex = scheduleGroupIndex;
-  }
-
-  /**
-   * @return the internal DAG of the stage.
-   */
-  public DAG<IRVertex, IREdge> getStageInternalDAG() {
-    return stageInternalDAG;
-  }
-
-  /**
-   * @return the schedule group index.
-   */
-  public int getScheduleGroupIndex() {
-    return scheduleGroupIndex;
-  }
-
-  @Override
-  public String propertiesToJSON() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("{\"scheduleGroupIndex\": ").append(scheduleGroupIndex);
-    sb.append(", \"stageInternalDAG\": ").append(stageInternalDAG.toString());
-    sb.append("}");
-    return sb.toString();
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageEdge.java
deleted file mode 100644
index 5d91959f..00000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageEdge.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.stage;
-
-
-import edu.snu.nemo.common.coder.Coder;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-
-/**
- * Stage Edge.
- */
-public final class StageEdge extends RuntimeEdge<Stage> {
-  private final IRVertex srcVertex;
-  private final IRVertex dstVertex;
-
-  /**
-   * Represents the edge between stages.
-   * @param irEdgeId id of this edge.
-   * @param edgeProperties to control the data flow on this edge.
-   * @param srcStage source runtime stage.
-   * @param dstStage destination runtime stage.
-   * @param coder coder.
-   * @param isSideInput flag for whether or not the edge is a sideInput.
-   * @param srcVertex source vertex (in srcStage).
-   * @param dstVertex destination vertex (in dstStage).
-   */
-  public StageEdge(final String irEdgeId,
-                   final ExecutionPropertyMap edgeProperties,
-                   final Stage srcStage,
-                   final Stage dstStage,
-                   final Coder coder,
-                   final Boolean isSideInput,
-                   final IRVertex srcVertex,
-                   final IRVertex dstVertex) {
-    super(RuntimeIdGenerator.generateStageEdgeId(irEdgeId), edgeProperties, srcStage, dstStage, coder, isSideInput);
-    this.srcVertex = srcVertex;
-    this.dstVertex = dstVertex;
-  }
-
-  /**
-   * @return the source vertex.
-   */
-  public IRVertex getSrcVertex() {
-    return srcVertex;
-  }
-
-  /**
-   * @return the destination vertex.
-   */
-  public IRVertex getDstVertex() {
-    return dstVertex;
-  }
-
-  @Override
-  public String propertiesToJSON() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("{\"runtimeEdgeId\": \"").append(getId());
-    sb.append("\", \"edgeProperties\": ").append(getExecutionProperties());
-    sb.append(", \"srcVertex\": \"").append(srcVertex.getId());
-    sb.append("\", \"dstVertex\": \"").append(dstVertex.getId());
-    sb.append("\", \"coder\": \"").append(getCoder().toString());
-    sb.append("\"}");
-    return sb.toString();
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
index 81286c13..41a2d0e0 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
@@ -18,7 +18,7 @@
 import edu.snu.nemo.common.StateMachine;
 
 /**
- * Represents the states and their transitions of a {@link edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan}.
+ * Represents the states and their transitions of a physical plan.
  */
 public final class JobState {
   private final StateMachine stateMachine;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
index 6e2a3d5c..114275e2 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
@@ -18,7 +18,7 @@
 import edu.snu.nemo.common.StateMachine;
 
 /**
- * Represents the states and their transitions of a {@link edu.snu.nemo.runtime.common.plan.physical.PhysicalStage}.
+ * Represents the states and their transitions of a stage.
  */
 public final class StageState {
   private final StateMachine stateMachine;
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
index 78896df6..a5b68300 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
@@ -26,7 +26,7 @@
 import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
 import edu.snu.nemo.compiler.optimizer.policy.Policy;
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.RuntimeMaster;
 import org.apache.commons.lang3.SerializationUtils;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 7a7b5231..88729909 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -28,7 +28,7 @@
 import edu.snu.nemo.runtime.common.message.MessageListener;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.executor.data.SerializerManager;
 import edu.snu.nemo.runtime.executor.datatransfer.DataTransferFactory;
 import org.apache.commons.lang3.SerializationUtils;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
index 2349ab76..09a43cc2 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
@@ -23,8 +23,9 @@
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.vertex.*;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.*;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
 import edu.snu.nemo.runtime.executor.datatransfer.*;
@@ -50,8 +51,8 @@
   private final String taskId;
   private final int taskIdx;
   private final TaskStateManager taskStateManager;
-  private final List<PhysicalStageEdge> stageIncomingEdges;
-  private final List<PhysicalStageEdge> stageOutgoingEdges;
+  private final List<StageEdge> stageIncomingEdges;
+  private final List<StageEdge> stageOutgoingEdges;
   private Map<String, Readable> irVertexIdToReadable;
 
   // Other parameters
@@ -140,8 +141,8 @@ private void initialize() {
     // 'Pointer-based' means that it isn't Map/List-based in getting the data structure or parent/children
     // to avoid element-wise extra overhead of calculating hash values(HashMap) or iterating Lists.
     irVertexDag.topologicalDo(irVertex -> {
-      final Set<PhysicalStageEdge> inEdgesFromOtherStages = getInEdgesFromOtherStages(irVertex);
-      final Set<PhysicalStageEdge> outEdgesToOtherStages = getOutEdgesToOtherStages(irVertex);
+      final Set<StageEdge> inEdgesFromOtherStages = getInEdgesFromOtherStages(irVertex);
+      final Set<StageEdge> outEdgesToOtherStages = getOutEdgesToOtherStages(irVertex);
       final IRVertexDataHandler dataHandler = getIRVertexDataHandler(irVertex);
 
       // Set data handlers of children irVertices.
@@ -152,9 +153,9 @@ private void initialize() {
       dataHandler.setChildrenDataHandler(childrenDataHandlers);
 
       // Add InputReaders for inter-stage data transfer
-      inEdgesFromOtherStages.forEach(physicalStageEdge -> {
+      inEdgesFromOtherStages.forEach(stageEdge -> {
         final InputReader inputReader = channelFactory.createReader(
-            taskIdx, physicalStageEdge.getSrcVertex(), physicalStageEdge);
+            taskIdx, stageEdge.getSrcVertex(), stageEdge);
 
         // For InputReaders that have side input, collect them separately.
         if (inputReader.isSideInputReader()) {
@@ -166,9 +167,9 @@ private void initialize() {
       });
 
       // Add OutputWriters for inter-stage data transfer
-      outEdgesToOtherStages.forEach(physicalStageEdge -> {
+      outEdgesToOtherStages.forEach(stageEdge -> {
         final OutputWriter outputWriter = channelFactory.createWriter(
-            irVertex, taskIdx, physicalStageEdge.getDstVertex(), physicalStageEdge);
+            irVertex, taskIdx, stageEdge.getDstVertex(), stageEdge);
         dataHandler.addOutputWriter(outputWriter);
       });
 
@@ -206,7 +207,7 @@ private void initialize() {
    * @param irVertex the IRVertex whose inter-stage incoming edges to be collected.
    * @return the collected incoming edges.
    */
-  private Set<PhysicalStageEdge> getInEdgesFromOtherStages(final IRVertex irVertex) {
+  private Set<StageEdge> getInEdgesFromOtherStages(final IRVertex irVertex) {
     return stageIncomingEdges.stream().filter(
         stageInEdge -> stageInEdge.getDstVertex().getId().equals(irVertex.getId()))
         .collect(Collectors.toSet());
@@ -218,7 +219,7 @@ private void initialize() {
    * @param irVertex the IRVertex whose inter-stage outgoing edges to be collected.
    * @return the collected outgoing edges.
    */
-  private Set<PhysicalStageEdge> getOutEdgesToOtherStages(final IRVertex irVertex) {
+  private Set<StageEdge> getOutEdgesToOtherStages(final IRVertex irVertex) {
     return stageOutgoingEdges.stream().filter(
         stageInEdge -> stageInEdge.getSrcVertex().getId().equals(irVertex.getId()))
         .collect(Collectors.toSet());
@@ -431,8 +432,8 @@ private void sideInputFromOtherStages(final IRVertex irVertex, final Map<Transfo
         final Object sideInput = getSideInput(sideInputIterator);
         final RuntimeEdge inEdge = sideInputReader.getRuntimeEdge();
         final Transform srcTransform;
-        if (inEdge instanceof PhysicalStageEdge) {
-          srcTransform = ((OperatorVertex) ((PhysicalStageEdge) inEdge).getSrcVertex()).getTransform();
+        if (inEdge instanceof StageEdge) {
+          srcTransform = ((OperatorVertex) ((StageEdge) inEdge).getSrcVertex()).getTransform();
         } else {
           srcTransform = ((OperatorVertex) inEdge.getSrc()).getTransform();
         }
@@ -473,8 +474,8 @@ private void sideInputFromThisStage(final IRVertex irVertex, final Map<Transform
       Object sideInput = input.remove();
       final RuntimeEdge inEdge = input.getSideInputRuntimeEdge();
       final Transform srcTransform;
-      if (inEdge instanceof PhysicalStageEdge) {
-        srcTransform = ((OperatorVertex) ((PhysicalStageEdge) inEdge).getSrcVertex()).getTransform();
+      if (inEdge instanceof StageEdge) {
+        srcTransform = ((OperatorVertex) ((StageEdge) inEdge).getSrcVertex()).getTransform();
       } else {
         srcTransform = ((OperatorVertex) inEdge.getSrc()).getTransform();
       }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
index e2a31d6f..7b1bc52f 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
@@ -21,7 +21,7 @@
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 
 import java.util.*;
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index 3ca24028..a01e58c4 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -24,7 +24,7 @@
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.data.KeyRange;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.UnsupportedCommPatternException;
 import edu.snu.nemo.runtime.common.data.HashRange;
@@ -113,9 +113,9 @@ public InputReader(final int dstTaskIndex,
    * @return the list of the completable future of the data.
    */
   private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readDataInRange() {
-    assert (runtimeEdge instanceof PhysicalStageEdge);
+    assert (runtimeEdge instanceof StageEdge);
     final KeyRange hashRangeToRead =
-        ((PhysicalStageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
+        ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
     if (hashRangeToRead == null) {
       throw new BlockFetchException(
           new Throwable("The hash range to read is not assigned to " + dstTaskIndex + "'th task"));
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
index 49401aa3..b758fa86 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
@@ -23,8 +23,10 @@
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.metric.MetricDataBuilder;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.*;
 import edu.snu.nemo.runtime.common.state.JobState;
 import edu.snu.nemo.runtime.common.state.StageState;
 
@@ -136,10 +138,10 @@ private void initializeComputationStates() {
     onJobStateChanged(JobState.State.EXECUTING);
 
     // Initialize the states for the job down to task-level.
-    physicalPlan.getStageDAG().topologicalDo(physicalStage -> {
-      currentJobStageIds.add(physicalStage.getId());
-      idToStageStates.put(physicalStage.getId(), new StageState());
-      physicalStage.getTaskIds().forEach(taskId -> {
+    physicalPlan.getStageDAG().topologicalDo(stage -> {
+      currentJobStageIds.add(stage.getId());
+      idToStageStates.put(stage.getId(), new StageState());
+      stage.getTaskIds().forEach(taskId -> {
         idToTaskStates.put(taskId, new TaskState());
         taskIdToCurrentAttempt.put(taskId, 1);
       });
@@ -147,23 +149,23 @@ private void initializeComputationStates() {
   }
 
   private void initializePartitionStates(final BlockManagerMaster blockManagerMaster) {
-    final DAG<PhysicalStage, PhysicalStageEdge> stageDAG = physicalPlan.getStageDAG();
-    stageDAG.topologicalDo(physicalStage -> {
-      final List<String> taskIdsForStage = physicalStage.getTaskIds();
-      final List<PhysicalStageEdge> stageOutgoingEdges = stageDAG.getOutgoingEdgesOf(physicalStage);
+    final DAG<Stage, StageEdge> stageDAG = physicalPlan.getStageDAG();
+    stageDAG.topologicalDo(stage -> {
+      final List<String> taskIdsForStage = stage.getTaskIds();
+      final List<StageEdge> stageOutgoingEdges = stageDAG.getOutgoingEdgesOf(stage);
 
       // Initialize states for blocks of inter-stage edges
-      stageOutgoingEdges.forEach(physicalStageEdge -> {
+      stageOutgoingEdges.forEach(stageEdge -> {
         final int srcParallelism = taskIdsForStage.size();
         IntStream.range(0, srcParallelism).forEach(srcTaskIdx -> {
-          final String blockId = RuntimeIdGenerator.generateBlockId(physicalStageEdge.getId(), srcTaskIdx);
+          final String blockId = RuntimeIdGenerator.generateBlockId(stageEdge.getId(), srcTaskIdx);
           blockManagerMaster.initializeState(blockId, taskIdsForStage.get(srcTaskIdx));
         });
       });
 
       // Initialize states for blocks of stage internal edges
       taskIdsForStage.forEach(taskId -> {
-        final DAG<IRVertex, RuntimeEdge<IRVertex>> taskInternalDag = physicalStage.getIRDAG();
+        final DAG<IRVertex, RuntimeEdge<IRVertex>> taskInternalDag = stage.getIRDAG();
         taskInternalDag.getVertices().forEach(task -> {
           final List<RuntimeEdge<IRVertex>> internalOutgoingEdges = taskInternalDag.getOutgoingEdgesOf(task);
           internalOutgoingEdges.forEach(taskRuntimeEdge -> {
@@ -240,7 +242,7 @@ public synchronized void onStageStateChanged(final String stageId, final StageSt
       // if there exists a mapping, this state change is from a failed_recoverable stage,
       // and there may be tasks that do not need to be re-executed.
       if (!stageIdToRemainingTaskSet.containsKey(stageId)) {
-        for (final PhysicalStage stage : physicalPlan.getStageDAG().getVertices()) {
+        for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
           if (stage.getId().equals(stageId)) {
             Set<String> remainingTaskIds = new HashSet<>();
             remainingTaskIds.addAll(
@@ -510,9 +512,9 @@ public String toStringWithPhysicalPlan() {
   public synchronized String toString() {
     final StringBuilder sb = new StringBuilder("{");
     sb.append("\"jobId\": \"").append(jobId).append("\", ");
-    sb.append("\"physicalStages\": [");
+    sb.append("\"stages\": [");
     boolean isFirstStage = true;
-    for (final PhysicalStage stage : physicalPlan.getStageDAG().getVertices()) {
+    for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
       if (!isFirstStage) {
         sb.append(", ");
       }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index d2b709f4..4751573a 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -24,7 +24,7 @@
 import edu.snu.nemo.runtime.common.message.MessageContext;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageListener;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.resource.ContainerManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
index 30356606..2ddb8187 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
@@ -21,7 +21,7 @@
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.eventhandler.CompilerEventHandler;
 import edu.snu.nemo.runtime.common.eventhandler.UpdatePhysicalPlanEvent;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.master.scheduler.Scheduler;
 
 import javax.inject.Inject;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index 8536746e..f73ecdb1 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -20,7 +20,7 @@
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.reef.driver.context.ActiveContext;
 
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index f32a4416..c51d1ec3 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -22,12 +22,11 @@
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
+import edu.snu.nemo.runtime.common.plan.*;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
 import edu.snu.nemo.common.exception.*;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-import edu.snu.nemo.runtime.common.plan.physical.*;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.JobStateManager;
@@ -112,7 +111,7 @@ public void scheduleJob(final PhysicalPlan jobToSchedule, final JobStateManager
     LOG.info("Job to schedule: {}", jobToSchedule.getId());
 
     this.initialScheduleGroup = jobToSchedule.getStageDAG().getVertices().stream()
-        .mapToInt(physicalStage -> physicalStage.getScheduleGroupIndex())
+        .mapToInt(stage -> stage.getScheduleGroupIndex())
         .min().getAsInt();
 
     scheduleRootStages();
@@ -226,9 +225,9 @@ public void terminate() {
    * Schedule stages in initial schedule group, in reverse-topological order.
    */
   private void scheduleRootStages() {
-    final List<PhysicalStage> rootStages =
-        physicalPlan.getStageDAG().getTopologicalSort().stream().filter(physicalStage ->
-            physicalStage.getScheduleGroupIndex() == initialScheduleGroup)
+    final List<Stage> rootStages =
+        physicalPlan.getStageDAG().getTopologicalSort().stream().filter(stage ->
+            stage.getScheduleGroupIndex() == initialScheduleGroup)
             .collect(Collectors.toList());
     Collections.reverse(rootStages);
     rootStages.forEach(this::scheduleStage);
@@ -239,8 +238,8 @@ private void scheduleRootStages() {
    * @param completedStageId the ID of the stage that just completed and triggered this scheduling.
    */
   private void scheduleNextStage(final String completedStageId) {
-    final PhysicalStage completeOrFailedStage = getStageById(completedStageId);
-    final Optional<List<PhysicalStage>> nextStagesToSchedule =
+    final Stage completeOrFailedStage = getStageById(completedStageId);
+    final Optional<List<Stage>> nextStagesToSchedule =
         selectNextStagesToSchedule(completeOrFailedStage.getScheduleGroupIndex());
 
     if (nextStagesToSchedule.isPresent()) {
@@ -271,9 +270,9 @@ private void scheduleNextStage(final String completedStageId) {
    * @return an optional of the (possibly empty) list of next schedulable stages, in the order they should be
    * enqueued to {@link PendingTaskCollection}.
    */
-  private Optional<List<PhysicalStage>> selectNextStagesToSchedule(final int currentScheduleGroupIndex) {
+  private Optional<List<Stage>> selectNextStagesToSchedule(final int currentScheduleGroupIndex) {
     if (currentScheduleGroupIndex > initialScheduleGroup) {
-      final Optional<List<PhysicalStage>> ancestorStagesFromAScheduleGroup =
+      final Optional<List<Stage>> ancestorStagesFromAScheduleGroup =
           selectNextStagesToSchedule(currentScheduleGroupIndex - 1);
       if (ancestorStagesFromAScheduleGroup.isPresent()) {
         return ancestorStagesFromAScheduleGroup;
@@ -281,15 +280,15 @@ private void scheduleNextStage(final String completedStageId) {
     }
 
     // All previous schedule groups are complete, we need to check for the current schedule group.
-    final List<PhysicalStage> currentScheduleGroup =
-        physicalPlan.getStageDAG().getTopologicalSort().stream().filter(physicalStage ->
-            physicalStage.getScheduleGroupIndex() == currentScheduleGroupIndex)
+    final List<Stage> currentScheduleGroup =
+        physicalPlan.getStageDAG().getTopologicalSort().stream().filter(stage ->
+            stage.getScheduleGroupIndex() == currentScheduleGroupIndex)
             .collect(Collectors.toList());
-    List<PhysicalStage> stagesToSchedule = new LinkedList<>();
+    List<Stage> stagesToSchedule = new LinkedList<>();
     boolean allStagesComplete = true;
 
     // We need to reschedule failed_recoverable stages.
-    for (final PhysicalStage stageToCheck : currentScheduleGroup) {
+    for (final Stage stageToCheck : currentScheduleGroup) {
       final StageState.State stageState =
           (StageState.State) jobStateManager.getStageState(stageToCheck.getId()).getStateMachine().getCurrentState();
       switch (stageState) {
@@ -313,9 +312,9 @@ private void scheduleNextStage(final String completedStageId) {
     // By the time the control flow has reached here,
     // we are ready to move onto the next ScheduleGroup
     stagesToSchedule =
-        physicalPlan.getStageDAG().getTopologicalSort().stream().filter(physicalStage -> {
-          if (physicalStage.getScheduleGroupIndex() == currentScheduleGroupIndex + 1) {
-            final String stageId = physicalStage.getId();
+        physicalPlan.getStageDAG().getTopologicalSort().stream().filter(stage -> {
+          if (stage.getScheduleGroupIndex() == currentScheduleGroupIndex + 1) {
+            final String stageId = stage.getId();
             return jobStateManager.getStageState(stageId).getStateMachine().getCurrentState()
                 != StageState.State.EXECUTING
                 && jobStateManager.getStageState(stageId).getStateMachine().getCurrentState()
@@ -341,10 +340,10 @@ private void scheduleNextStage(final String completedStageId) {
    * It adds the list of tasks for the stage where the scheduler thread continuously polls from.
    * @param stageToSchedule the stage to schedule.
    */
-  private void scheduleStage(final PhysicalStage stageToSchedule) {
-    final List<PhysicalStageEdge> stageIncomingEdges =
+  private void scheduleStage(final Stage stageToSchedule) {
+    final List<StageEdge> stageIncomingEdges =
         physicalPlan.getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
-    final List<PhysicalStageEdge> stageOutgoingEdges =
+    final List<StageEdge> stageOutgoingEdges =
         physicalPlan.getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
 
     final Enum stageState = jobStateManager.getStageState(stageToSchedule.getId()).getStateMachine().getCurrentState();
@@ -419,18 +418,18 @@ private void scheduleStage(final PhysicalStage stageToSchedule) {
    * @return the IR dag
    */
   private DAG<IRVertex, RuntimeEdge<IRVertex>> getVertexDagById(final String taskId) {
-    for (final PhysicalStage physicalStage : physicalPlan.getStageDAG().getVertices()) {
-      if (physicalStage.getId().equals(RuntimeIdGenerator.getStageIdFromTaskId(taskId))) {
-        return physicalStage.getIRDAG();
+    for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
+      if (stage.getId().equals(RuntimeIdGenerator.getStageIdFromTaskId(taskId))) {
+        return stage.getIRDAG();
       }
     }
     throw new RuntimeException(new Throwable("This taskId does not exist in the plan"));
   }
 
-  private PhysicalStage getStageById(final String stageId) {
-    for (final PhysicalStage physicalStage : physicalPlan.getStageDAG().getVertices()) {
-      if (physicalStage.getId().equals(stageId)) {
-        return physicalStage;
+  private Stage getStageById(final String stageId) {
+    for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
+      if (stage.getId().equals(stageId)) {
+        return stage;
       }
     }
     throw new RuntimeException(new Throwable("This taskId does not exist in the plan"));
@@ -538,7 +537,7 @@ private void onTaskExecutionFailedRecoverable(final String executorId,
       case INPUT_READ_FAILURE:
         jobStateManager.onTaskStateChanged(taskId, newState);
         LOG.info("All tasks of {} will be made failed_recoverable.", stageId);
-        for (final PhysicalStage stage : physicalPlan.getStageDAG().getTopologicalSort()) {
+        for (final Stage stage : physicalPlan.getStageDAG().getTopologicalSort()) {
           if (stage.getId().equals(stageId)) {
             LOG.info("Removing Tasks for {} before they are scheduled to an executor", stage.getId());
             pendingTaskCollection.removeTasksAndDescendants(stage.getId());
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
index ab5081cf..66fa9869 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
index f23a1f09..46b3c6b1 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
index e72f486c..5d19f52d 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
index 361307ff..1d8c8a2e 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
@@ -15,8 +15,8 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 
 import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
index a232a799..afdc6718 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index d34eb568..6e31269f 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index f411c1da..aa537808 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.common.state.JobState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index 1e71882b..7ea05fbc 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
index 0375e01e..fae45657 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
@@ -17,10 +17,10 @@
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
 
@@ -147,7 +147,7 @@ private synchronized void removeStageAndChildren(final String stageId) {
     }
 
     physicalPlan.getStageDAG().getChildren(stageId).forEach(
-        physicalStage -> removeStageAndChildren(physicalStage.getId()));
+        stage -> removeStageAndChildren(stage.getId()));
   }
 
   /**
@@ -161,7 +161,7 @@ private synchronized void removeStageAndChildren(final String stageId) {
    */
   private synchronized void updateSchedulableStages(
       final String candidateStageId, final String candidateStageContainerType) {
-    final DAG<PhysicalStage, PhysicalStageEdge> jobDAG = physicalPlan.getStageDAG();
+    final DAG<Stage, StageEdge> jobDAG = physicalPlan.getStageDAG();
 
     if (isSchedulable(candidateStageId, candidateStageContainerType)) {
       // Check for ancestor stages that became schedulable due to candidateStage's absence from the queue.
@@ -187,8 +187,8 @@ private synchronized void updateSchedulableStages(
    * @return true if schedulable, false otherwise.
    */
   private synchronized boolean isSchedulable(final String candidateStageId, final String candidateStageContainerType) {
-    final DAG<PhysicalStage, PhysicalStageEdge> jobDAG = physicalPlan.getStageDAG();
-    for (final PhysicalStage descendantStage : jobDAG.getDescendants(candidateStageId)) {
+    final DAG<Stage, StageEdge> jobDAG = physicalPlan.getStageDAG();
+    for (final Stage descendantStage : jobDAG.getDescendants(candidateStageId)) {
       if (schedulableStages.contains(descendantStage.getId())) {
         if (candidateStageContainerType.equals(descendantStage.getContainerType())) {
           return false;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
index f7056bff..bee0a365 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
index af8371b2..ee2cfa0a 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
@@ -82,7 +82,7 @@ private static void checkPendingFuture(final Future<String> future) {
    */
   @Test
   public void testLostAfterCommit() throws Exception {
-    final String edgeId = RuntimeIdGenerator.generateRuntimeEdgeId("Edge-0");
+    final String edgeId = RuntimeIdGenerator.generateStageEdgeId("Edge-0");
     final int srcTaskIndex = 0;
     final String taskId = RuntimeIdGenerator.generateTaskId(srcTaskIndex, "Stage-test");
     final String executorId = RuntimeIdGenerator.generateExecutorId();
@@ -115,7 +115,7 @@ public void testLostAfterCommit() throws Exception {
    */
   @Test
   public void testBeforeAfterCommit() throws Exception {
-    final String edgeId = RuntimeIdGenerator.generateRuntimeEdgeId("Edge-1");
+    final String edgeId = RuntimeIdGenerator.generateStageEdgeId("Edge-1");
     final int srcTaskIndex = 0;
     final String taskId = RuntimeIdGenerator.generateTaskId(srcTaskIndex, "Stage-Test");
     final String executorId = RuntimeIdGenerator.generateExecutorId();
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
index 2f5473b4..5d515a7b 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
@@ -22,7 +22,10 @@
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
-import edu.snu.nemo.runtime.common.plan.physical.*;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.state.JobState;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskState;
@@ -85,17 +88,17 @@ public void testPhysicalPlanStateChanges() throws Exception {
 
     assertEquals(jobStateManager.getJobId(), "TestPlan");
 
-    final List<PhysicalStage> stageList = physicalPlan.getStageDAG().getTopologicalSort();
+    final List<Stage> stageList = physicalPlan.getStageDAG().getTopologicalSort();
 
     for (int stageIdx = 0; stageIdx < stageList.size(); stageIdx++) {
-      final PhysicalStage physicalStage = stageList.get(stageIdx);
-      jobStateManager.onStageStateChanged(physicalStage.getId(), StageState.State.EXECUTING);
-      final List<String> taskIds = physicalStage.getTaskIds();
+      final Stage stage = stageList.get(stageIdx);
+      jobStateManager.onStageStateChanged(stage.getId(), StageState.State.EXECUTING);
+      final List<String> taskIds = stage.getTaskIds();
       taskIds.forEach(taskId -> {
         jobStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING);
         jobStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE);
         if (RuntimeIdGenerator.getIndexFromTaskId(taskId) == taskIds.size() - 1) {
-          assertTrue(jobStateManager.checkStageCompletion(physicalStage.getId()));
+          assertTrue(jobStateManager.checkStageCompletion(stage.getId()));
         }
       });
       final Map<String, TaskState> taskStateMap = jobStateManager.getIdToTaskStates();
@@ -117,7 +120,7 @@ public void testPhysicalPlanStateChanges() throws Exception {
   public void testWaitUntilFinish() {
     // Create a JobStateManager of an empty dag.
     final DAG<IRVertex, IREdge> irDAG = irDAGBuilder.build();
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
+    final DAG<Stage, StageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
     final JobStateManager jobStateManager = new JobStateManager(
         new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()),
         blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
index 78466b27..308f3b5f 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
@@ -20,7 +20,9 @@
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.physical.*;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
@@ -152,19 +154,19 @@ private void scheduleAndCheckJobTermination(final PhysicalPlan plan) throws Inje
     // b) the stages of the next ScheduleGroup are scheduled after the stages of each ScheduleGroup are made "complete".
     for (int i = 0; i < getNumScheduleGroups(plan.getStageDAG()); i++) {
       final int scheduleGroupIdx = i;
-      final List<PhysicalStage> stages = filterStagesWithAScheduleGroupIndex(plan.getStageDAG(), scheduleGroupIdx);
+      final List<Stage> stages = filterStagesWithAScheduleGroupIndex(plan.getStageDAG(), scheduleGroupIdx);
 
       LOG.debug("Checking that all stages of ScheduleGroup {} enter the executing state", scheduleGroupIdx);
-      stages.forEach(physicalStage -> {
-        while (jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState()
+      stages.forEach(stage -> {
+        while (jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState()
             != StageState.State.EXECUTING) {
 
         }
       });
 
-      stages.forEach(physicalStage -> {
+      stages.forEach(stage -> {
         SchedulerTestUtil.completeStage(
-            jobStateManager, scheduler, executorRegistry, physicalStage, SCHEDULE_ATTEMPT_INDEX);
+            jobStateManager, scheduler, executorRegistry, stage, SCHEDULE_ATTEMPT_INDEX);
       });
     }
 
@@ -174,13 +176,13 @@ private void scheduleAndCheckJobTermination(final PhysicalPlan plan) throws Inje
     assertTrue(jobStateManager.checkJobTermination());
   }
 
-  private List<PhysicalStage> filterStagesWithAScheduleGroupIndex(
-      final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG, final int scheduleGroupIndex) {
-    final Set<PhysicalStage> stageSet = new HashSet<>(physicalDAG.filterVertices(
-        physicalStage -> physicalStage.getScheduleGroupIndex() == scheduleGroupIndex));
+  private List<Stage> filterStagesWithAScheduleGroupIndex(
+      final DAG<Stage, StageEdge> physicalDAG, final int scheduleGroupIndex) {
+    final Set<Stage> stageSet = new HashSet<>(physicalDAG.filterVertices(
+        stage -> stage.getScheduleGroupIndex() == scheduleGroupIndex));
 
     // Return the filtered vertices as a sorted list
-    final List<PhysicalStage> sortedStages = new ArrayList<>(stageSet.size());
+    final List<Stage> sortedStages = new ArrayList<>(stageSet.size());
     physicalDAG.topologicalDo(stage -> {
       if (stageSet.contains(stage)) {
         sortedStages.add(stage);
@@ -189,7 +191,7 @@ private void scheduleAndCheckJobTermination(final PhysicalPlan plan) throws Inje
     return sortedStages;
   }
 
-  private int getNumScheduleGroups(final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG) {
+  private int getNumScheduleGroups(final DAG<Stage, StageEdge> physicalDAG) {
     final Set<Integer> scheduleGroupSet = new HashSet<>();
     physicalDAG.getVertices().forEach(stage -> scheduleGroupSet.add(stage.getScheduleGroupIndex()));
     return scheduleGroupSet.size();
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
index 31504750..1b7768f9 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
index 5b483157..f4ac23ef 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
@@ -19,7 +19,8 @@
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.physical.*;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
@@ -132,9 +133,9 @@ public void testContainerRemoval() throws Exception {
         new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
     scheduler.scheduleJob(plan, jobStateManager);
 
-    final List<PhysicalStage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
+    final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
 
-    for (final PhysicalStage stage : dagOf4Stages) {
+    for (final Stage stage : dagOf4Stages) {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
@@ -203,9 +204,9 @@ public void testOutputFailure() throws Exception {
         new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
     scheduler.scheduleJob(plan, jobStateManager);
 
-    final List<PhysicalStage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
+    final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
 
-    for (final PhysicalStage stage : dagOf4Stages) {
+    for (final Stage stage : dagOf4Stages) {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
@@ -269,9 +270,9 @@ public void testInputReadFailure() throws Exception {
         new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
     scheduler.scheduleJob(plan, jobStateManager);
 
-    final List<PhysicalStage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
+    final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
 
-    for (final PhysicalStage stage : dagOf4Stages) {
+    for (final Stage stage : dagOf4Stages) {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
@@ -324,13 +325,13 @@ public void testTaskReexecutionForFailure() throws Exception {
     scheduler.scheduleJob(plan, jobStateManager);
 
     final List<ExecutorRepresenter> executors = new ArrayList<>();
-    final List<PhysicalStage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
+    final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
 
     int executorIdIndex = 1;
     float removalChance = 0.7f; // Out of 1.0
     final Random random = new Random(0); // Deterministic seed.
 
-    for (final PhysicalStage stage : dagOf4Stages) {
+    for (final Stage stage : dagOf4Stages) {
 
       while (jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState() != COMPLETE) {
         // By chance, remove or add executor
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
index 0071b9f0..ac77c46c 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
index 5edd885e..d40859bc 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
index a9d1d4b2..644fb381 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
+import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
@@ -32,21 +32,21 @@
    * @param jobStateManager for the submitted job.
    * @param scheduler for the submitted job.
    * @param executorRegistry provides executor representers
-   * @param physicalStage for which the states should be marked as complete.
+   * @param stage for which the states should be marked as complete.
    */
   static void completeStage(final JobStateManager jobStateManager,
                             final Scheduler scheduler,
                             final ExecutorRegistry executorRegistry,
-                            final PhysicalStage physicalStage,
+                            final Stage stage,
                             final int attemptIdx) {
     // Loop until the stage completes.
     while (true) {
-      final Enum stageState = jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState();
+      final Enum stageState = jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState();
       if (StageState.State.COMPLETE == stageState) {
         // Stage has completed, so we break out of the loop.
         break;
       } else if (StageState.State.EXECUTING == stageState) {
-        physicalStage.getTaskIds().forEach(taskId -> {
+        stage.getTaskIds().forEach(taskId -> {
           final Enum tgState = jobStateManager.getTaskState(taskId).getStateMachine().getCurrentState();
           if (TaskState.State.EXECUTING == tgState) {
             sendTaskStateEventToScheduler(scheduler, executorRegistry, taskId,
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
index 0ae5998b..921dfe8c 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
@@ -16,7 +16,9 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.*;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator;
 import org.junit.Before;
 import org.junit.Test;
@@ -59,7 +61,7 @@ public void testPushPriority() throws Exception {
         TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.ThreeSequentialVertices, true);
 
     pendingTaskPriorityQueue.onJobScheduled(physicalPlan);
-    final List<PhysicalStage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
+    final List<Stage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
 
     // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
     assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), dagOf2Stages.get(1).getScheduleGroupIndex());
@@ -115,7 +117,7 @@ public void testPullPriority() throws Exception {
     final PhysicalPlan physicalPlan =
         TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.ThreeSequentialVertices, false);
     pendingTaskPriorityQueue.onJobScheduled(physicalPlan);
-    final List<PhysicalStage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
+    final List<Stage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
 
     // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
     assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), 0);
@@ -168,7 +170,7 @@ public void testWithDifferentContainerType() throws Exception {
     final PhysicalPlan physicalPlan = TestPlanGenerator.generatePhysicalPlan(
         TestPlanGenerator.PlanType.ThreeSequentialVerticesWithDifferentContainerTypes, true);
     pendingTaskPriorityQueue.onJobScheduled(physicalPlan);
-    final List<PhysicalStage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
+    final List<Stage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
 
     // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
     assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), dagOf2Stages.get(1).getScheduleGroupIndex());
@@ -206,10 +208,10 @@ public void testWithDifferentContainerType() throws Exception {
   }
 
   /**
-   * Schedule the tasks in a physical stage.
+   * Schedule the tasks in a stage.
    * @param stage the stage to schedule.
    */
-  private void scheduleStage(final PhysicalStage stage) {
+  private void scheduleStage(final Stage stage) {
     stage.getTaskIds().forEach(taskId ->
         pendingTaskPriorityQueue.add(new ExecutableTask(
             "TestPlan",
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
index 65f9d984..4263e7f2 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
diff --git a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
index f9d20bd1..9d4142d7 100644
--- a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
+++ b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
@@ -31,10 +31,10 @@
 import edu.snu.nemo.compiler.optimizer.policy.BasicPushPolicy;
 import edu.snu.nemo.compiler.optimizer.policy.Policy;
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
@@ -100,7 +100,7 @@ public static PhysicalPlan generatePhysicalPlan(final PlanType planType, final b
   private static PhysicalPlan convertIRToPhysical(final DAG<IRVertex, IREdge> irDAG,
                                                   final Policy policy) throws Exception {
     final DAG<IRVertex, IREdge> optimized = CompiletimeOptimizer.optimize(irDAG, policy, EMPTY_DAG_DIRECTORY);
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = optimized.convert(PLAN_GENERATOR);
+    final DAG<Stage, StageEdge> physicalDAG = optimized.convert(PLAN_GENERATOR);
     return new PhysicalPlan("TestPlan", physicalDAG, PLAN_GENERATOR.getIdToIRVertex());
   }
 
diff --git a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
index f2579262..fe699425 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
@@ -26,10 +26,10 @@
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.state.JobState;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
@@ -74,7 +74,7 @@ public void testState() throws Exception {
     final Injector injector = Tang.Factory.getTang().newInjector();
     injector.bindVolatileParameter(JobConf.DAGDirectory.class, "");
     final PhysicalPlanGenerator physicalPlanGenerator = injector.getInstance(PhysicalPlanGenerator.class);
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
+    final DAG<Stage, StageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
 
     final LocalMessageDispatcher messageDispatcher = new LocalMessageDispatcher();
     final LocalMessageEnvironment messageEnvironment =
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
index 68f9b12f..ebbc7b6c 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
@@ -27,8 +27,8 @@
 import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
 import edu.snu.nemo.compiler.optimizer.policy.PadoPolicy;
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.junit.Before;
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
index 611d821e..f948589f 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
@@ -32,11 +32,9 @@
 import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
 import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
-import edu.snu.nemo.runtime.common.plan.stage.Stage;
-import edu.snu.nemo.runtime.common.plan.stage.StageEdge;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
@@ -80,7 +78,7 @@ public void testSimplePlan() throws Exception {
     final DAG<IRVertex, IREdge> irDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
         new TestPolicy(), "");
     final DAG<Stage, StageEdge> DAGOfStages = physicalPlanGenerator.stagePartitionIrDAG(irDAG);
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
+    final DAG<Stage, StageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
 
     // Test DAG of stages
     final List<Stage> sortedDAGOfStages = DAGOfStages.getTopologicalSort();
@@ -94,9 +92,9 @@ public void testSimplePlan() throws Exception {
     assertEquals(DAGOfStages.getOutgoingEdgesOf(stage2).size(), 0);
 
     // Test Physical DAG
-    final List<PhysicalStage> sortedPhysicalDAG = physicalDAG.getTopologicalSort();
-    final PhysicalStage physicalStage1 = sortedPhysicalDAG.get(0);
-    final PhysicalStage physicalStage2 = sortedPhysicalDAG.get(1);
+    final List<Stage> sortedPhysicalDAG = physicalDAG.getTopologicalSort();
+    final Stage physicalStage1 = sortedPhysicalDAG.get(0);
+    final Stage physicalStage2 = sortedPhysicalDAG.get(1);
     assertEquals(physicalDAG.getVertices().size(), 2);
     assertEquals(physicalDAG.getIncomingEdgesOf(physicalStage1).size(), 0);
     assertEquals(physicalDAG.getIncomingEdgesOf(physicalStage2).size(), 1);
@@ -238,10 +236,10 @@ public void testComplexPlan() throws Exception {
 
     // Test Physical DAG
 
-//    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = logicalDAG.convert(new PhysicalDAGGenerator());
-//    final List<PhysicalStage> sortedPhysicalDAG = physicalDAG.getTopologicalSort();
-//    final PhysicalStage physicalStage1 = sortedPhysicalDAG.get(0);
-//    final PhysicalStage physicalStage2 = sortedPhysicalDAG.get(1);
+//    final DAG<Stage, StageEdge> physicalDAG = logicalDAG.convert(new PhysicalDAGGenerator());
+//    final List<Stage> sortedPhysicalDAG = physicalDAG.getTopologicalSort();
+//    final Stage physicalStage1 = sortedPhysicalDAG.get(0);
+//    final Stage physicalStage2 = sortedPhysicalDAG.get(1);
 //    assertEquals(physicalDAG.getVertices().size(), 2);
 //    assertEquals(physicalDAG.getIncomingEdgesOf(physicalStage1).size(), 0);
 //    assertEquals(physicalDAG.getIncomingEdgesOf(physicalStage2).size(), 1);
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
index a6df1d45..a946b4bb 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
@@ -27,8 +27,9 @@
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.*;
 import edu.snu.nemo.runtime.executor.MetricMessageSender;
 import edu.snu.nemo.runtime.executor.TaskExecutor;
 import edu.snu.nemo.runtime.executor.TaskStateManager;
@@ -59,7 +60,7 @@
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({InputReader.class, OutputWriter.class, DataTransferFactory.class,
-    TaskStateManager.class, PhysicalStageEdge.class})
+    TaskStateManager.class, StageEdge.class})
 public final class TaskExecutorTest {
   private static final int DATA_SIZE = 100;
   private static final String CONTAINER_TYPE = "CONTAINER_TYPE";
@@ -112,7 +113,7 @@ public Iterable read() throws Exception {
 
     final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
         new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>().addVertex(sourceIRVertex).buildWithoutSourceSinkCheck();
-    final PhysicalStageEdge stageOutEdge = mock(PhysicalStageEdge.class);
+    final StageEdge stageOutEdge = mock(StageEdge.class);
     when(stageOutEdge.getSrcVertex()).thenReturn(sourceIRVertex);
     final String taskId = RuntimeIdGenerator.generateTaskId(0, stageId);
     final ExecutableTask executableTask =
@@ -165,9 +166,9 @@ public void testOperatorVertex() throws Exception {
             runtimeIREdgeId, edgeProperties, operatorIRVertex1, operatorIRVertex2, coder))
         .buildWithoutSourceSinkCheck();
     final String taskId = RuntimeIdGenerator.generateTaskId(0, stageId);
-    final PhysicalStageEdge stageInEdge = mock(PhysicalStageEdge.class);
+    final StageEdge stageInEdge = mock(StageEdge.class);
     when(stageInEdge.getDstVertex()).thenReturn(operatorIRVertex1);
-    final PhysicalStageEdge stageOutEdge = mock(PhysicalStageEdge.class);
+    final StageEdge stageOutEdge = mock(StageEdge.class);
     when(stageOutEdge.getSrcVertex()).thenReturn(operatorIRVertex2);
     final ExecutableTask executableTask =
         new ExecutableTask(
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
index cfdeb2fe..98b8cc0f 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
@@ -124,7 +124,7 @@ public void setUp() throws Exception {
     IntStream.range(0, NUM_READ_VERTICES).forEach(number -> readTaskIdList.add("Read_IR_vertex"));
 
     // Generates the ids and the data of the blocks to be used.
-    final String shuffleEdge = RuntimeIdGenerator.generateRuntimeEdgeId("shuffle_edge");
+    final String shuffleEdge = RuntimeIdGenerator.generateStageEdgeId("shuffle_edge");
     IntStream.range(0, NUM_WRITE_VERTICES).forEach(writeTaskIdx -> {
       // Create a block for each writer task.
       final String blockId = RuntimeIdGenerator.generateBlockId(shuffleEdge, writeTaskIdx);
@@ -146,7 +146,7 @@ public void setUp() throws Exception {
     // Following part is for the concurrent read test.
     final String writeTaskId = "conc_write_IR_vertex";
     final List<String> concReadTaskIdList = new ArrayList<>(NUM_CONC_READ_TASKS);
-    final String concEdge = RuntimeIdGenerator.generateRuntimeEdgeId("conc_read_edge");
+    final String concEdge = RuntimeIdGenerator.generateStageEdgeId("conc_read_edge");
 
     // Generates the ids and the data to be used.
     concBlockId = RuntimeIdGenerator.generateBlockId(concEdge, NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1);
@@ -168,7 +168,7 @@ public void setUp() throws Exception {
     // Generates the ids of the tasks to be used.
     IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(number -> writeHashTaskIdList.add("hash_write_IR_vertex"));
     IntStream.range(0, NUM_READ_HASH_TASKS).forEach(number -> readHashTaskIdList.add("hash_read_IR_vertex"));
-    final String hashEdge = RuntimeIdGenerator.generateRuntimeEdgeId("hash_edge");
+    final String hashEdge = RuntimeIdGenerator.generateStageEdgeId("hash_edge");
 
     // Generates the ids and the data of the blocks to be used.
     IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(writeTaskIdx -> {
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index 61ef9b0b..697122e9 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@ -37,8 +37,8 @@
 import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.executor.Executor;
 import edu.snu.nemo.runtime.executor.MetricManagerWorker;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
@@ -320,9 +320,9 @@ private void writeAndRead(final BlockManagerWorker sender,
 
     final IRVertex srcMockVertex = mock(IRVertex.class);
     final IRVertex dstMockVertex = mock(IRVertex.class);
-    final PhysicalStage srcStage = setupStages("srcStage-" + testIndex);
-    final PhysicalStage dstStage = setupStages("dstStage-" + testIndex);
-    dummyEdge = new PhysicalStageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
+    final Stage srcStage = setupStages("srcStage-" + testIndex);
+    final Stage dstStage = setupStages("dstStage-" + testIndex);
+    dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
         srcStage, dstStage, CODER, false);
     // Initialize states in Master
     srcStage.getTaskIds().forEach(srcTaskId -> {
@@ -407,13 +407,13 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
 
     final IRVertex srcMockVertex = mock(IRVertex.class);
     final IRVertex dstMockVertex = mock(IRVertex.class);
-    final PhysicalStage srcStage = setupStages("srcStage-" + testIndex);
-    final PhysicalStage dstStage = setupStages("dstStage-" + testIndex);
-    dummyEdge = new PhysicalStageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
+    final Stage srcStage = setupStages("srcStage-" + testIndex);
+    final Stage dstStage = setupStages("dstStage-" + testIndex);
+    dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
         srcStage, dstStage, CODER, false);
     final IRVertex dstMockVertex2 = mock(IRVertex.class);
-    final PhysicalStage dstStage2 = setupStages("dstStage-" + testIndex2);
-    dummyEdge2 = new PhysicalStageEdge(edgeId2, edgeProperties, srcMockVertex, dstMockVertex2,
+    final Stage dstStage2 = setupStages("dstStage-" + testIndex2);
+    dummyEdge2 = new StageEdge(edgeId2, edgeProperties, srcMockVertex, dstMockVertex2,
         srcStage, dstStage2, CODER, false);
     // Initialize states in Master
     srcStage.getTaskIds().forEach(srcTaskId -> {
@@ -541,9 +541,9 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
     return Pair.of(srcVertex, dstVertex);
   }
 
-  private PhysicalStage setupStages(final String stageId) {
+  private Stage setupStages(final String stageId) {
     final DAG<IRVertex, RuntimeEdge<IRVertex>> emptyDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>().build();
 
-    return new PhysicalStage(stageId, emptyDag, PARALLELISM_TEN, 0, "Not_used", Collections.emptyList());
+    return new Stage(stageId, emptyDag, PARALLELISM_TEN, 0, "Not_used", Collections.emptyList());
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services