You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by je...@apache.org on 2018/06/05 08:03:01 UTC

[incubator-nemo] branch master updated: [NEMO-78] Rename PhysicalStage to Stage (#26)

This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new d418580  [NEMO-78] Rename PhysicalStage to Stage (#26)
d418580 is described below

commit d4185809cfc5814ed7620f4389989fc5cfa8e049
Author: John Yang <jo...@gmail.com>
AuthorDate: Tue Jun 5 17:02:56 2018 +0900

    [NEMO-78] Rename PhysicalStage to Stage (#26)
    
    JIRA: NEMO-78: Rename PhysicalStage to Stage
    
    Major changes:
    * Remove the existing Stage that was only being used in the compiler-backend
    * Rename the existing PhysicalStage to Stage
    * Directly translate IRDAGs into Stage DAGs used in the runtime
    * Rename all related variables
    
    Minor changes to note:
    * Also, in json2dot.py, 'physicalStages' is renamed to 'stages', which appears to break our online DAG visualizer. This needs @seojangho's review.
    * Remove the disaggregation test, which is redundant with the pado test, and change the integration test parameters to use resources more efficiently
    
    Tests for the changes:
    * No new tests, as no new feature was added
    
    Other comments:
    N/A
    
    resolves NEMO-78
---
 bin/json2dot.py                                    |  18 +-
 .../nemo/compiler/backend/nemo/NemoBackend.java    |  12 +-
 .../optimizer/examples/EmptyComponents.java        |   7 +-
 .../snu/nemo/examples/beam/MapReduceITCase.java    |   8 -
 .../resources/beam_sample_executor_resources.json  |  11 +-
 .../resources/spark_sample_executor_resources.json |   9 +-
 .../nemo/runtime/common/RuntimeIdGenerator.java    |  16 +-
 .../eventhandler/DynamicOptimizationEvent.java     |   2 +-
 .../DynamicOptimizationEventHandler.java           |   2 +-
 .../eventhandler/UpdatePhysicalPlanEvent.java      |   2 +-
 .../runtime/common/optimizer/RuntimeOptimizer.java |   2 +-
 .../pass/runtime/DataSkewRuntimePass.java          |  16 +-
 .../common/optimizer/pass/runtime/RuntimePass.java |   2 +-
 .../common/plan/{physical => }/ExecutableTask.java |  14 +-
 .../common/plan/{physical => }/PhysicalPlan.java   |   8 +-
 .../plan/{physical => }/PhysicalPlanGenerator.java | 195 ++++++++-------------
 .../{physical/PhysicalStage.java => Stage.java}    |  19 +-
 .../common/plan/{stage => }/StageBuilder.java      |  71 ++++----
 .../PhysicalStageEdge.java => StageEdge.java}      |  28 +--
 .../common/plan/{stage => }/StageEdgeBuilder.java  |   7 +-
 .../snu/nemo/runtime/common/plan/stage/Stage.java  |  68 -------
 .../nemo/runtime/common/plan/stage/StageEdge.java  |  81 ---------
 .../snu/nemo/runtime/common/state/JobState.java    |   2 +-
 .../snu/nemo/runtime/common/state/StageState.java  |   2 +-
 .../edu/snu/nemo/driver/UserApplicationRunner.java |   2 +-
 .../edu/snu/nemo/runtime/executor/Executor.java    |   2 +-
 .../snu/nemo/runtime/executor/TaskExecutor.java    |  31 ++--
 .../nemo/runtime/executor/TaskStateManager.java    |   2 +-
 .../runtime/executor/datatransfer/InputReader.java |   6 +-
 .../snu/nemo/runtime/master/JobStateManager.java   |  32 ++--
 .../edu/snu/nemo/runtime/master/RuntimeMaster.java |   2 +-
 .../UpdatePhysicalPlanEventHandler.java            |   2 +-
 .../master/resource/ExecutorRepresenter.java       |   2 +-
 .../master/scheduler/BatchSingleJobScheduler.java  |  57 +++---
 .../scheduler/CompositeSchedulingPolicy.java       |   2 +-
 .../ContainerTypeAwareSchedulingPolicy.java        |   2 +-
 .../master/scheduler/FreeSlotSchedulingPolicy.java |   2 +-
 .../master/scheduler/PendingTaskCollection.java    |   4 +-
 .../scheduler/RoundRobinSchedulingPolicy.java      |   2 +-
 .../nemo/runtime/master/scheduler/Scheduler.java   |   2 +-
 .../runtime/master/scheduler/SchedulerRunner.java  |   2 +-
 .../runtime/master/scheduler/SchedulingPolicy.java |   2 +-
 .../master/scheduler/SingleJobTaskCollection.java  |  16 +-
 .../SourceLocationAwareSchedulingPolicy.java       |   2 +-
 .../runtime/master/BlockManagerMasterTest.java     |   4 +-
 .../nemo/runtime/master/JobStateManagerTest.java   |  17 +-
 .../scheduler/BatchSingleJobSchedulerTest.java     |  26 +--
 .../ContainerTypeAwareSchedulingPolicyTest.java    |   2 +-
 .../master/scheduler/FaultToleranceTest.java       |  19 +-
 .../scheduler/FreeSlotSchedulingPolicyTest.java    |   2 +-
 .../scheduler/RoundRobinSchedulingPolicyTest.java  |   2 +-
 .../master/scheduler/SchedulerTestUtil.java        |  10 +-
 .../master/scheduler/SingleTaskQueueTest.java      |  14 +-
 .../SourceLocationAwareSchedulingPolicyTest.java   |   2 +-
 .../runtime/plangenerator/TestPlanGenerator.java   |  10 +-
 .../snu/nemo/tests/client/ClientEndpointTest.java  |  10 +-
 .../compiler/backend/nemo/NemoBackendTest.java     |   4 +-
 .../runtime/common/plan/DAGConverterTest.java      |  24 ++-
 .../tests/runtime/executor/TaskExecutorTest.java   |  11 +-
 .../runtime/executor/data/BlockStoreTest.java      |   6 +-
 .../executor/datatransfer/DataTransferTest.java    |  24 +--
 61 files changed, 374 insertions(+), 589 deletions(-)

diff --git a/bin/json2dot.py b/bin/json2dot.py
index e4c77bf..85538e7 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -48,18 +48,18 @@ class JobState:
     def __init__(self, data):
         self.id = data['jobId']
         self.stages = {}
-        for stage in data['physicalStages']:
-            self.stages[stage['id']] = PhysicalStageState(stage)
+        for stage in data['stages']:
+            self.stages[stage['id']] = StageState(stage)
     @classmethod
     def empty(cls):
-        return cls({'jobId': None, 'physicalStages': []})
+        return cls({'jobId': None, 'stages': []})
     def get(self, id):
         try:
             return self.stages[id]
         except:
-            return PhysicalStageState.empty()
+            return StageState.empty()
 
-class PhysicalStageState:
+class StageState:
     def __init__(self, data):
         self.id = data['id']
         self.state = data['state']
@@ -114,7 +114,7 @@ class DAG:
 
 def Vertex(id, properties, state):
     try:
-        return PhysicalStage(id, properties, state)
+        return Stage(id, properties, state)
     except:
         pass
     try:
@@ -249,7 +249,7 @@ class LoopVertex:
         vertexId = list(filter(lambda v: edgeId in self.incoming[v], self.incoming))[0]
         return self.dag.vertices[vertexId]
 
-class PhysicalStage:
+class Stage:
     def __init__(self, id, properties, state):
         self.id = id
         self.irVertex = DAG(properties['irDag'], JobState.empty())
@@ -276,7 +276,7 @@ class PhysicalStage:
 
 def Edge(src, dst, properties):
     try:
-        return PhysicalStageEdge(src, dst, properties)
+        return StageEdge(src, dst, properties)
     except:
         pass
     try:
@@ -325,7 +325,7 @@ class IREdge:
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(src.oneVertex.idx,
                 dst.oneVertex.idx, src.logicalEnd, dst.logicalEnd, label)
 
-class PhysicalStageEdge:
+class StageEdge:
     def __init__(self, src, dst, properties):
         self.src = src
         self.dst = dst
diff --git a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 4ef453c..27b98ea 100644
--- a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -20,10 +20,10 @@ import edu.snu.nemo.compiler.backend.Backend;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import org.apache.reef.tang.Tang;
 
 /**
@@ -57,9 +57,9 @@ public final class NemoBackend implements Backend<PhysicalPlan> {
    */
   public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG,
                               final PhysicalPlanGenerator physicalPlanGenerator) {
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalStageDAG = irDAG.convert(physicalPlanGenerator);
+    final DAG<Stage, StageEdge> stageDAG = irDAG.convert(physicalPlanGenerator);
     final PhysicalPlan physicalPlan = new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(),
-        physicalStageDAG, physicalPlanGenerator.getIdToIRVertex());
+        stageDAG, physicalPlanGenerator.getIdToIRVertex());
     return physicalPlan;
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
index 34c66fc..7fd0d82 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
@@ -21,7 +21,6 @@ import edu.snu.nemo.common.ir.vertex.SourceVertex;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -93,7 +92,11 @@ public class EmptyComponents {
 
     @Override
     public List<Readable<T>> getReadables(final int desirednumOfSplits) {
-      return Arrays.asList(new EmptyReadable<>());
+      final List list = new ArrayList(desirednumOfSplits);
+      for (int i = 0; i < desirednumOfSplits; i++) {
+        list.add(new EmptyReadable<>());
+      }
+      return list;
     }
 
     @Override
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
index 44b0504..d4f4432 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
@@ -77,14 +77,6 @@ public final class MapReduceITCase {
   }
 
   @Test (timeout = TIMEOUT)
-  public void testDisagg() throws Exception {
-    JobLauncher.main(builder
-        .addJobId(MapReduceITCase.class.getSimpleName() + "_disagg")
-        .addOptimizationPolicy(DisaggregationPolicyParallelismFive.class.getCanonicalName())
-        .build());
-  }
-
-  @Test (timeout = TIMEOUT)
   public void testPado() throws Exception {
     JobLauncher.main(builder
         .addJobId(MapReduceITCase.class.getSimpleName() + "_pado")
diff --git a/examples/resources/beam_sample_executor_resources.json b/examples/resources/beam_sample_executor_resources.json
index 1f7f973..ced110c 100644
--- a/examples/resources/beam_sample_executor_resources.json
+++ b/examples/resources/beam_sample_executor_resources.json
@@ -1,17 +1,12 @@
 [
   {
     "type": "Transient",
-    "memory_mb": 300,
-    "capacity": 3
+    "memory_mb": 512,
+    "capacity": 5
   },
   {
     "type": "Reserved",
-    "memory_mb": 300,
+    "memory_mb": 512,
     "capacity": 5
-  },
-  {
-    "type": "Compute",
-    "memory_mb": 300,
-    "capacity": 3
   }
 ]
diff --git a/examples/resources/spark_sample_executor_resources.json b/examples/resources/spark_sample_executor_resources.json
index 187bd45..ced110c 100644
--- a/examples/resources/spark_sample_executor_resources.json
+++ b/examples/resources/spark_sample_executor_resources.json
@@ -2,16 +2,11 @@
   {
     "type": "Transient",
     "memory_mb": 512,
-    "capacity": 2
+    "capacity": 5
   },
   {
     "type": "Reserved",
     "memory_mb": 512,
-    "capacity": 2
-  },
-  {
-    "type": "Compute",
-    "memory_mb": 512,
-    "capacity": 2
+    "capacity": 5
   }
 ]
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
index b53ff34..e4fef9c 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
@@ -40,7 +40,7 @@ public final class RuntimeIdGenerator {
   //////////////////////////////////////////////////////////////// Generate IDs
 
   /**
-   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan}.
+   * Generates the ID for physical plan.
    *
    * @return the generated ID
    */
@@ -49,7 +49,7 @@ public final class RuntimeIdGenerator {
   }
 
   /**
-   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.stage.StageEdge}.
+   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.StageEdge}.
    *
    * @param irEdgeId .
    * @return the generated ID
@@ -59,17 +59,7 @@ public final class RuntimeIdGenerator {
   }
 
   /**
-   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.RuntimeEdge}.
-   *
-   * @param irEdgeId .
-   * @return the generated ID
-   */
-  public static String generateRuntimeEdgeId(final String irEdgeId) {
-    return "REdge-" + irEdgeId;
-  }
-
-  /**
-   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.stage.Stage}.
+   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.Stage}.
    * @param stageId stage ID in numeric form.
    * @return the generated ID
    */
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
index e63f1d0..e9bd6eb 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
@@ -21,7 +21,7 @@ package edu.snu.nemo.runtime.common.eventhandler;
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.eventhandler.RuntimeEvent;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 
 /**
  * An event for triggering dynamic optimization.
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
index 6fa6bc9..63cd56e 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
@@ -23,7 +23,7 @@ import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
 import edu.snu.nemo.runtime.common.optimizer.RuntimeOptimizer;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import org.apache.reef.wake.impl.PubSubEventHandler;
 
 import javax.inject.Inject;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java
index ed576bd..fe91fd7 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.common.eventhandler;
 
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.eventhandler.CompilerEvent;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 
 /**
  * An event for updating the physical plan in the scheduler.
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java
index 8e00280..ab03331 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java
@@ -20,7 +20,7 @@ import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
 import edu.snu.nemo.runtime.common.optimizer.pass.runtime.DataSkewRuntimePass;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 
 import java.util.*;
 
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 7d3e7f4..ed2f92c 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -24,9 +24,9 @@ import edu.snu.nemo.common.exception.DynamicOptimizationException;
 
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.data.KeyRange;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.data.HashRange;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEventHandler;
 import org.slf4j.Logger;
@@ -60,16 +60,16 @@ public final class DataSkewRuntimePass implements RuntimePass<Map<String, List<P
   @Override
   public PhysicalPlan apply(final PhysicalPlan originalPlan, final Map<String, List<Pair<Integer, Long>>> metricData) {
     // Builder to create new stages.
-    final DAGBuilder<PhysicalStage, PhysicalStageEdge> physicalDAGBuilder =
+    final DAGBuilder<Stage, StageEdge> physicalDAGBuilder =
         new DAGBuilder<>(originalPlan.getStageDAG());
 
     // get edges to optimize
     final List<String> optimizationEdgeIds = metricData.keySet().stream().map(blockId ->
         RuntimeIdGenerator.getRuntimeEdgeIdFromBlockId(blockId)).collect(Collectors.toList());
-    final DAG<PhysicalStage, PhysicalStageEdge> stageDAG = originalPlan.getStageDAG();
-    final List<PhysicalStageEdge> optimizationEdges = stageDAG.getVertices().stream()
-        .flatMap(physicalStage -> stageDAG.getIncomingEdgesOf(physicalStage).stream())
-        .filter(physicalStageEdge -> optimizationEdgeIds.contains(physicalStageEdge.getId()))
+    final DAG<Stage, StageEdge> stageDAG = originalPlan.getStageDAG();
+    final List<StageEdge> optimizationEdges = stageDAG.getVertices().stream()
+        .flatMap(stage -> stageDAG.getIncomingEdgesOf(stage).stream())
+        .filter(stageEdge -> optimizationEdgeIds.contains(stageEdge.getId()))
         .collect(Collectors.toList());
 
     // Get number of evaluators of the next stage (number of blocks).
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
index 57c929c..99401fb 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
 
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 
 import java.io.Serializable;
 import java.util.Set;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ExecutableTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/ExecutableTask.java
similarity index 89%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ExecutableTask.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/ExecutableTask.java
index e7e37ef..1da8f97 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ExecutableTask.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/ExecutableTask.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.plan.physical;
+package edu.snu.nemo.runtime.common.plan;
 
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
@@ -29,8 +29,8 @@ public final class ExecutableTask implements Serializable {
   private final String jobId;
   private final String taskId;
   private final int taskIdx;
-  private final List<PhysicalStageEdge> taskIncomingEdges;
-  private final List<PhysicalStageEdge> taskOutgoingEdges;
+  private final List<StageEdge> taskIncomingEdges;
+  private final List<StageEdge> taskOutgoingEdges;
   private final int attemptIdx;
   private final String containerType;
   private final byte[] serializedTaskDag;
@@ -53,8 +53,8 @@ public final class ExecutableTask implements Serializable {
                         final int attemptIdx,
                         final String containerType,
                         final byte[] serializedIRDag,
-                        final List<PhysicalStageEdge> taskIncomingEdges,
-                        final List<PhysicalStageEdge> taskOutgoingEdges,
+                        final List<StageEdge> taskIncomingEdges,
+                        final List<StageEdge> taskOutgoingEdges,
                         final Map<String, Readable> irVertexIdToReadable) {
     this.jobId = jobId;
     this.taskId = taskId;
@@ -98,14 +98,14 @@ public final class ExecutableTask implements Serializable {
   /**
    * @return the incoming edges of the task.
    */
-  public List<PhysicalStageEdge> getTaskIncomingEdges() {
+  public List<StageEdge> getTaskIncomingEdges() {
     return taskIncomingEdges;
   }
 
   /**
    * @return the outgoing edges of the task.
    */
-  public List<PhysicalStageEdge> getTaskOutgoingEdges() {
+  public List<StageEdge> getTaskOutgoingEdges() {
     return taskOutgoingEdges;
   }
 
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
similarity index 87%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
index b25a0ba..8809bd3 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.plan.physical;
+package edu.snu.nemo.runtime.common.plan;
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
@@ -26,7 +26,7 @@ import java.util.Map;
  */
 public final class PhysicalPlan implements Serializable {
   private final String id;
-  private final DAG<PhysicalStage, PhysicalStageEdge> stageDAG;
+  private final DAG<Stage, StageEdge> stageDAG;
   private final Map<String, IRVertex> idToIRVertex;
 
   /**
@@ -37,7 +37,7 @@ public final class PhysicalPlan implements Serializable {
    * @param idToIRVertex map from task to IR vertex.
    */
   public PhysicalPlan(final String id,
-                      final DAG<PhysicalStage, PhysicalStageEdge> stageDAG,
+                      final DAG<Stage, StageEdge> stageDAG,
                       final Map<String, IRVertex> idToIRVertex) {
     this.id = id;
     this.stageDAG = stageDAG;
@@ -54,7 +54,7 @@ public final class PhysicalPlan implements Serializable {
   /**
    * @return the DAG of stages.
    */
-  public DAG<PhysicalStage, PhysicalStageEdge> getStageDAG() {
+  public DAG<Stage, StageEdge> getStageDAG() {
     return stageDAG;
   }
 
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
similarity index 64%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index 27991e3..adaf752 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.plan.physical;
+package edu.snu.nemo.runtime.common.plan;
 
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
@@ -22,10 +22,7 @@ import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.stage.*;
 import edu.snu.nemo.common.exception.IllegalVertexOperationException;
 import edu.snu.nemo.common.exception.PhysicalPlanGenerationException;
 import org.apache.reef.tang.annotations.Parameter;
@@ -37,10 +34,9 @@ import java.util.function.Function;
 /**
  * A function that converts an IR DAG to physical DAG.
  */
-public final class PhysicalPlanGenerator
-    implements Function<DAG<IRVertex, IREdge>, DAG<PhysicalStage, PhysicalStageEdge>> {
-
-  final String dagDirectory;
+public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdge>, DAG<Stage, StageEdge>> {
+  private final Map<String, IRVertex> idToIRVertex;
+  private final String dagDirectory;
 
   /**
    * Private constructor.
@@ -49,6 +45,7 @@ public final class PhysicalPlanGenerator
    */
   @Inject
   private PhysicalPlanGenerator(@Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
+    this.idToIRVertex = new HashMap<>();
     this.dagDirectory = dagDirectory;
   }
 
@@ -59,7 +56,7 @@ public final class PhysicalPlanGenerator
    * @return {@link PhysicalPlan} to execute.
    */
   @Override
-  public DAG<PhysicalStage, PhysicalStageEdge> apply(final DAG<IRVertex, IREdge> irDAG) {
+  public DAG<Stage, StageEdge> apply(final DAG<IRVertex, IREdge> irDAG) {
     // first, stage-partition the IR DAG.
     final DAG<Stage, StageEdge> dagOfStages = stagePartitionIrDAG(irDAG);
 
@@ -68,9 +65,12 @@ public final class PhysicalPlanGenerator
 
     // for debugging purposes.
     dagOfStages.storeJSON(dagDirectory, "plan-logical", "logical execution plan");
-    // then create tasks and make it into a physical execution plan.
 
-    return stagesIntoPlan(dagOfStages);
+    return dagOfStages;
+  }
+
+  public Map<String, IRVertex> getIdToIRVertex() {
+    return idToIRVertex;
   }
 
   /**
@@ -124,24 +124,50 @@ public final class PhysicalPlanGenerator
     final Map<IRVertex, Stage> vertexStageMap = new HashMap<>();
 
     for (final List<IRVertex> stageVertices : vertexListForEachStage.values()) {
+      integrityCheck(stageVertices);
+
       final Set<IRVertex> currentStageVertices = new HashSet<>();
       final Set<StageEdgeBuilder> currentStageIncomingEdges = new HashSet<>();
 
       // Create a new stage builder.
       final IRVertex irVertexOfNewStage = stageVertices.stream().findAny()
           .orElseThrow(() -> new RuntimeException("Error: List " + stageVertices.getClass() + " is Empty"));
-      final StageBuilder stageBuilder =
-          new StageBuilder(irVertexOfNewStage.getProperty(ExecutionProperty.Key.StageId),
-              irVertexOfNewStage.getProperty(ExecutionProperty.Key.ScheduleGroupIndex));
+      final StageBuilder stageBuilder = new StageBuilder(
+          irVertexOfNewStage.getProperty(ExecutionProperty.Key.StageId),
+          irVertexOfNewStage.getProperty(ExecutionProperty.Key.Parallelism),
+          irVertexOfNewStage.getProperty(ExecutionProperty.Key.ScheduleGroupIndex),
+          irVertexOfNewStage.getProperty(ExecutionProperty.Key.ExecutorPlacement));
+
+      // Prepare useful variables.
+      final int stageParallelism = irVertexOfNewStage.getProperty(ExecutionProperty.Key.Parallelism);
+      final List<Map<String, Readable>> vertexIdToReadables = new ArrayList<>(stageParallelism);
+      for (int i = 0; i < stageParallelism; i++) {
+        vertexIdToReadables.add(new HashMap<>());
+      }
 
       // For each vertex in the stage,
       for (final IRVertex irVertex : stageVertices) {
+        // Take care of the readables of a source vertex.
+        if (irVertex instanceof SourceVertex) {
+          final SourceVertex sourceVertex = (SourceVertex) irVertex;
+          try {
+            final List<Readable> readables = sourceVertex.getReadables(stageParallelism);
+            for (int i = 0; i < stageParallelism; i++) {
+              vertexIdToReadables.get(i).put(irVertex.getId(), readables.get(i));
+            }
+          } catch (Exception e) {
+            throw new PhysicalPlanGenerationException(e);
+          }
+
+          // Clear internal metadata.
+          sourceVertex.clearInternalStates();
+        }
 
         // Add vertex to the stage.
         stageBuilder.addVertex(irVertex);
         currentStageVertices.add(irVertex);
 
-        // Connect all the incoming edges for the vertex
+        // Connect all the incoming edges for the vertex.
         final List<IREdge> inEdges = irDAG.getIncomingEdgesOf(irVertex);
         final Optional<List<IREdge>> inEdgeList = (inEdges == null) ? Optional.empty() : Optional.of(inEdges);
         inEdgeList.ifPresent(edges -> edges.forEach(irEdge -> {
@@ -156,7 +182,12 @@ public final class PhysicalPlanGenerator
 
           // both vertices are in the stage.
           if (currentStageVertices.contains(srcVertex) && currentStageVertices.contains(dstVertex)) {
-            stageBuilder.connectInternalVertices(irEdge);
+            stageBuilder.connectInternalVertices(new RuntimeEdge<IRVertex>(
+                irEdge.getId(),
+                irEdge.getExecutionProperties(),
+                irEdge.getSrc(),
+                irEdge.getDst(),
+                irEdge.getCoder()));
           } else { // edge comes from another stage
             final Stage srcStage = vertexStageMap.get(srcVertex);
 
@@ -167,15 +198,19 @@ public final class PhysicalPlanGenerator
 
             final StageEdgeBuilder newEdgeBuilder = new StageEdgeBuilder(irEdge.getId())
                 .setEdgeProperties(irEdge.getExecutionProperties())
-                .setSrcVertex(srcVertex).setDstVertex(dstVertex)
+                .setSrcVertex(srcVertex)
+                .setDstVertex(dstVertex)
                 .setSrcStage(srcStage)
                 .setCoder(irEdge.getCoder())
                 .setSideInputFlag(irEdge.isSideInput());
             currentStageIncomingEdges.add(newEdgeBuilder);
           }
         }));
-      }
 
+        // Track id to irVertex.
+        idToIRVertex.put(irVertex.getId(), irVertex);
+      }
+      stageBuilder.addReadables(vertexIdToReadables);
       // If this runtime stage contains at least one vertex, build it!
       if (!stageBuilder.isEmpty()) {
         final Stage currentStage = stageBuilder.build();
@@ -197,110 +232,32 @@ public final class PhysicalPlanGenerator
     return dagOfStagesBuilder.build();
   }
 
-
-  private final Map<String, IRVertex> idToIRVertex = new HashMap<>();
-
-  /**
-   * @return the idToIRVertex map.
-   */
-  public Map<String, IRVertex> getIdToIRVertex() {
-    return idToIRVertex;
-  }
-
   /**
-   * Converts the given DAG of stages to a physical DAG for execution.
-   *
-   * @param dagOfStages IR DAG partitioned into stages.
-   * @return the converted physical DAG to execute,
-   * which consists of {@link PhysicalStage} and their relationship represented by {@link PhysicalStageEdge}.
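+   * Checks that every vertex of a stage has a supported type and execution properties consistent with the first.
+   * @param stageVertices non-empty list of the vertices of one stage; the first vertex is used as the reference.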
    */
-  private DAG<PhysicalStage, PhysicalStageEdge> stagesIntoPlan(final DAG<Stage, StageEdge> dagOfStages) {
-    final Map<String, PhysicalStage> runtimeStageIdToPhysicalStageMap = new HashMap<>();
-    final DAGBuilder<PhysicalStage, PhysicalStageEdge> physicalDAGBuilder = new DAGBuilder<>();
-
-    for (final Stage stage : dagOfStages.getVertices()) {
-      final List<IRVertex> stageVertices = stage.getStageInternalDAG().getVertices();
-
-      final ExecutionPropertyMap firstVertexProperties = stageVertices.iterator().next().getExecutionProperties();
-      final int stageParallelism = firstVertexProperties.get(ExecutionProperty.Key.Parallelism);
-      final String containerType = firstVertexProperties.get(ExecutionProperty.Key.ExecutorPlacement);
-
-      // Only one DAG will be created and reused.
-      final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder = new DAGBuilder<>();
-      // Collect split source readables in advance and bind to each irvertex to avoid extra source split.
-      final List<Map<String, Readable>> vertexIdToReadables = new ArrayList<>(stageParallelism);
-      for (int i = 0; i < stageParallelism; i++) {
-        vertexIdToReadables.add(new HashMap<>());
+  private void integrityCheck(final List<IRVertex> stageVertices) {
+    final IRVertex firstVertex = stageVertices.get(0);
+    final String placement = firstVertex.getProperty(ExecutionProperty.Key.ExecutorPlacement);
+    final int scheduleGroup = firstVertex.<Integer>getProperty(ExecutionProperty.Key.ScheduleGroupIndex);
+    final int parallelism = firstVertex.<Integer>getProperty(ExecutionProperty.Key.Parallelism);
+
+    stageVertices.forEach(irVertex -> {
+      // Check vertex type.
+      if (!(irVertex instanceof  SourceVertex
+          || irVertex instanceof OperatorVertex
+          || irVertex instanceof MetricCollectionBarrierVertex)) {
+        throw new UnsupportedOperationException(irVertex.toString());
       }
 
-      // Iterate over the vertices contained in this stage
-      stageVertices.forEach(irVertex -> {
-        if (!(irVertex instanceof  SourceVertex
-            || irVertex instanceof OperatorVertex
-            || irVertex instanceof MetricCollectionBarrierVertex)) {
-          // Sanity check
-          throw new IllegalStateException(irVertex.toString());
-        }
-
-        if (irVertex instanceof SourceVertex) {
-          final SourceVertex sourceVertex = (SourceVertex) irVertex;
-
-          // Take care of the Readables
-          try {
-            final List<Readable> readables = sourceVertex.getReadables(stageParallelism);
-            for (int i = 0; i < stageParallelism; i++) {
-              vertexIdToReadables.get(i).put(irVertex.getId(), readables.get(i));
-            }
-          } catch (Exception e) {
-            throw new PhysicalPlanGenerationException(e);
-          }
-
-          // Clear internal metadata
-          sourceVertex.clearInternalStates();
-        }
-
-        stageInternalDAGBuilder.addVertex(irVertex);
-        idToIRVertex.put(irVertex.getId(), irVertex);
-      });
-
-      // connect internal edges in the stage. It suffices to iterate over only the stage internal inEdges.
-      final DAG<IRVertex, IREdge> stageInternalDAG = stage.getStageInternalDAG();
-      stageInternalDAG.getVertices().forEach(irVertex -> {
-        final List<IREdge> inEdges = stageInternalDAG.getIncomingEdgesOf(irVertex);
-        inEdges.forEach(edge ->
-            stageInternalDAGBuilder.connectVertices(new RuntimeEdge<>(
-                edge.getId(),
-                edge.getExecutionProperties(),
-                edge.getSrc(),
-                edge.getDst(),
-                edge.getCoder(),
-                edge.isSideInput())));
-      });
-
-      // Create the physical stage.
-      final PhysicalStage physicalStage =
-          new PhysicalStage(stage.getId(), stageInternalDAGBuilder.buildWithoutSourceSinkCheck(),
-              stageParallelism, stage.getScheduleGroupIndex(), containerType, vertexIdToReadables);
-
-      physicalDAGBuilder.addVertex(physicalStage);
-      runtimeStageIdToPhysicalStageMap.put(stage.getId(), physicalStage);
-    }
-
-    // Connect Physical stages
-    dagOfStages.getVertices().forEach(stage ->
-        dagOfStages.getIncomingEdgesOf(stage).forEach(stageEdge -> {
-          final PhysicalStage srcStage = runtimeStageIdToPhysicalStageMap.get(stageEdge.getSrc().getId());
-          final PhysicalStage dstStage = runtimeStageIdToPhysicalStageMap.get(stageEdge.getDst().getId());
-
-          physicalDAGBuilder.connectVertices(new PhysicalStageEdge(stageEdge.getId(),
-              stageEdge.getExecutionProperties(),
-              stageEdge.getSrcVertex(),
-              stageEdge.getDstVertex(),
-              srcStage, dstStage,
-              stageEdge.getCoder(),
-              stageEdge.isSideInput()));
-        }));
-
-    return physicalDAGBuilder.build();
+      // Check execution properties.
+      if ((placement != null
+          && !placement.equals(irVertex.<String>getProperty(ExecutionProperty.Key.ExecutorPlacement)))
+          || scheduleGroup != irVertex.<Integer>getProperty(ExecutionProperty.Key.ScheduleGroupIndex)
+          || parallelism != irVertex.<Integer>getProperty(ExecutionProperty.Key.Parallelism)) {
+        throw new RuntimeException("Vertices of the same stage have different execution properties: "
+            + irVertex.getId());
+      }
+    });
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
similarity index 86%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
index a58cfba..138dbb9 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
@@ -13,14 +13,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.plan.physical;
+package edu.snu.nemo.runtime.common.plan;
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.Vertex;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.commons.lang3.SerializationUtils;
 
 import java.util.ArrayList;
@@ -28,9 +27,9 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * PhysicalStage.
+ * Stage.
  */
-public final class PhysicalStage extends Vertex {
+public final class Stage extends Vertex {
   private final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag;
   private final int parallelism;
   private final int scheduleGroupIndex;
@@ -48,12 +47,12 @@ public final class PhysicalStage extends Vertex {
    * @param containerType       the type of container to execute the task on.
    * @param vertexIdToReadables the list of maps between vertex ID and {@link Readable}.
    */
-  public PhysicalStage(final String stageId,
-                       final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag,
-                       final int parallelism,
-                       final int scheduleGroupIndex,
-                       final String containerType,
-                       final List<Map<String, Readable>> vertexIdToReadables) {
+  public Stage(final String stageId,
+               final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag,
+               final int parallelism,
+               final int scheduleGroupIndex,
+               final String containerType,
+               final List<Map<String, Readable>> vertexIdToReadables) {
     super(stageId);
     this.irDag = irDag;
     this.parallelism = parallelism;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageBuilder.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageBuilder.java
similarity index 52%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageBuilder.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageBuilder.java
index 3566948..9913fa9 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageBuilder.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageBuilder.java
@@ -13,93 +13,88 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.plan.stage;
+package edu.snu.nemo.runtime.common.plan;
 
-import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.common.dag.DAGBuilder;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Stage Builder.
  */
-public final class StageBuilder {
-  private final DAGBuilder<IRVertex, IREdge> stageInternalDAGBuilder;
+final class StageBuilder {
+  private final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder;
   private final Integer stageId;
+  private final int parallelism;
   private final int scheduleGroupIndex;
+  private final String containerType;
+  private List<Map<String, Readable>> vertexIdToReadables;
 
   /**
    * Builds a {@link Stage}.
    * @param stageId stage ID in numeric form.
    * @param scheduleGroupIndex indicating its scheduling order.
    */
-  public StageBuilder(final Integer stageId,
-                      final int scheduleGroupIndex) {
+  StageBuilder(final Integer stageId,
+               final int parallelism,
+               final int scheduleGroupIndex,
+               final String containerType) {
     this.stageId = stageId;
+    this.parallelism = parallelism;
     this.scheduleGroupIndex = scheduleGroupIndex;
+    this.containerType = containerType;
     this.stageInternalDAGBuilder = new DAGBuilder<>();
+    this.vertexIdToReadables = new ArrayList<>(1);
   }
 
   /**
-   */
-  /**
    * Adds a {@link IRVertex} to this stage.
    * @param vertex to add.
    * @return the stageBuilder.
    */
-  public StageBuilder addVertex(final IRVertex vertex) {
+  StageBuilder addVertex(final IRVertex vertex) {
     stageInternalDAGBuilder.addVertex(vertex);
-    return this;
-  }
+    return this; }
 
   /**
    * Connects two {@link IRVertex} in this stage.
    * @param edge the IREdge that connects vertices.
    * @return the stageBuilder.
    */
-  public StageBuilder connectInternalVertices(final IREdge edge) {
+  StageBuilder connectInternalVertices(final RuntimeEdge<IRVertex> edge) {
     stageInternalDAGBuilder.connectVertices(edge);
     return this;
   }
 
-  /**
-   * @return true if this builder doesn't contain any valid {@link IRVertex}.
-   */
-  public boolean isEmpty() {
-    return stageInternalDAGBuilder.isEmpty();
+  StageBuilder addReadables(final List<Map<String, Readable>> vertexIdToReadable) {
+    this.vertexIdToReadables = vertexIdToReadable;
+    return this;
   }
 
   /**
-   * Integrity check for stages.
-   * @param stage stage to check for.
+   * @return true if this builder doesn't contain any valid {@link IRVertex}.
    */
-  private void integrityCheck(final Stage stage) {
-    final List<IRVertex> vertices = stage.getStageInternalDAG().getVertices();
-
-    final String firstPlacement = vertices.iterator().next().getProperty(ExecutionProperty.Key.ExecutorPlacement);
-    final int scheduleGroupIdx =
-        vertices.iterator().next().<Integer>getProperty(ExecutionProperty.Key.ScheduleGroupIndex);
-    vertices.forEach(irVertex -> {
-      if ((firstPlacement != null
-          && !firstPlacement.equals(irVertex.<String>getProperty(ExecutionProperty.Key.ExecutorPlacement)))
-          || scheduleGroupIdx != irVertex.<Integer>getProperty(ExecutionProperty.Key.ScheduleGroupIndex)) {
-        throw new RuntimeException("Vertices of the same stage have different execution properties: "
-            + irVertex.getId());
-      }
-    });
+  boolean isEmpty() {
+    return stageInternalDAGBuilder.isEmpty();
   }
 
   /**
    * Builds and returns the {@link Stage}.
    * @return the runtime stage.
    */
-  public Stage build() {
-    final Stage stage = new Stage(RuntimeIdGenerator.generateStageId(stageId),
-        stageInternalDAGBuilder.buildWithoutSourceSinkCheck(), scheduleGroupIndex);
-    integrityCheck(stage);
+  Stage build() {
+    final Stage stage = new Stage(
+        RuntimeIdGenerator.generateStageId(stageId),
+        stageInternalDAGBuilder.buildWithoutSourceSinkCheck(),
+        parallelism,
+        scheduleGroupIndex,
+        containerType,
+        vertexIdToReadables);
     return stage;
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
similarity index 79%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStageEdge.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index 13c83e8..3b8cdf0 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStageEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -13,22 +13,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.plan.physical;
+package edu.snu.nemo.runtime.common.plan;
 
 import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.runtime.common.data.KeyRange;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.data.HashRange;
 
 import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Contains information stage boundary {@link edu.snu.nemo.runtime.common.plan.stage.StageEdge}.
+ * Edge of a stage that connects an IRVertex of the source stage to an IRVertex of the destination stage.
+ * This means that there can be multiple StageEdges between two Stages.
  */
-public final class PhysicalStageEdge extends RuntimeEdge<PhysicalStage> {
+public final class StageEdge extends RuntimeEdge<Stage> {
   /**
    * The source {@link IRVertex}.
    * This belongs to the srcStage.
@@ -50,21 +50,21 @@ public final class PhysicalStageEdge extends RuntimeEdge<PhysicalStage> {
    * Constructor.
    * @param runtimeEdgeId id of the runtime edge.
    * @param edgeProperties edge execution properties.
-   * @param srcVertex source vertex.
-   * @param dstVertex destination vertex.
+   * @param srcVertex source IRVertex in the srcStage of this edge.
+   * @param dstVertex destination IRVertex in the dstStage of this edge.
    * @param srcStage source stage.
    * @param dstStage destination stage.
    * @param coder the coder for enconding and deconding.
    * @param isSideInput whether or not the edge is a sideInput edge.
    */
-  public PhysicalStageEdge(final String runtimeEdgeId,
-                           final ExecutionPropertyMap edgeProperties,
-                           final IRVertex srcVertex,
-                           final IRVertex dstVertex,
-                           final PhysicalStage srcStage,
-                           final PhysicalStage dstStage,
-                           final Coder coder,
-                           final Boolean isSideInput) {
+  public StageEdge(final String runtimeEdgeId,
+                   final ExecutionPropertyMap edgeProperties,
+                   final IRVertex srcVertex,
+                   final IRVertex dstVertex,
+                   final Stage srcStage,
+                   final Stage dstStage,
+                   final Coder coder,
+                   final Boolean isSideInput) {
     super(runtimeEdgeId, edgeProperties, srcStage, dstStage, coder, isSideInput);
     this.srcVertex = srcVertex;
     this.dstVertex = dstVertex;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageEdgeBuilder.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
similarity index 92%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageEdgeBuilder.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
index cf5d3cc..b7448a1 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageEdgeBuilder.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.plan.stage;
+package edu.snu.nemo.runtime.common.plan;
 
 
 import edu.snu.nemo.common.coder.Coder;
@@ -37,7 +37,7 @@ public final class StageEdgeBuilder {
    * Represents the edge between vertices in a logical plan.
    * @param irEdgeId id of this edge.
    */
-  public StageEdgeBuilder(final String irEdgeId) {
+  StageEdgeBuilder(final String irEdgeId) {
     this.stageEdgeId = irEdgeId;
   }
 
@@ -112,7 +112,6 @@ public final class StageEdgeBuilder {
   }
 
   public StageEdge build() {
-    return new StageEdge(stageEdgeId,
-        edgeProperties, srcStage, dstStage, coder, isSideInput, srcVertex, dstVertex);
+    return new StageEdge(stageEdgeId, edgeProperties, srcVertex, dstVertex, srcStage, dstStage, coder, isSideInput);
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/Stage.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/Stage.java
deleted file mode 100644
index f56bdef..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/Stage.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.stage;
-
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.dag.Vertex;
-
-/**
- * Represents a stage in Runtime's execution of a job.
- * Each stage contains a part of a whole execution plan.
- * Stage partitioning is determined by {edu.snu.nemo.compiler.backend.nemo.NemoBackend}.
- */
-public final class Stage extends Vertex {
-  private final DAG<IRVertex, IREdge> stageInternalDAG;
-  private final int scheduleGroupIndex;
-
-  /**
-   * Constructor.
-   * @param stageId id of the stage.
-   * @param stageInternalDAG the internal DAG of the stage.
-   * @param scheduleGroupIndex the schedule group index.
-   */
-  public Stage(final String stageId,
-               final DAG<IRVertex, IREdge> stageInternalDAG,
-               final int scheduleGroupIndex) {
-    super(stageId);
-    this.stageInternalDAG = stageInternalDAG;
-    this.scheduleGroupIndex = scheduleGroupIndex;
-  }
-
-  /**
-   * @return the internal DAG of the stage.
-   */
-  public DAG<IRVertex, IREdge> getStageInternalDAG() {
-    return stageInternalDAG;
-  }
-
-  /**
-   * @return the schedule group index.
-   */
-  public int getScheduleGroupIndex() {
-    return scheduleGroupIndex;
-  }
-
-  @Override
-  public String propertiesToJSON() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("{\"scheduleGroupIndex\": ").append(scheduleGroupIndex);
-    sb.append(", \"stageInternalDAG\": ").append(stageInternalDAG.toString());
-    sb.append("}");
-    return sb.toString();
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageEdge.java
deleted file mode 100644
index 5d91959..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageEdge.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.stage;
-
-
-import edu.snu.nemo.common.coder.Coder;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-
-/**
- * Stage Edge.
- */
-public final class StageEdge extends RuntimeEdge<Stage> {
-  private final IRVertex srcVertex;
-  private final IRVertex dstVertex;
-
-  /**
-   * Represents the edge between stages.
-   * @param irEdgeId id of this edge.
-   * @param edgeProperties to control the data flow on this edge.
-   * @param srcStage source runtime stage.
-   * @param dstStage destination runtime stage.
-   * @param coder coder.
-   * @param isSideInput flag for whether or not the edge is a sideInput.
-   * @param srcVertex source vertex (in srcStage).
-   * @param dstVertex destination vertex (in dstStage).
-   */
-  public StageEdge(final String irEdgeId,
-                   final ExecutionPropertyMap edgeProperties,
-                   final Stage srcStage,
-                   final Stage dstStage,
-                   final Coder coder,
-                   final Boolean isSideInput,
-                   final IRVertex srcVertex,
-                   final IRVertex dstVertex) {
-    super(RuntimeIdGenerator.generateStageEdgeId(irEdgeId), edgeProperties, srcStage, dstStage, coder, isSideInput);
-    this.srcVertex = srcVertex;
-    this.dstVertex = dstVertex;
-  }
-
-  /**
-   * @return the source vertex.
-   */
-  public IRVertex getSrcVertex() {
-    return srcVertex;
-  }
-
-  /**
-   * @return the destination vertex.
-   */
-  public IRVertex getDstVertex() {
-    return dstVertex;
-  }
-
-  @Override
-  public String propertiesToJSON() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("{\"runtimeEdgeId\": \"").append(getId());
-    sb.append("\", \"edgeProperties\": ").append(getExecutionProperties());
-    sb.append(", \"srcVertex\": \"").append(srcVertex.getId());
-    sb.append("\", \"dstVertex\": \"").append(dstVertex.getId());
-    sb.append("\", \"coder\": \"").append(getCoder().toString());
-    sb.append("\"}");
-    return sb.toString();
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
index 81286c1..41a2d0e 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
@@ -18,7 +18,7 @@ package edu.snu.nemo.runtime.common.state;
 import edu.snu.nemo.common.StateMachine;
 
 /**
- * Represents the states and their transitions of a {@link edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan}.
+ * Represents the states and their transitions of a physical plan.
  */
 public final class JobState {
   private final StateMachine stateMachine;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
index 6e2a3d5..114275e 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
@@ -18,7 +18,7 @@ package edu.snu.nemo.runtime.common.state;
 import edu.snu.nemo.common.StateMachine;
 
 /**
- * Represents the states and their transitions of a {@link edu.snu.nemo.runtime.common.plan.physical.PhysicalStage}.
+ * Represents the states and their transitions of a stage.
  */
 public final class StageState {
   private final StateMachine stateMachine;
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
index 78896df..a5b6830 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
@@ -26,7 +26,7 @@ import edu.snu.nemo.compiler.backend.nemo.NemoBackend;
 import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
 import edu.snu.nemo.compiler.optimizer.policy.Policy;
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.RuntimeMaster;
 import org.apache.commons.lang3.SerializationUtils;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 7a7b523..8872990 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -28,7 +28,7 @@ import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageListener;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.executor.data.SerializerManager;
 import edu.snu.nemo.runtime.executor.datatransfer.DataTransferFactory;
 import org.apache.commons.lang3.SerializationUtils;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
index 2349ab7..09a43cc 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
@@ -23,8 +23,9 @@ import edu.snu.nemo.common.exception.BlockWriteException;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.vertex.*;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.*;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
 import edu.snu.nemo.runtime.executor.datatransfer.*;
@@ -50,8 +51,8 @@ public final class TaskExecutor {
   private final String taskId;
   private final int taskIdx;
   private final TaskStateManager taskStateManager;
-  private final List<PhysicalStageEdge> stageIncomingEdges;
-  private final List<PhysicalStageEdge> stageOutgoingEdges;
+  private final List<StageEdge> stageIncomingEdges;
+  private final List<StageEdge> stageOutgoingEdges;
   private Map<String, Readable> irVertexIdToReadable;
 
   // Other parameters
@@ -140,8 +141,8 @@ public final class TaskExecutor {
     // 'Pointer-based' means that it isn't Map/List-based in getting the data structure or parent/children
     // to avoid element-wise extra overhead of calculating hash values(HashMap) or iterating Lists.
     irVertexDag.topologicalDo(irVertex -> {
-      final Set<PhysicalStageEdge> inEdgesFromOtherStages = getInEdgesFromOtherStages(irVertex);
-      final Set<PhysicalStageEdge> outEdgesToOtherStages = getOutEdgesToOtherStages(irVertex);
+      final Set<StageEdge> inEdgesFromOtherStages = getInEdgesFromOtherStages(irVertex);
+      final Set<StageEdge> outEdgesToOtherStages = getOutEdgesToOtherStages(irVertex);
       final IRVertexDataHandler dataHandler = getIRVertexDataHandler(irVertex);
 
       // Set data handlers of children irVertices.
@@ -152,9 +153,9 @@ public final class TaskExecutor {
       dataHandler.setChildrenDataHandler(childrenDataHandlers);
 
       // Add InputReaders for inter-stage data transfer
-      inEdgesFromOtherStages.forEach(physicalStageEdge -> {
+      inEdgesFromOtherStages.forEach(stageEdge -> {
         final InputReader inputReader = channelFactory.createReader(
-            taskIdx, physicalStageEdge.getSrcVertex(), physicalStageEdge);
+            taskIdx, stageEdge.getSrcVertex(), stageEdge);
 
         // For InputReaders that have side input, collect them separately.
         if (inputReader.isSideInputReader()) {
@@ -166,9 +167,9 @@ public final class TaskExecutor {
       });
 
       // Add OutputWriters for inter-stage data transfer
-      outEdgesToOtherStages.forEach(physicalStageEdge -> {
+      outEdgesToOtherStages.forEach(stageEdge -> {
         final OutputWriter outputWriter = channelFactory.createWriter(
-            irVertex, taskIdx, physicalStageEdge.getDstVertex(), physicalStageEdge);
+            irVertex, taskIdx, stageEdge.getDstVertex(), stageEdge);
         dataHandler.addOutputWriter(outputWriter);
       });
 
@@ -206,7 +207,7 @@ public final class TaskExecutor {
    * @param irVertex the IRVertex whose inter-stage incoming edges to be collected.
    * @return the collected incoming edges.
    */
-  private Set<PhysicalStageEdge> getInEdgesFromOtherStages(final IRVertex irVertex) {
+  private Set<StageEdge> getInEdgesFromOtherStages(final IRVertex irVertex) {
     return stageIncomingEdges.stream().filter(
         stageInEdge -> stageInEdge.getDstVertex().getId().equals(irVertex.getId()))
         .collect(Collectors.toSet());
@@ -218,7 +219,7 @@ public final class TaskExecutor {
    * @param irVertex the IRVertex whose inter-stage outgoing edges to be collected.
    * @return the collected outgoing edges.
    */
-  private Set<PhysicalStageEdge> getOutEdgesToOtherStages(final IRVertex irVertex) {
+  private Set<StageEdge> getOutEdgesToOtherStages(final IRVertex irVertex) {
     return stageOutgoingEdges.stream().filter(
         stageInEdge -> stageInEdge.getSrcVertex().getId().equals(irVertex.getId()))
         .collect(Collectors.toSet());
@@ -431,8 +432,8 @@ public final class TaskExecutor {
         final Object sideInput = getSideInput(sideInputIterator);
         final RuntimeEdge inEdge = sideInputReader.getRuntimeEdge();
         final Transform srcTransform;
-        if (inEdge instanceof PhysicalStageEdge) {
-          srcTransform = ((OperatorVertex) ((PhysicalStageEdge) inEdge).getSrcVertex()).getTransform();
+        if (inEdge instanceof StageEdge) {
+          srcTransform = ((OperatorVertex) ((StageEdge) inEdge).getSrcVertex()).getTransform();
         } else {
           srcTransform = ((OperatorVertex) inEdge.getSrc()).getTransform();
         }
@@ -473,8 +474,8 @@ public final class TaskExecutor {
       Object sideInput = input.remove();
       final RuntimeEdge inEdge = input.getSideInputRuntimeEdge();
       final Transform srcTransform;
-      if (inEdge instanceof PhysicalStageEdge) {
-        srcTransform = ((OperatorVertex) ((PhysicalStageEdge) inEdge).getSrcVertex()).getTransform();
+      if (inEdge instanceof StageEdge) {
+        srcTransform = ((OperatorVertex) ((StageEdge) inEdge).getSrcVertex()).getTransform();
       } else {
         srcTransform = ((OperatorVertex) inEdge.getSrc()).getTransform();
       }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
index e2a31d6..7b1bc52 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
@@ -21,7 +21,7 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 
 import java.util.*;
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index 3ca2402..a01e58c 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -24,7 +24,7 @@ import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.data.KeyRange;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.UnsupportedCommPatternException;
 import edu.snu.nemo.runtime.common.data.HashRange;
@@ -113,9 +113,9 @@ public final class InputReader extends DataTransfer {
    * @return the list of the completable future of the data.
    */
   private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readDataInRange() {
-    assert (runtimeEdge instanceof PhysicalStageEdge);
+    assert (runtimeEdge instanceof StageEdge);
     final KeyRange hashRangeToRead =
-        ((PhysicalStageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
+        ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
     if (hashRangeToRead == null) {
       throw new BlockFetchException(
           new Throwable("The hash range to read is not assigned to " + dstTaskIndex + "'th task"));
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
index 49401aa..b758fa8 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
@@ -23,8 +23,10 @@ import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.metric.MetricDataBuilder;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.*;
 import edu.snu.nemo.runtime.common.state.JobState;
 import edu.snu.nemo.runtime.common.state.StageState;
 
@@ -136,10 +138,10 @@ public final class JobStateManager {
     onJobStateChanged(JobState.State.EXECUTING);
 
     // Initialize the states for the job down to task-level.
-    physicalPlan.getStageDAG().topologicalDo(physicalStage -> {
-      currentJobStageIds.add(physicalStage.getId());
-      idToStageStates.put(physicalStage.getId(), new StageState());
-      physicalStage.getTaskIds().forEach(taskId -> {
+    physicalPlan.getStageDAG().topologicalDo(stage -> {
+      currentJobStageIds.add(stage.getId());
+      idToStageStates.put(stage.getId(), new StageState());
+      stage.getTaskIds().forEach(taskId -> {
         idToTaskStates.put(taskId, new TaskState());
         taskIdToCurrentAttempt.put(taskId, 1);
       });
@@ -147,23 +149,23 @@ public final class JobStateManager {
   }
 
   private void initializePartitionStates(final BlockManagerMaster blockManagerMaster) {
-    final DAG<PhysicalStage, PhysicalStageEdge> stageDAG = physicalPlan.getStageDAG();
-    stageDAG.topologicalDo(physicalStage -> {
-      final List<String> taskIdsForStage = physicalStage.getTaskIds();
-      final List<PhysicalStageEdge> stageOutgoingEdges = stageDAG.getOutgoingEdgesOf(physicalStage);
+    final DAG<Stage, StageEdge> stageDAG = physicalPlan.getStageDAG();
+    stageDAG.topologicalDo(stage -> {
+      final List<String> taskIdsForStage = stage.getTaskIds();
+      final List<StageEdge> stageOutgoingEdges = stageDAG.getOutgoingEdgesOf(stage);
 
       // Initialize states for blocks of inter-stage edges
-      stageOutgoingEdges.forEach(physicalStageEdge -> {
+      stageOutgoingEdges.forEach(stageEdge -> {
         final int srcParallelism = taskIdsForStage.size();
         IntStream.range(0, srcParallelism).forEach(srcTaskIdx -> {
-          final String blockId = RuntimeIdGenerator.generateBlockId(physicalStageEdge.getId(), srcTaskIdx);
+          final String blockId = RuntimeIdGenerator.generateBlockId(stageEdge.getId(), srcTaskIdx);
           blockManagerMaster.initializeState(blockId, taskIdsForStage.get(srcTaskIdx));
         });
       });
 
       // Initialize states for blocks of stage internal edges
       taskIdsForStage.forEach(taskId -> {
-        final DAG<IRVertex, RuntimeEdge<IRVertex>> taskInternalDag = physicalStage.getIRDAG();
+        final DAG<IRVertex, RuntimeEdge<IRVertex>> taskInternalDag = stage.getIRDAG();
         taskInternalDag.getVertices().forEach(task -> {
           final List<RuntimeEdge<IRVertex>> internalOutgoingEdges = taskInternalDag.getOutgoingEdgesOf(task);
           internalOutgoingEdges.forEach(taskRuntimeEdge -> {
@@ -240,7 +242,7 @@ public final class JobStateManager {
       // if there exists a mapping, this state change is from a failed_recoverable stage,
       // and there may be tasks that do not need to be re-executed.
       if (!stageIdToRemainingTaskSet.containsKey(stageId)) {
-        for (final PhysicalStage stage : physicalPlan.getStageDAG().getVertices()) {
+        for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
           if (stage.getId().equals(stageId)) {
             Set<String> remainingTaskIds = new HashSet<>();
             remainingTaskIds.addAll(
@@ -510,9 +512,9 @@ public final class JobStateManager {
   public synchronized String toString() {
     final StringBuilder sb = new StringBuilder("{");
     sb.append("\"jobId\": \"").append(jobId).append("\", ");
-    sb.append("\"physicalStages\": [");
+    sb.append("\"stages\": [");
     boolean isFirstStage = true;
-    for (final PhysicalStage stage : physicalPlan.getStageDAG().getVertices()) {
+    for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
       if (!isFirstStage) {
         sb.append(", ");
       }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index d2b709f..4751573 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -24,7 +24,7 @@ import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageContext;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageListener;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.resource.ContainerManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
index 3035660..2ddb818 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
@@ -21,7 +21,7 @@ package edu.snu.nemo.runtime.master.eventhandler;
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.eventhandler.CompilerEventHandler;
 import edu.snu.nemo.runtime.common.eventhandler.UpdatePhysicalPlanEvent;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.master.scheduler.Scheduler;
 
 import javax.inject.Inject;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index 8536746..f73ecdb 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -20,7 +20,7 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.reef.driver.context.ActiveContext;
 
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index f32a441..c51d1ec 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -22,12 +22,11 @@ import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
+import edu.snu.nemo.runtime.common.plan.*;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
 import edu.snu.nemo.common.exception.*;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-import edu.snu.nemo.runtime.common.plan.physical.*;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.JobStateManager;
@@ -112,7 +111,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
     LOG.info("Job to schedule: {}", jobToSchedule.getId());
 
     this.initialScheduleGroup = jobToSchedule.getStageDAG().getVertices().stream()
-        .mapToInt(physicalStage -> physicalStage.getScheduleGroupIndex())
+        .mapToInt(stage -> stage.getScheduleGroupIndex())
         .min().getAsInt();
 
     scheduleRootStages();
@@ -226,9 +225,9 @@ public final class BatchSingleJobScheduler implements Scheduler {
    * Schedule stages in initial schedule group, in reverse-topological order.
    */
   private void scheduleRootStages() {
-    final List<PhysicalStage> rootStages =
-        physicalPlan.getStageDAG().getTopologicalSort().stream().filter(physicalStage ->
-            physicalStage.getScheduleGroupIndex() == initialScheduleGroup)
+    final List<Stage> rootStages =
+        physicalPlan.getStageDAG().getTopologicalSort().stream().filter(stage ->
+            stage.getScheduleGroupIndex() == initialScheduleGroup)
             .collect(Collectors.toList());
     Collections.reverse(rootStages);
     rootStages.forEach(this::scheduleStage);
@@ -239,8 +238,8 @@ public final class BatchSingleJobScheduler implements Scheduler {
    * @param completedStageId the ID of the stage that just completed and triggered this scheduling.
    */
   private void scheduleNextStage(final String completedStageId) {
-    final PhysicalStage completeOrFailedStage = getStageById(completedStageId);
-    final Optional<List<PhysicalStage>> nextStagesToSchedule =
+    final Stage completeOrFailedStage = getStageById(completedStageId);
+    final Optional<List<Stage>> nextStagesToSchedule =
         selectNextStagesToSchedule(completeOrFailedStage.getScheduleGroupIndex());
 
     if (nextStagesToSchedule.isPresent()) {
@@ -271,9 +270,9 @@ public final class BatchSingleJobScheduler implements Scheduler {
    * @return an optional of the (possibly empty) list of next schedulable stages, in the order they should be
    * enqueued to {@link PendingTaskCollection}.
    */
-  private Optional<List<PhysicalStage>> selectNextStagesToSchedule(final int currentScheduleGroupIndex) {
+  private Optional<List<Stage>> selectNextStagesToSchedule(final int currentScheduleGroupIndex) {
     if (currentScheduleGroupIndex > initialScheduleGroup) {
-      final Optional<List<PhysicalStage>> ancestorStagesFromAScheduleGroup =
+      final Optional<List<Stage>> ancestorStagesFromAScheduleGroup =
           selectNextStagesToSchedule(currentScheduleGroupIndex - 1);
       if (ancestorStagesFromAScheduleGroup.isPresent()) {
         return ancestorStagesFromAScheduleGroup;
@@ -281,15 +280,15 @@ public final class BatchSingleJobScheduler implements Scheduler {
     }
 
     // All previous schedule groups are complete, we need to check for the current schedule group.
-    final List<PhysicalStage> currentScheduleGroup =
-        physicalPlan.getStageDAG().getTopologicalSort().stream().filter(physicalStage ->
-            physicalStage.getScheduleGroupIndex() == currentScheduleGroupIndex)
+    final List<Stage> currentScheduleGroup =
+        physicalPlan.getStageDAG().getTopologicalSort().stream().filter(stage ->
+            stage.getScheduleGroupIndex() == currentScheduleGroupIndex)
             .collect(Collectors.toList());
-    List<PhysicalStage> stagesToSchedule = new LinkedList<>();
+    List<Stage> stagesToSchedule = new LinkedList<>();
     boolean allStagesComplete = true;
 
     // We need to reschedule failed_recoverable stages.
-    for (final PhysicalStage stageToCheck : currentScheduleGroup) {
+    for (final Stage stageToCheck : currentScheduleGroup) {
       final StageState.State stageState =
           (StageState.State) jobStateManager.getStageState(stageToCheck.getId()).getStateMachine().getCurrentState();
       switch (stageState) {
@@ -313,9 +312,9 @@ public final class BatchSingleJobScheduler implements Scheduler {
     // By the time the control flow has reached here,
     // we are ready to move onto the next ScheduleGroup
     stagesToSchedule =
-        physicalPlan.getStageDAG().getTopologicalSort().stream().filter(physicalStage -> {
-          if (physicalStage.getScheduleGroupIndex() == currentScheduleGroupIndex + 1) {
-            final String stageId = physicalStage.getId();
+        physicalPlan.getStageDAG().getTopologicalSort().stream().filter(stage -> {
+          if (stage.getScheduleGroupIndex() == currentScheduleGroupIndex + 1) {
+            final String stageId = stage.getId();
             return jobStateManager.getStageState(stageId).getStateMachine().getCurrentState()
                 != StageState.State.EXECUTING
                 && jobStateManager.getStageState(stageId).getStateMachine().getCurrentState()
@@ -341,10 +340,10 @@ public final class BatchSingleJobScheduler implements Scheduler {
    * It adds the list of tasks for the stage where the scheduler thread continuously polls from.
    * @param stageToSchedule the stage to schedule.
    */
-  private void scheduleStage(final PhysicalStage stageToSchedule) {
-    final List<PhysicalStageEdge> stageIncomingEdges =
+  private void scheduleStage(final Stage stageToSchedule) {
+    final List<StageEdge> stageIncomingEdges =
         physicalPlan.getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
-    final List<PhysicalStageEdge> stageOutgoingEdges =
+    final List<StageEdge> stageOutgoingEdges =
         physicalPlan.getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
 
     final Enum stageState = jobStateManager.getStageState(stageToSchedule.getId()).getStateMachine().getCurrentState();
@@ -419,18 +418,18 @@ public final class BatchSingleJobScheduler implements Scheduler {
    * @return the IR dag
    */
   private DAG<IRVertex, RuntimeEdge<IRVertex>> getVertexDagById(final String taskId) {
-    for (final PhysicalStage physicalStage : physicalPlan.getStageDAG().getVertices()) {
-      if (physicalStage.getId().equals(RuntimeIdGenerator.getStageIdFromTaskId(taskId))) {
-        return physicalStage.getIRDAG();
+    for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
+      if (stage.getId().equals(RuntimeIdGenerator.getStageIdFromTaskId(taskId))) {
+        return stage.getIRDAG();
       }
     }
     throw new RuntimeException(new Throwable("This taskId does not exist in the plan"));
   }
 
-  private PhysicalStage getStageById(final String stageId) {
-    for (final PhysicalStage physicalStage : physicalPlan.getStageDAG().getVertices()) {
-      if (physicalStage.getId().equals(stageId)) {
-        return physicalStage;
+  private Stage getStageById(final String stageId) {
+    for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
+      if (stage.getId().equals(stageId)) {
+        return stage;
       }
     }
     throw new RuntimeException(new Throwable("This taskId does not exist in the plan"));
@@ -538,7 +537,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
       case INPUT_READ_FAILURE:
         jobStateManager.onTaskStateChanged(taskId, newState);
         LOG.info("All tasks of {} will be made failed_recoverable.", stageId);
-        for (final PhysicalStage stage : physicalPlan.getStageDAG().getTopologicalSort()) {
+        for (final Stage stage : physicalPlan.getStageDAG().getTopologicalSort()) {
           if (stage.getId().equals(stageId)) {
             LOG.info("Removing Tasks for {} before they are scheduled to an executor", stage.getId());
             pendingTaskCollection.removeTasksAndDescendants(stage.getId());
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
index ab5081c..66fa986 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
index f23a1f0..46b3c6b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
index e72f486..5d19f52 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
index 361307f..1d8c8a2 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
@@ -15,8 +15,8 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 
 import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
index a232a79..afdc671 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index d34eb56..6e31269 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index f411c1d..aa53780 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.common.state.JobState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index 1e71882..7ea05fb 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
index 0375e01..fae4565 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
@@ -17,10 +17,10 @@ package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
 
@@ -147,7 +147,7 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
     }
 
     physicalPlan.getStageDAG().getChildren(stageId).forEach(
-        physicalStage -> removeStageAndChildren(physicalStage.getId()));
+        stage -> removeStageAndChildren(stage.getId()));
   }
 
   /**
@@ -161,7 +161,7 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
    */
   private synchronized void updateSchedulableStages(
       final String candidateStageId, final String candidateStageContainerType) {
-    final DAG<PhysicalStage, PhysicalStageEdge> jobDAG = physicalPlan.getStageDAG();
+    final DAG<Stage, StageEdge> jobDAG = physicalPlan.getStageDAG();
 
     if (isSchedulable(candidateStageId, candidateStageContainerType)) {
       // Check for ancestor stages that became schedulable due to candidateStage's absence from the queue.
@@ -187,8 +187,8 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
    * @return true if schedulable, false otherwise.
    */
   private synchronized boolean isSchedulable(final String candidateStageId, final String candidateStageContainerType) {
-    final DAG<PhysicalStage, PhysicalStageEdge> jobDAG = physicalPlan.getStageDAG();
-    for (final PhysicalStage descendantStage : jobDAG.getDescendants(candidateStageId)) {
+    final DAG<Stage, StageEdge> jobDAG = physicalPlan.getStageDAG();
+    for (final Stage descendantStage : jobDAG.getDescendants(candidateStageId)) {
       if (schedulableStages.contains(descendantStage.getId())) {
         if (candidateStageContainerType.equals(descendantStage.getContainerType())) {
           return false;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
index f7056bf..bee0a36 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
index af8371b..ee2cfa0 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
@@ -82,7 +82,7 @@ public final class BlockManagerMasterTest {
    */
   @Test
   public void testLostAfterCommit() throws Exception {
-    final String edgeId = RuntimeIdGenerator.generateRuntimeEdgeId("Edge-0");
+    final String edgeId = RuntimeIdGenerator.generateStageEdgeId("Edge-0");
     final int srcTaskIndex = 0;
     final String taskId = RuntimeIdGenerator.generateTaskId(srcTaskIndex, "Stage-test");
     final String executorId = RuntimeIdGenerator.generateExecutorId();
@@ -115,7 +115,7 @@ public final class BlockManagerMasterTest {
    */
   @Test
   public void testBeforeAfterCommit() throws Exception {
-    final String edgeId = RuntimeIdGenerator.generateRuntimeEdgeId("Edge-1");
+    final String edgeId = RuntimeIdGenerator.generateStageEdgeId("Edge-1");
     final int srcTaskIndex = 0;
     final String taskId = RuntimeIdGenerator.generateTaskId(srcTaskIndex, "Stage-Test");
     final String executorId = RuntimeIdGenerator.generateExecutorId();
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
index 2f5473b..5d515a7 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
@@ -22,7 +22,10 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
-import edu.snu.nemo.runtime.common.plan.physical.*;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.state.JobState;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskState;
@@ -85,17 +88,17 @@ public final class JobStateManagerTest {
 
     assertEquals(jobStateManager.getJobId(), "TestPlan");
 
-    final List<PhysicalStage> stageList = physicalPlan.getStageDAG().getTopologicalSort();
+    final List<Stage> stageList = physicalPlan.getStageDAG().getTopologicalSort();
 
     for (int stageIdx = 0; stageIdx < stageList.size(); stageIdx++) {
-      final PhysicalStage physicalStage = stageList.get(stageIdx);
-      jobStateManager.onStageStateChanged(physicalStage.getId(), StageState.State.EXECUTING);
-      final List<String> taskIds = physicalStage.getTaskIds();
+      final Stage stage = stageList.get(stageIdx);
+      jobStateManager.onStageStateChanged(stage.getId(), StageState.State.EXECUTING);
+      final List<String> taskIds = stage.getTaskIds();
       taskIds.forEach(taskId -> {
         jobStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING);
         jobStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE);
         if (RuntimeIdGenerator.getIndexFromTaskId(taskId) == taskIds.size() - 1) {
-          assertTrue(jobStateManager.checkStageCompletion(physicalStage.getId()));
+          assertTrue(jobStateManager.checkStageCompletion(stage.getId()));
         }
       });
       final Map<String, TaskState> taskStateMap = jobStateManager.getIdToTaskStates();
@@ -117,7 +120,7 @@ public final class JobStateManagerTest {
   public void testWaitUntilFinish() {
     // Create a JobStateManager of an empty dag.
     final DAG<IRVertex, IREdge> irDAG = irDAGBuilder.build();
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
+    final DAG<Stage, StageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
     final JobStateManager jobStateManager = new JobStateManager(
         new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()),
         blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
index 78466b2..308f3b5 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
@@ -20,7 +20,9 @@ import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.physical.*;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
@@ -152,19 +154,19 @@ public final class BatchSingleJobSchedulerTest {
     // b) the stages of the next ScheduleGroup are scheduled after the stages of each ScheduleGroup are made "complete".
     for (int i = 0; i < getNumScheduleGroups(plan.getStageDAG()); i++) {
       final int scheduleGroupIdx = i;
-      final List<PhysicalStage> stages = filterStagesWithAScheduleGroupIndex(plan.getStageDAG(), scheduleGroupIdx);
+      final List<Stage> stages = filterStagesWithAScheduleGroupIndex(plan.getStageDAG(), scheduleGroupIdx);
 
       LOG.debug("Checking that all stages of ScheduleGroup {} enter the executing state", scheduleGroupIdx);
-      stages.forEach(physicalStage -> {
-        while (jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState()
+      stages.forEach(stage -> {
+        while (jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState()
             != StageState.State.EXECUTING) {
 
         }
       });
 
-      stages.forEach(physicalStage -> {
+      stages.forEach(stage -> {
         SchedulerTestUtil.completeStage(
-            jobStateManager, scheduler, executorRegistry, physicalStage, SCHEDULE_ATTEMPT_INDEX);
+            jobStateManager, scheduler, executorRegistry, stage, SCHEDULE_ATTEMPT_INDEX);
       });
     }
 
@@ -174,13 +176,13 @@ public final class BatchSingleJobSchedulerTest {
     assertTrue(jobStateManager.checkJobTermination());
   }
 
-  private List<PhysicalStage> filterStagesWithAScheduleGroupIndex(
-      final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG, final int scheduleGroupIndex) {
-    final Set<PhysicalStage> stageSet = new HashSet<>(physicalDAG.filterVertices(
-        physicalStage -> physicalStage.getScheduleGroupIndex() == scheduleGroupIndex));
+  private List<Stage> filterStagesWithAScheduleGroupIndex(
+      final DAG<Stage, StageEdge> physicalDAG, final int scheduleGroupIndex) {
+    final Set<Stage> stageSet = new HashSet<>(physicalDAG.filterVertices(
+        stage -> stage.getScheduleGroupIndex() == scheduleGroupIndex));
 
     // Return the filtered vertices as a sorted list
-    final List<PhysicalStage> sortedStages = new ArrayList<>(stageSet.size());
+    final List<Stage> sortedStages = new ArrayList<>(stageSet.size());
     physicalDAG.topologicalDo(stage -> {
       if (stageSet.contains(stage)) {
         sortedStages.add(stage);
@@ -189,7 +191,7 @@ public final class BatchSingleJobSchedulerTest {
     return sortedStages;
   }
 
-  private int getNumScheduleGroups(final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG) {
+  private int getNumScheduleGroups(final DAG<Stage, StageEdge> physicalDAG) {
     final Set<Integer> scheduleGroupSet = new HashSet<>();
     physicalDAG.getVertices().forEach(stage -> scheduleGroupSet.add(stage.getScheduleGroupIndex()));
     return scheduleGroupSet.size();
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
index 3150475..1b7768f 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
index 5b48315..f4ac23e 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
@@ -19,7 +19,8 @@ import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.physical.*;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
@@ -132,9 +133,9 @@ public final class FaultToleranceTest {
         new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
     scheduler.scheduleJob(plan, jobStateManager);
 
-    final List<PhysicalStage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
+    final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
 
-    for (final PhysicalStage stage : dagOf4Stages) {
+    for (final Stage stage : dagOf4Stages) {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
@@ -203,9 +204,9 @@ public final class FaultToleranceTest {
         new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
     scheduler.scheduleJob(plan, jobStateManager);
 
-    final List<PhysicalStage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
+    final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
 
-    for (final PhysicalStage stage : dagOf4Stages) {
+    for (final Stage stage : dagOf4Stages) {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
@@ -269,9 +270,9 @@ public final class FaultToleranceTest {
         new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
     scheduler.scheduleJob(plan, jobStateManager);
 
-    final List<PhysicalStage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
+    final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
 
-    for (final PhysicalStage stage : dagOf4Stages) {
+    for (final Stage stage : dagOf4Stages) {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
@@ -324,13 +325,13 @@ public final class FaultToleranceTest {
     scheduler.scheduleJob(plan, jobStateManager);
 
     final List<ExecutorRepresenter> executors = new ArrayList<>();
-    final List<PhysicalStage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
+    final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
 
     int executorIdIndex = 1;
     float removalChance = 0.7f; // Out of 1.0
     final Random random = new Random(0); // Deterministic seed.
 
-    for (final PhysicalStage stage : dagOf4Stages) {
+    for (final Stage stage : dagOf4Stages) {
 
       while (jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState() != COMPLETE) {
         // By chance, remove or add executor
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
index 0071b9f..ac77c46 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
index 5edd885..d40859b 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
index a9d1d4b..644fb38 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
+import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
@@ -32,21 +32,21 @@ final class SchedulerTestUtil {
    * @param jobStateManager for the submitted job.
    * @param scheduler for the submitted job.
    * @param executorRegistry provides executor representers
-   * @param physicalStage for which the states should be marked as complete.
+   * @param stage for which the states should be marked as complete.
    */
   static void completeStage(final JobStateManager jobStateManager,
                             final Scheduler scheduler,
                             final ExecutorRegistry executorRegistry,
-                            final PhysicalStage physicalStage,
+                            final Stage stage,
                             final int attemptIdx) {
     // Loop until the stage completes.
     while (true) {
-      final Enum stageState = jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState();
+      final Enum stageState = jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState();
       if (StageState.State.COMPLETE == stageState) {
         // Stage has completed, so we break out of the loop.
         break;
       } else if (StageState.State.EXECUTING == stageState) {
-        physicalStage.getTaskIds().forEach(taskId -> {
+        stage.getTaskIds().forEach(taskId -> {
           final Enum tgState = jobStateManager.getTaskState(taskId).getStateMachine().getCurrentState();
           if (TaskState.State.EXECUTING == tgState) {
             sendTaskStateEventToScheduler(scheduler, executorRegistry, taskId,
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
index 0ae5998..921dfe8 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
@@ -16,7 +16,9 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.*;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator;
 import org.junit.Before;
 import org.junit.Test;
@@ -59,7 +61,7 @@ public final class SingleTaskQueueTest {
         TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.ThreeSequentialVertices, true);
 
     pendingTaskPriorityQueue.onJobScheduled(physicalPlan);
-    final List<PhysicalStage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
+    final List<Stage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
 
     // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
     assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), dagOf2Stages.get(1).getScheduleGroupIndex());
@@ -115,7 +117,7 @@ public final class SingleTaskQueueTest {
     final PhysicalPlan physicalPlan =
         TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.ThreeSequentialVertices, false);
     pendingTaskPriorityQueue.onJobScheduled(physicalPlan);
-    final List<PhysicalStage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
+    final List<Stage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
 
     // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
     assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), 0);
@@ -168,7 +170,7 @@ public final class SingleTaskQueueTest {
     final PhysicalPlan physicalPlan = TestPlanGenerator.generatePhysicalPlan(
         TestPlanGenerator.PlanType.ThreeSequentialVerticesWithDifferentContainerTypes, true);
     pendingTaskPriorityQueue.onJobScheduled(physicalPlan);
-    final List<PhysicalStage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
+    final List<Stage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
 
     // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
     assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), dagOf2Stages.get(1).getScheduleGroupIndex());
@@ -206,10 +208,10 @@ public final class SingleTaskQueueTest {
   }
 
   /**
-   * Schedule the tasks in a physical stage.
+   * Schedule the tasks in a stage.
    * @param stage the stage to schedule.
    */
-  private void scheduleStage(final PhysicalStage stage) {
+  private void scheduleStage(final Stage stage) {
     stage.getTaskIds().forEach(taskId ->
         pendingTaskPriorityQueue.add(new ExecutableTask(
             "TestPlan",
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
index 65f9d98..4263e7f 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
diff --git a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
index f9d20bd..9d4142d 100644
--- a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
+++ b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
@@ -31,10 +31,10 @@ import edu.snu.nemo.compiler.optimizer.policy.BasicPullPolicy;
 import edu.snu.nemo.compiler.optimizer.policy.BasicPushPolicy;
 import edu.snu.nemo.compiler.optimizer.policy.Policy;
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
@@ -100,7 +100,7 @@ public final class TestPlanGenerator {
   private static PhysicalPlan convertIRToPhysical(final DAG<IRVertex, IREdge> irDAG,
                                                   final Policy policy) throws Exception {
     final DAG<IRVertex, IREdge> optimized = CompiletimeOptimizer.optimize(irDAG, policy, EMPTY_DAG_DIRECTORY);
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = optimized.convert(PLAN_GENERATOR);
+    final DAG<Stage, StageEdge> physicalDAG = optimized.convert(PLAN_GENERATOR);
     return new PhysicalPlan("TestPlan", physicalDAG, PLAN_GENERATOR.getIdToIRVertex());
   }
 
diff --git a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
index f257926..fe69942 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
@@ -26,10 +26,10 @@ import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.state.JobState;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
@@ -74,7 +74,7 @@ public class ClientEndpointTest {
     final Injector injector = Tang.Factory.getTang().newInjector();
     injector.bindVolatileParameter(JobConf.DAGDirectory.class, "");
     final PhysicalPlanGenerator physicalPlanGenerator = injector.getInstance(PhysicalPlanGenerator.class);
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
+    final DAG<Stage, StageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
 
     final LocalMessageDispatcher messageDispatcher = new LocalMessageDispatcher();
     final LocalMessageEnvironment messageEnvironment =
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
index 68f9b12..ebbc7b6 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
@@ -27,8 +27,8 @@ import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
 import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
 import edu.snu.nemo.compiler.optimizer.policy.PadoPolicy;
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.junit.Before;
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
index 611d821..f948589 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
@@ -32,11 +32,9 @@ import edu.snu.nemo.compiler.frontend.beam.transform.DoTransform;
 import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
 import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
-import edu.snu.nemo.runtime.common.plan.stage.Stage;
-import edu.snu.nemo.runtime.common.plan.stage.StageEdge;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
@@ -80,7 +78,7 @@ public final class DAGConverterTest {
     final DAG<IRVertex, IREdge> irDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
         new TestPolicy(), "");
     final DAG<Stage, StageEdge> DAGOfStages = physicalPlanGenerator.stagePartitionIrDAG(irDAG);
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
+    final DAG<Stage, StageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
 
     // Test DAG of stages
     final List<Stage> sortedDAGOfStages = DAGOfStages.getTopologicalSort();
@@ -94,9 +92,9 @@ public final class DAGConverterTest {
     assertEquals(DAGOfStages.getOutgoingEdgesOf(stage2).size(), 0);
 
     // Test Physical DAG
-    final List<PhysicalStage> sortedPhysicalDAG = physicalDAG.getTopologicalSort();
-    final PhysicalStage physicalStage1 = sortedPhysicalDAG.get(0);
-    final PhysicalStage physicalStage2 = sortedPhysicalDAG.get(1);
+    final List<Stage> sortedPhysicalDAG = physicalDAG.getTopologicalSort();
+    final Stage physicalStage1 = sortedPhysicalDAG.get(0);
+    final Stage physicalStage2 = sortedPhysicalDAG.get(1);
     assertEquals(physicalDAG.getVertices().size(), 2);
     assertEquals(physicalDAG.getIncomingEdgesOf(physicalStage1).size(), 0);
     assertEquals(physicalDAG.getIncomingEdgesOf(physicalStage2).size(), 1);
@@ -238,10 +236,10 @@ public final class DAGConverterTest {
 
     // Test Physical DAG
 
-//    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = logicalDAG.convert(new PhysicalDAGGenerator());
-//    final List<PhysicalStage> sortedPhysicalDAG = physicalDAG.getTopologicalSort();
-//    final PhysicalStage physicalStage1 = sortedPhysicalDAG.get(0);
-//    final PhysicalStage physicalStage2 = sortedPhysicalDAG.get(1);
+//    final DAG<Stage, StageEdge> physicalDAG = logicalDAG.convert(new PhysicalDAGGenerator());
+//    final List<Stage> sortedPhysicalDAG = physicalDAG.getTopologicalSort();
+//    final Stage physicalStage1 = sortedPhysicalDAG.get(0);
+//    final Stage physicalStage2 = sortedPhysicalDAG.get(1);
 //    assertEquals(physicalDAG.getVertices().size(), 2);
 //    assertEquals(physicalDAG.getIncomingEdgesOf(physicalStage1).size(), 0);
 //    assertEquals(physicalDAG.getIncomingEdgesOf(physicalStage2).size(), 1);
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
index a6df1d4..a946b4b 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
@@ -27,8 +27,9 @@ import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.*;
 import edu.snu.nemo.runtime.executor.MetricMessageSender;
 import edu.snu.nemo.runtime.executor.TaskExecutor;
 import edu.snu.nemo.runtime.executor.TaskStateManager;
@@ -59,7 +60,7 @@ import static org.mockito.Mockito.*;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({InputReader.class, OutputWriter.class, DataTransferFactory.class,
-    TaskStateManager.class, PhysicalStageEdge.class})
+    TaskStateManager.class, StageEdge.class})
 public final class TaskExecutorTest {
   private static final int DATA_SIZE = 100;
   private static final String CONTAINER_TYPE = "CONTAINER_TYPE";
@@ -112,7 +113,7 @@ public final class TaskExecutorTest {
 
     final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
         new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>().addVertex(sourceIRVertex).buildWithoutSourceSinkCheck();
-    final PhysicalStageEdge stageOutEdge = mock(PhysicalStageEdge.class);
+    final StageEdge stageOutEdge = mock(StageEdge.class);
     when(stageOutEdge.getSrcVertex()).thenReturn(sourceIRVertex);
     final String taskId = RuntimeIdGenerator.generateTaskId(0, stageId);
     final ExecutableTask executableTask =
@@ -165,9 +166,9 @@ public final class TaskExecutorTest {
             runtimeIREdgeId, edgeProperties, operatorIRVertex1, operatorIRVertex2, coder))
         .buildWithoutSourceSinkCheck();
     final String taskId = RuntimeIdGenerator.generateTaskId(0, stageId);
-    final PhysicalStageEdge stageInEdge = mock(PhysicalStageEdge.class);
+    final StageEdge stageInEdge = mock(StageEdge.class);
     when(stageInEdge.getDstVertex()).thenReturn(operatorIRVertex1);
-    final PhysicalStageEdge stageOutEdge = mock(PhysicalStageEdge.class);
+    final StageEdge stageOutEdge = mock(StageEdge.class);
     when(stageOutEdge.getSrcVertex()).thenReturn(operatorIRVertex2);
     final ExecutableTask executableTask =
         new ExecutableTask(
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
index cfdeb2f..98b8cc0 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
@@ -124,7 +124,7 @@ public final class BlockStoreTest {
     IntStream.range(0, NUM_READ_VERTICES).forEach(number -> readTaskIdList.add("Read_IR_vertex"));
 
     // Generates the ids and the data of the blocks to be used.
-    final String shuffleEdge = RuntimeIdGenerator.generateRuntimeEdgeId("shuffle_edge");
+    final String shuffleEdge = RuntimeIdGenerator.generateStageEdgeId("shuffle_edge");
     IntStream.range(0, NUM_WRITE_VERTICES).forEach(writeTaskIdx -> {
       // Create a block for each writer task.
       final String blockId = RuntimeIdGenerator.generateBlockId(shuffleEdge, writeTaskIdx);
@@ -146,7 +146,7 @@ public final class BlockStoreTest {
     // Following part is for the concurrent read test.
     final String writeTaskId = "conc_write_IR_vertex";
     final List<String> concReadTaskIdList = new ArrayList<>(NUM_CONC_READ_TASKS);
-    final String concEdge = RuntimeIdGenerator.generateRuntimeEdgeId("conc_read_edge");
+    final String concEdge = RuntimeIdGenerator.generateStageEdgeId("conc_read_edge");
 
     // Generates the ids and the data to be used.
     concBlockId = RuntimeIdGenerator.generateBlockId(concEdge, NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1);
@@ -168,7 +168,7 @@ public final class BlockStoreTest {
     // Generates the ids of the tasks to be used.
     IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(number -> writeHashTaskIdList.add("hash_write_IR_vertex"));
     IntStream.range(0, NUM_READ_HASH_TASKS).forEach(number -> readHashTaskIdList.add("hash_read_IR_vertex"));
-    final String hashEdge = RuntimeIdGenerator.generateRuntimeEdgeId("hash_edge");
+    final String hashEdge = RuntimeIdGenerator.generateStageEdgeId("hash_edge");
 
     // Generates the ids and the data of the blocks to be used.
     IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(writeTaskIdx -> {
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index 61ef9b0..697122e 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@ -37,8 +37,8 @@ import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.executor.Executor;
 import edu.snu.nemo.runtime.executor.MetricManagerWorker;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
@@ -320,9 +320,9 @@ public final class DataTransferTest {
 
     final IRVertex srcMockVertex = mock(IRVertex.class);
     final IRVertex dstMockVertex = mock(IRVertex.class);
-    final PhysicalStage srcStage = setupStages("srcStage-" + testIndex);
-    final PhysicalStage dstStage = setupStages("dstStage-" + testIndex);
-    dummyEdge = new PhysicalStageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
+    final Stage srcStage = setupStages("srcStage-" + testIndex);
+    final Stage dstStage = setupStages("dstStage-" + testIndex);
+    dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
         srcStage, dstStage, CODER, false);
     // Initialize states in Master
     srcStage.getTaskIds().forEach(srcTaskId -> {
@@ -407,13 +407,13 @@ public final class DataTransferTest {
 
     final IRVertex srcMockVertex = mock(IRVertex.class);
     final IRVertex dstMockVertex = mock(IRVertex.class);
-    final PhysicalStage srcStage = setupStages("srcStage-" + testIndex);
-    final PhysicalStage dstStage = setupStages("dstStage-" + testIndex);
-    dummyEdge = new PhysicalStageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
+    final Stage srcStage = setupStages("srcStage-" + testIndex);
+    final Stage dstStage = setupStages("dstStage-" + testIndex);
+    dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
         srcStage, dstStage, CODER, false);
     final IRVertex dstMockVertex2 = mock(IRVertex.class);
-    final PhysicalStage dstStage2 = setupStages("dstStage-" + testIndex2);
-    dummyEdge2 = new PhysicalStageEdge(edgeId2, edgeProperties, srcMockVertex, dstMockVertex2,
+    final Stage dstStage2 = setupStages("dstStage-" + testIndex2);
+    dummyEdge2 = new StageEdge(edgeId2, edgeProperties, srcMockVertex, dstMockVertex2,
         srcStage, dstStage2, CODER, false);
     // Initialize states in Master
     srcStage.getTaskIds().forEach(srcTaskId -> {
@@ -541,9 +541,9 @@ public final class DataTransferTest {
     return Pair.of(srcVertex, dstVertex);
   }
 
-  private PhysicalStage setupStages(final String stageId) {
+  private Stage setupStages(final String stageId) {
     final DAG<IRVertex, RuntimeEdge<IRVertex>> emptyDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>().build();
 
-    return new PhysicalStage(stageId, emptyDag, PARALLELISM_TEN, 0, "Not_used", Collections.emptyList());
+    return new Stage(stageId, emptyDag, PARALLELISM_TEN, 0, "Not_used", Collections.emptyList());
   }
 }

-- 
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.