You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by je...@apache.org on 2018/06/01 05:18:02 UTC

[incubator-nemo] branch master updated: [NEMO-79] Clean up the legacy Task (#24)

This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 9941c1c  [NEMO-79] Clean up the legacy Task (#24)
9941c1c is described below

commit 9941c1c662603c94769224ee46bcb0830be6ac0f
Author: John Yang <jo...@gmail.com>
AuthorDate: Fri Jun 1 14:17:51 2018 +0900

    [NEMO-79] Clean up the legacy Task (#24)
    
    JIRA: NEMO-79: Clean up the legacy Task
    
    Major changes:
    * Removes the 'old' Task interface and its implementations to make the 'new' Task (formerly TaskGroup) become the only Task in the codebase
    * Specifically, changes the DAG in the 'new' Task from DAG<Task, RuntimeEdge> to DAG<IRVertex, RuntimeEdge>
    * Adds SourceVertex#clearInternalStates to clear the huge list of input splits held by BeamBoundedSourceVertex before sending the vertex to remote executors. Between clearing states of an existing vertex and creating a new vertex, I've chosen the former approach to ensure consistent use of the same IRVertex object across the compiler, the master, and the executors.
    
    Minor changes to note:
    * Makes BeamBoundedSourceVertex remember sourceDescription for visualization purposes
    * Changes related method and variable names appropriately
    * Changes comments appropriately
    
    Tests for the changes:
    N/A (no new feature was added)
    
    Other comments:
    N/A
    
    resolves NEMO-79
---
 bin/json2dot.py                                    |  10 +-
 .../edu/snu/nemo/common/ir/vertex/IRVertex.java    |   3 +-
 ...SourceVertex.java => InMemorySourceVertex.java} |  29 +-
 .../snu/nemo/common/ir/vertex/SourceVertex.java    |   9 +
 .../nemo/compiler/backend/nemo/NemoBackend.java    |   2 +-
 .../beam/source/BeamBoundedSourceVertex.java       |  11 +-
 .../compiler/frontend/spark/core/java/JavaRDD.java |   2 +-
 .../source/SparkDatasetBoundedSourceVertex.java    |   7 +-
 .../source/SparkTextFileBoundedSourceVertex.java   |   7 +-
 .../optimizer/examples/EmptyComponents.java        |   6 +-
 .../nemo/runtime/common/RuntimeIdGenerator.java    |  49 +--
 .../pass/runtime/DataSkewRuntimePass.java          |   2 +-
 .../common/plan/physical/BoundedSourceTask.java    |  51 ---
 .../{ScheduledTask.java => ExecutableTask.java}    |  59 ++--
 .../plan/physical/MetricCollectionBarrierTask.java |  31 --
 .../runtime/common/plan/physical/OperatorTask.java |  45 ---
 .../runtime/common/plan/physical/PhysicalPlan.java |  25 +-
 .../plan/physical/PhysicalPlanGenerator.java       |  85 +++--
 .../common/plan/physical/PhysicalStage.java        |  47 +--
 .../nemo/runtime/common/plan/physical/Task.java    |  52 ---
 .../common/plan/physical/UnboundedSourceTask.java  |  31 --
 .../snu/nemo/runtime/common/state/TaskState.java   |   3 +-
 runtime/common/src/main/proto/ControlMessage.proto |   2 +-
 .../edu/snu/nemo/runtime/executor/Executor.java    |  35 +-
 .../snu/nemo/runtime/executor/TaskExecutor.java    | 362 ++++++++++-----------
 .../nemo/runtime/executor/TaskStateManager.java    |  86 +++--
 .../runtime/executor/data/BlockManagerWorker.java  |   6 +-
 .../executor/datatransfer/DataTransferFactory.java |   7 +-
 ...skDataHandler.java => IRVertexDataHandler.java} |  52 +--
 .../snu/nemo/runtime/master/JobStateManager.java   |   5 +-
 .../edu/snu/nemo/runtime/master/RuntimeMaster.java |   4 +-
 .../master/resource/ExecutorRepresenter.java       |  14 +-
 .../master/scheduler/BatchSingleJobScheduler.java  |  46 +--
 .../scheduler/CompositeSchedulingPolicy.java       |   6 +-
 .../ContainerTypeAwareSchedulingPolicy.java        |  10 +-
 .../master/scheduler/FreeSlotSchedulingPolicy.java |   6 +-
 .../master/scheduler/PendingTaskCollection.java    |  10 +-
 .../scheduler/RoundRobinSchedulingPolicy.java      |   6 +-
 .../nemo/runtime/master/scheduler/Scheduler.java   |   3 -
 .../runtime/master/scheduler/SchedulerRunner.java  |   6 +-
 .../runtime/master/scheduler/SchedulingPolicy.java |   4 +-
 .../master/scheduler/SingleJobTaskCollection.java  |  26 +-
 .../SourceLocationAwareSchedulingPolicy.java       |   8 +-
 .../ContainerTypeAwareSchedulingPolicyTest.java    |  18 +-
 .../scheduler/FreeSlotSchedulingPolicyTest.java    |  10 +-
 .../scheduler/RoundRobinSchedulingPolicyTest.java  |   8 +-
 .../scheduler/SingleTaskQueueTest.java             |  26 +-
 .../SourceLocationAwareSchedulingPolicyTest.java   |  36 +-
 .../runtime/plangenerator/TestPlanGenerator.java   |   2 +-
 .../snu/nemo/tests/client/ClientEndpointTest.java  |   2 +-
 .../tests/runtime/executor/TaskExecutorTest.java   | 108 +++---
 .../runtime/executor/data/BlockStoreTest.java      |  60 ++--
 .../executor/datatransfer/DataTransferTest.java    |   4 +-
 .../tests/runtime/master/JobStateManagerTest.java  |   4 +-
 54 files changed, 642 insertions(+), 906 deletions(-)

diff --git a/bin/json2dot.py b/bin/json2dot.py
index 12f0d48..e4c77bf 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -64,8 +64,8 @@ class PhysicalStageState:
         self.id = data['id']
         self.state = data['state']
         self.tasks = {}
-        for task in data['tasks']:
-            self.tasks[task['id']] = TaskState(task)
+        for irVertex in data['tasks']:
+            self.tasks[irVertex['id']] = TaskState(irVertex)
     @classmethod
     def empty(cls):
         return cls({'id': None, 'state': None, 'tasks': []})
@@ -252,7 +252,7 @@ class LoopVertex:
 class PhysicalStage:
     def __init__(self, id, properties, state):
         self.id = id
-        self.task = DAG(properties['taskDag'], JobState.empty())
+        self.irVertex = DAG(properties['irDag'], JobState.empty())
         self.idx = getIdx()
         self.state = state
     @property
@@ -264,12 +264,12 @@ class PhysicalStage:
         dot = 'subgraph cluster_{} {{'.format(self.idx)
         dot += 'label = "{}{}\\n\\n{} Task(s):\\n{}";'.format(self.id, state, len(self.state.tasks), self.state.taskStateSummary)
         dot += 'color=red; bgcolor="{}";'.format(stateToColor(self.state.state))
-        dot += self.task.dot
+        dot += self.irVertex.dot
         dot += '}'
         return dot
     @property
     def oneVertex(self):
-        return next(iter(self.task.vertices.values())).oneVertex
+        return next(iter(self.irVertex.vertices.values())).oneVertex
     @property
     def logicalEnd(self):
         return 'cluster_{}'.format(self.idx)
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
index ec2e469..df783b4 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
@@ -21,7 +21,8 @@ import edu.snu.nemo.common.dag.Vertex;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 
 /**
- * The top-most wrapper for a user operation in the IR.
+ * The basic unit of operation in a dataflow program, as well as the most important data structure in Nemo.
+ * An IRVertex is created and modified in the compiler, and executed in the runtime.
  */
 public abstract class IRVertex extends Vertex {
   private final ExecutionPropertyMap executionProperties;
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/InitializedSourceVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java
similarity index 73%
rename from common/src/main/java/edu/snu/nemo/common/ir/vertex/InitializedSourceVertex.java
rename to common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java
index b64d61b..43fbadc 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/InitializedSourceVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java
@@ -22,23 +22,23 @@ import java.util.Iterator;
 import java.util.List;
 
 /**
- * Source vertex with initial data as object.
- * @param <T> type of initial data.
+ * Source vertex with the data in memory.
+ * @param <T> type of data.
  */
-public final class InitializedSourceVertex<T> extends SourceVertex<T> {
-  private final Iterable<T> initializedSourceData;
+public final class InMemorySourceVertex<T> extends SourceVertex<T> {
+  private Iterable<T> initializedSourceData;
 
   /**
    * Constructor.
    * @param initializedSourceData the initial data object.
    */
-  public InitializedSourceVertex(final Iterable<T> initializedSourceData) {
+  public InMemorySourceVertex(final Iterable<T> initializedSourceData) {
     this.initializedSourceData = initializedSourceData;
   }
 
   @Override
-  public InitializedSourceVertex<T> getClone() {
-    final InitializedSourceVertex<T> that = new InitializedSourceVertex<>(this.initializedSourceData);
+  public InMemorySourceVertex<T> getClone() {
+    final InMemorySourceVertex<T> that = new InMemorySourceVertex<>(this.initializedSourceData);
     this.copyExecutionPropertiesTo(that);
     return that;
   }
@@ -61,23 +61,28 @@ public final class InitializedSourceVertex<T> extends SourceVertex<T> {
         }
       }
 
-      readables.add(new InitializedSourceReadable<>(dataForReader));
+      readables.add(new InMemorySourceReadable<>(dataForReader));
     }
     return readables;
   }
 
+  @Override
+  public void clearInternalStates() {
+    initializedSourceData = null;
+  }
+
   /**
-   * Readable for initialized source vertex. It simply returns the initialized data.
-   * @param <T> type of the initial data.
+   * Simply returns the in-memory data.
+   * @param <T> type of the data.
    */
-  private static final class InitializedSourceReadable<T> implements Readable<T> {
+  private static final class InMemorySourceReadable<T> implements Readable<T> {
     private final Iterable<T> initializedSourceData;
 
     /**
      * Constructor.
      * @param initializedSourceData the source data.
      */
-    private InitializedSourceReadable(final Iterable<T> initializedSourceData) {
+    private InMemorySourceReadable(final Iterable<T> initializedSourceData) {
       this.initializedSourceData = initializedSourceData;
     }
 
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java
index 1597462..d319b72 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java
@@ -34,4 +34,13 @@ public abstract class SourceVertex<O> extends IRVertex {
    * @throws Exception if fail to get.
    */
   public abstract List<Readable<O>> getReadables(int desiredNumOfSplits) throws Exception;
+
+  /**
+   * Clears internal states, must be called after getReadables().
+   * Concretely, this clears the huge list of input splits held by objects like BeamBoundedSourceVertex before
+   * sending the vertex to remote executors.
+   * Between clearing states of an existing vertex, and creating a new vertex, we've chosen the former approach
+   * to ensure consistent use of the same IRVertex object across the compiler, the master, and the executors.
+   */
+  public abstract void clearInternalStates();
 }
diff --git a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 027378c..4ef453c 100644
--- a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -59,7 +59,7 @@ public final class NemoBackend implements Backend<PhysicalPlan> {
                               final PhysicalPlanGenerator physicalPlanGenerator) {
     final DAG<PhysicalStage, PhysicalStageEdge> physicalStageDAG = irDAG.convert(physicalPlanGenerator);
     final PhysicalPlan physicalPlan = new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(),
-        physicalStageDAG, physicalPlanGenerator.getTaskIRVertexMap());
+        physicalStageDAG, physicalPlanGenerator.getIdToIRVertex());
     return physicalPlan;
   }
 }
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index f03bc57..1143a1a 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -35,7 +35,8 @@ import org.slf4j.LoggerFactory;
  */
 public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
   private static final Logger LOG = LoggerFactory.getLogger(BeamBoundedSourceVertex.class.getName());
-  private final BoundedSource<O> source;
+  private BoundedSource<O> source;
+  private final String sourceDescription;
 
   /**
    * Constructor of BeamBoundedSourceVertex.
@@ -43,6 +44,7 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
    */
   public BeamBoundedSourceVertex(final BoundedSource<O> source) {
     this.source = source;
+    this.sourceDescription = source.toString();
   }
 
   @Override
@@ -63,12 +65,17 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
   }
 
   @Override
+  public void clearInternalStates() {
+    source = null;
+  }
+
+  @Override
   public String propertiesToJSON() {
     final StringBuilder sb = new StringBuilder();
     sb.append("{");
     sb.append(irVertexPropertiesToString());
     sb.append(", \"source\": \"");
-    sb.append(source);
+    sb.append(sourceDescription);
     sb.append("\"}");
     return sb.toString();
   }
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java
index a8d024c..a3e3fd6 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java
@@ -71,7 +71,7 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
                                   final int parallelism) {
     final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
 
-    final IRVertex initializedSourceVertex = new InitializedSourceVertex<>(initialData);
+    final IRVertex initializedSourceVertex = new InMemorySourceVertex<>(initialData);
     initializedSourceVertex.setProperty(ParallelismProperty.of(parallelism));
     builder.addVertex(initializedSourceVertex);
 
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
index 775faf4..6e83b7f 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
@@ -30,7 +30,7 @@ import java.util.*;
  * @param <T> type of data to read.
  */
 public final class SparkDatasetBoundedSourceVertex<T> extends SourceVertex<T> {
-  private final List<Readable<T>> readables;
+  private List<Readable<T>> readables;
 
   /**
    * Constructor.
@@ -72,6 +72,11 @@ public final class SparkDatasetBoundedSourceVertex<T> extends SourceVertex<T> {
     return readables;
   }
 
+  @Override
+  public void clearInternalStates() {
+    readables = null;
+  }
+
   /**
    * A Readable wrapper for Spark Dataset.
    */
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
index a5e7833..cac2674 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
@@ -27,7 +27,7 @@ import java.util.*;
  * Bounded source vertex for Spark text file.
  */
 public final class SparkTextFileBoundedSourceVertex extends SourceVertex<String> {
-  private final List<Readable<String>> readables;
+  private List<Readable<String>> readables;
 
   /**
    * Constructor.
@@ -72,6 +72,11 @@ public final class SparkTextFileBoundedSourceVertex extends SourceVertex<String>
     return readables;
   }
 
+  @Override
+  public void clearInternalStates() {
+    readables = null;
+  }
+
   /**
    * A Readable wrapper for Spark text file.
    */
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
index 257b757..34c66fc 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
@@ -72,7 +72,7 @@ public class EmptyComponents {
    * @param <T> type of the data.
    */
   public static final class EmptySourceVertex<T> extends SourceVertex<T> {
-    private final String name;
+    private String name;
 
     /**
      * Constructor.
@@ -97,6 +97,10 @@ public class EmptyComponents {
     }
 
     @Override
+    public void clearInternalStates() {
+    }
+
+    @Override
     public EmptySourceVertex<T> getClone() {
       return new EmptySourceVertex<>(this.name);
     }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
index dd19b27..b53ff34 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
@@ -29,7 +29,6 @@ public final class RuntimeIdGenerator {
   private static final String BLOCK_PREFIX = "Block-";
   private static final String BLOCK_ID_SPLITTER = "_";
   private static final String TASK_INFIX = "-Task-";
-  private static final String PHYSICAL_TASK_ID_SPLITTER = "_";
 
   /**
    * Private constructor which will not be used.
@@ -37,6 +36,9 @@ public final class RuntimeIdGenerator {
   private RuntimeIdGenerator() {
   }
 
+
+  //////////////////////////////////////////////////////////////// Generate IDs
+
   /**
    * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan}.
    *
@@ -76,36 +78,13 @@ public final class RuntimeIdGenerator {
   }
 
   /**
-   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.physical.Task}.
-   *
-   * @param irVertexId the ID of the IR vertex.
-   * @return the generated ID
-   */
-  public static String generateLogicalTaskId(final String irVertexId) {
-    return "Task-" + irVertexId;
-  }
-
-  /**
-   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.physical.Task}.
-   *
-   * @param index         the index of the physical task.
-   * @param logicalTaskId the logical ID of the task.
-   * @return the generated ID
-   */
-  public static String generatePhysicalTaskId(final int index,
-                                              final String logicalTaskId) {
-    return logicalTaskId + PHYSICAL_TASK_ID_SPLITTER + index;
-  }
-
-  /**
-   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.physical.ScheduledTask}.
+   * Generates the ID for a task.
    *
    * @param index   the index of this task.
    * @param stageId the ID of the stage.
    * @return the generated ID
    */
-  public static String generateTaskId(final int index,
-                                           final String stageId) {
+  public static String generateTaskId(final int index, final String stageId) {
     return stageId + TASK_INFIX + index;
   }
 
@@ -122,12 +101,12 @@ public final class RuntimeIdGenerator {
    * Generates the ID for a block, whose data is the output of a task.
    *
    * @param runtimeEdgeId of the block
-   * @param taskIndex     of the block
+   * @param producerTaskIndex of the block
    * @return the generated ID
    */
   public static String generateBlockId(final String runtimeEdgeId,
-                                       final int taskIndex) {
-    return BLOCK_PREFIX + runtimeEdgeId + BLOCK_ID_SPLITTER + taskIndex;
+                                       final int producerTaskIndex) {
+    return BLOCK_PREFIX + runtimeEdgeId + BLOCK_ID_SPLITTER + producerTaskIndex;
   }
 
   /**
@@ -148,6 +127,8 @@ public final class RuntimeIdGenerator {
     return "ResourceSpec-" + resourceSpecIdGenerator.getAndIncrement();
   }
 
+  //////////////////////////////////////////////////////////////// Parse IDs
+
   /**
    * Extracts runtime edge ID from a block ID.
    *
@@ -210,14 +191,4 @@ public final class RuntimeIdGenerator {
   private static String[] parseTaskId(final String taskId) {
     return taskId.split(TASK_INFIX);
   }
-
-  /**
-   * Extracts logical task ID from a physical task ID.
-   *
-   * @param physicalTaskId the physical task ID to extract.
-   * @return the logical task ID.
-   */
-  public static String getLogicalTaskIdIdFromPhysicalTaskId(final String physicalTaskId) {
-    return physicalTaskId.split(PHYSICAL_TASK_ID_SPLITTER)[0];
-  }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index b9d263b..7d3e7f4 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -87,7 +87,7 @@ public final class DataSkewRuntimePass implements RuntimePass<Map<String, List<P
       optimizationEdge.setTaskIdxToKeyRange(taskIdxToHashRange);
     });
 
-    return new PhysicalPlan(originalPlan.getId(), physicalDAGBuilder.build(), originalPlan.getTaskIRVertexMap());
+    return new PhysicalPlan(originalPlan.getId(), physicalDAGBuilder.build(), originalPlan.getIdToIRVertex());
   }
 
   /**
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/BoundedSourceTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/BoundedSourceTask.java
deleted file mode 100644
index 94e1332..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/BoundedSourceTask.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.physical;
-
-import edu.snu.nemo.common.ir.Readable;
-
-/**
- * BoundedSourceTask.
- * @param <O> the output type.
- */
-public final class BoundedSourceTask<O> extends Task {
-  private Readable<O> readable;
-
-  /**
-   * Constructor.
-   * @param taskId id of the task.
-   * @param irVertexId id of the IR vertex.
-   */
-  public BoundedSourceTask(final String taskId,
-                           final String irVertexId) {
-    super(taskId, irVertexId);
-  }
-
-  /**
-   * Sets the readable for this task.
-   * @param readableToSet the readable to set.
-   */
-  public void setReadable(final Readable<O> readableToSet) {
-    this.readable = readableToSet;
-  }
-
-  /**
-   * @return the readable of source data.
-   */
-  public Readable<O> getReadable() {
-    return readable;
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ScheduledTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ExecutableTask.java
similarity index 56%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ScheduledTask.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ExecutableTask.java
index d5f19a7..e7e37ef 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ScheduledTask.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ExecutableTask.java
@@ -23,16 +23,9 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * A ScheduledTask is a grouping of {@link Task}s that belong to a stage.
- * Executors receive units of ScheduledTasks during job execution,
- * and thus the resource type of all tasks of a ScheduledTask must be identical.
- * A stage contains a list of IDs of Tasks whose length corresponds to stage/operator parallelism.
- *
- * This class includes all information which will be required from the executor after scheduled,
- * including the (serialized) DAG of {@link Task}s,
- * the incoming/outgoing edges to/from the stage the Task belongs to, and so on.
+ * An ExecutableTask is a self-contained executable that can be executed in specific types of containers.
  */
-public final class ScheduledTask implements Serializable {
+public final class ExecutableTask implements Serializable {
   private final String jobId;
   private final String taskId;
   private final int taskIdx;
@@ -41,37 +34,37 @@ public final class ScheduledTask implements Serializable {
   private final int attemptIdx;
   private final String containerType;
   private final byte[] serializedTaskDag;
-  private final Map<String, Readable> logicalTaskIdToReadable;
+  private final Map<String, Readable> irVertexIdToReadable;
 
   /**
    * Constructor.
    *
-   * @param jobId                   the id of the job.
-   * @param serializedTaskDag  the serialized DAG of the task.
-   * @param taskId             the ID of the scheduled task.
-   * @param taskIncomingEdges  the incoming edges of the task.
-   * @param taskOutgoingEdges  the outgoing edges of the task.
-   * @param attemptIdx              the attempt index.
-   * @param containerType           the type of container to execute the task on.
-   * @param logicalTaskIdToReadable the map between logical task ID and readable.
+   * @param jobId                the id of the job.
+   * @param taskId               the ID of the scheduled task.
+   * @param attemptIdx           the attempt index.
+   * @param containerType        the type of container to execute the task on.
+   * @param serializedIRDag      the serialized DAG of the task.
+   * @param taskIncomingEdges    the incoming edges of the task.
+   * @param taskOutgoingEdges    the outgoing edges of the task.
+   * @param irVertexIdToReadable the map between logical task ID and readable.
    */
-  public ScheduledTask(final String jobId,
-                            final byte[] serializedTaskDag,
-                            final String taskId,
-                            final List<PhysicalStageEdge> taskIncomingEdges,
-                            final List<PhysicalStageEdge> taskOutgoingEdges,
-                            final int attemptIdx,
-                            final String containerType,
-                            final Map<String, Readable> logicalTaskIdToReadable) {
+  public ExecutableTask(final String jobId,
+                        final String taskId,
+                        final int attemptIdx,
+                        final String containerType,
+                        final byte[] serializedIRDag,
+                        final List<PhysicalStageEdge> taskIncomingEdges,
+                        final List<PhysicalStageEdge> taskOutgoingEdges,
+                        final Map<String, Readable> irVertexIdToReadable) {
     this.jobId = jobId;
     this.taskId = taskId;
     this.taskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
-    this.taskIncomingEdges = taskIncomingEdges;
-    this.taskOutgoingEdges = taskOutgoingEdges;
     this.attemptIdx = attemptIdx;
     this.containerType = containerType;
-    this.serializedTaskDag = serializedTaskDag;
-    this.logicalTaskIdToReadable = logicalTaskIdToReadable;
+    this.serializedTaskDag = serializedIRDag;
+    this.taskIncomingEdges = taskIncomingEdges;
+    this.taskOutgoingEdges = taskOutgoingEdges;
+    this.irVertexIdToReadable = irVertexIdToReadable;
   }
 
   /**
@@ -84,7 +77,7 @@ public final class ScheduledTask implements Serializable {
   /**
    * @return the serialized DAG of the task.
    */
-  public byte[] getSerializedTaskDag() {
+  public byte[] getSerializedIRDag() {
     return serializedTaskDag;
   }
 
@@ -133,7 +126,7 @@ public final class ScheduledTask implements Serializable {
   /**
    * @return the map between logical task ID and readable.
    */
-  public Map<String, Readable> getLogicalTaskIdToReadable() {
-    return logicalTaskIdToReadable;
+  public Map<String, Readable> getIrVertexIdToReadable() {
+    return irVertexIdToReadable;
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/MetricCollectionBarrierTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/MetricCollectionBarrierTask.java
deleted file mode 100644
index 249adff..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/MetricCollectionBarrierTask.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.physical;
-
-/**
- * MetricCollectionBarrierTask.
- */
-public final class MetricCollectionBarrierTask extends Task {
-  /**
-   * Constructor.
-   * @param taskId id of the task.
-   * @param irVertexId id of the IR vertex.
-   */
-  MetricCollectionBarrierTask(final String taskId,
-                              final String irVertexId) {
-    super(taskId, irVertexId);
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/OperatorTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/OperatorTask.java
deleted file mode 100644
index 61fa810..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/OperatorTask.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.physical;
-
-import edu.snu.nemo.common.ir.vertex.transform.Transform;
-
-/**
- * OperatorTask.
- */
-public final class OperatorTask extends Task {
-  private final Transform transform;
-
-  /**
-   * Constructor.
-   * @param taskId id of the task.
-   * @param runtimeVertexId id of the runtime vertex.
-   * @param transform transform to perform.
-   */
-  public OperatorTask(final String taskId,
-                      final String runtimeVertexId,
-                      final Transform transform) {
-    super(taskId, runtimeVertexId);
-    this.transform = transform;
-  }
-
-  /**
-   * @return the transform to perform.
-   */
-  public Transform getTransform() {
-    return transform;
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java
index 85950fa..b25a0ba 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java
@@ -25,26 +25,23 @@ import java.util.Map;
  * A job's physical plan, to be executed by the Runtime.
  */
 public final class PhysicalPlan implements Serializable {
-
   private final String id;
-
   private final DAG<PhysicalStage, PhysicalStageEdge> stageDAG;
-
-  private final Map<Task, IRVertex> taskIRVertexMap;
+  private final Map<String, IRVertex> idToIRVertex;
 
   /**
    * Constructor.
    *
    * @param id              ID of the plan.
    * @param stageDAG        the DAG of stages.
-   * @param taskIRVertexMap map from task to IR vertex.
+   * @param idToIRVertex    map from IR vertex ID to IR vertex.
    */
   public PhysicalPlan(final String id,
                       final DAG<PhysicalStage, PhysicalStageEdge> stageDAG,
-                      final Map<Task, IRVertex> taskIRVertexMap) {
+                      final Map<String, IRVertex> idToIRVertex) {
     this.id = id;
     this.stageDAG = stageDAG;
-    this.taskIRVertexMap = taskIRVertexMap;
+    this.idToIRVertex = idToIRVertex;
   }
 
   /**
@@ -62,20 +59,10 @@ public final class PhysicalPlan implements Serializable {
   }
 
   /**
-   * Get an IR vertex of the given task.
-   *
-   * @param task task to find the IR vertex of.
-   * @return the corresponding IR vertex of the given task.
-   */
-  public IRVertex getIRVertexOf(final Task task) {
-    return taskIRVertexMap.get(task);
-  }
-
-  /**
    * @return the map from task to IR vertex.
    */
-  public Map<Task, IRVertex> getTaskIRVertexMap() {
-    return taskIRVertexMap;
+  public Map<String, IRVertex> getIdToIRVertex() {
+    return idToIRVertex;
   }
 
   @Override
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java
index 2ae7271..27991e3 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java
@@ -24,7 +24,6 @@ import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.stage.*;
 import edu.snu.nemo.common.exception.IllegalVertexOperationException;
@@ -70,6 +69,7 @@ public final class PhysicalPlanGenerator
     // for debugging purposes.
     dagOfStages.storeJSON(dagDirectory, "plan-logical", "logical execution plan");
     // then create tasks and make it into a physical execution plan.
+
     return stagesIntoPlan(dagOfStages);
   }
 
@@ -197,16 +197,14 @@ public final class PhysicalPlanGenerator
     return dagOfStagesBuilder.build();
   }
 
-  // Map that keeps track of the IRVertex of each tasks
-  private final Map<Task, IRVertex> taskIRVertexMap = new HashMap<>();
+
+  private final Map<String, IRVertex> idToIRVertex = new HashMap<>();
 
   /**
-   * Getter for taskIRVertexMap.
-   *
-   * @return the taskIRVertexMap.
+   * @return the map from IR vertex ID to IR vertex.
    */
-  public Map<Task, IRVertex> getTaskIRVertexMap() {
-    return taskIRVertexMap;
+  public Map<String, IRVertex> getIdToIRVertex() {
+    return idToIRVertex;
   }
 
   /**
@@ -221,72 +219,68 @@ public final class PhysicalPlanGenerator
     final DAGBuilder<PhysicalStage, PhysicalStageEdge> physicalDAGBuilder = new DAGBuilder<>();
 
     for (final Stage stage : dagOfStages.getVertices()) {
-      final Map<IRVertex, Task> irVertexTaskMap = new HashMap<>();
       final List<IRVertex> stageVertices = stage.getStageInternalDAG().getVertices();
 
       final ExecutionPropertyMap firstVertexProperties = stageVertices.iterator().next().getExecutionProperties();
       final int stageParallelism = firstVertexProperties.get(ExecutionProperty.Key.Parallelism);
       final String containerType = firstVertexProperties.get(ExecutionProperty.Key.ExecutorPlacement);
 
-      // Only one task DAG will be created and reused.
-      final DAGBuilder<Task, RuntimeEdge<Task>> stageInternalDAGBuilder = new DAGBuilder<>();
-      // Collect split source readables in advance and bind to each scheduled task to avoid extra source split.
-      final List<Map<String, Readable>> logicalTaskIdToReadables = new ArrayList<>(stageParallelism);
+      // Only one DAG will be created and reused.
+      final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder = new DAGBuilder<>();
+      // Collect split source readables in advance and bind to each IR vertex to avoid extra source split.
+      final List<Map<String, Readable>> vertexIdToReadables = new ArrayList<>(stageParallelism);
       for (int i = 0; i < stageParallelism; i++) {
-        logicalTaskIdToReadables.add(new HashMap<>());
+        vertexIdToReadables.add(new HashMap<>());
       }
 
-      // Iterate over the vertices contained in this stage to convert to tasks.
+      // Iterate over the IR vertices contained in this stage.
       stageVertices.forEach(irVertex -> {
-        final Task newTaskToAdd;
+        if (!(irVertex instanceof  SourceVertex
+            || irVertex instanceof OperatorVertex
+            || irVertex instanceof MetricCollectionBarrierVertex)) {
+          // Sanity check
+          throw new IllegalStateException(irVertex.toString());
+        }
+
         if (irVertex instanceof SourceVertex) {
           final SourceVertex sourceVertex = (SourceVertex) irVertex;
 
+          // Take care of the Readables
           try {
             final List<Readable> readables = sourceVertex.getReadables(stageParallelism);
-            final String irVertexId = sourceVertex.getId();
-            final String logicalTaskId = RuntimeIdGenerator.generateLogicalTaskId(irVertexId);
             for (int i = 0; i < stageParallelism; i++) {
-              logicalTaskIdToReadables.get(i).put(logicalTaskId, readables.get(i));
+              vertexIdToReadables.get(i).put(irVertex.getId(), readables.get(i));
             }
-            newTaskToAdd = new BoundedSourceTask(logicalTaskId, irVertexId);
           } catch (Exception e) {
             throw new PhysicalPlanGenerationException(e);
           }
-        } else if (irVertex instanceof OperatorVertex) {
-          final OperatorVertex operatorVertex = (OperatorVertex) irVertex;
-          final String operatorVertexId = operatorVertex.getId();
-          newTaskToAdd = new OperatorTask(RuntimeIdGenerator.generateLogicalTaskId(operatorVertexId),
-              operatorVertexId, operatorVertex.getTransform());
-
-        } else if (irVertex instanceof MetricCollectionBarrierVertex) {
-          final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
-              (MetricCollectionBarrierVertex) irVertex;
-          final String metricVertexId = metricCollectionBarrierVertex.getId();
-          newTaskToAdd = new MetricCollectionBarrierTask(RuntimeIdGenerator.generateLogicalTaskId(metricVertexId),
-              metricVertexId);
-        } else {
-          throw new IllegalVertexOperationException("This vertex type is not supported");
+
+          // Clear internal metadata
+          sourceVertex.clearInternalStates();
         }
-        stageInternalDAGBuilder.addVertex(newTaskToAdd);
-        irVertexTaskMap.put(irVertex, newTaskToAdd);
-        taskIRVertexMap.put(newTaskToAdd, irVertex);
+
+        stageInternalDAGBuilder.addVertex(irVertex);
+        idToIRVertex.put(irVertex.getId(), irVertex);
       });
 
-      // connect internal edges in the task. It suffices to iterate over only the stage internal inEdges.
+      // connect internal edges in the stage. It suffices to iterate over only the stage internal inEdges.
       final DAG<IRVertex, IREdge> stageInternalDAG = stage.getStageInternalDAG();
       stageInternalDAG.getVertices().forEach(irVertex -> {
         final List<IREdge> inEdges = stageInternalDAG.getIncomingEdgesOf(irVertex);
         inEdges.forEach(edge ->
-            stageInternalDAGBuilder.connectVertices(new RuntimeEdge<>(edge.getId(), edge.getExecutionProperties(),
-                irVertexTaskMap.get(edge.getSrc()), irVertexTaskMap.get(edge.getDst()),
-                edge.getCoder(), edge.isSideInput())));
+            stageInternalDAGBuilder.connectVertices(new RuntimeEdge<>(
+                edge.getId(),
+                edge.getExecutionProperties(),
+                edge.getSrc(),
+                edge.getDst(),
+                edge.getCoder(),
+                edge.isSideInput())));
       });
 
-      // Create the task to add for this stage.
+      // Create the physical stage.
       final PhysicalStage physicalStage =
-          new PhysicalStage(stage.getId(), stageInternalDAGBuilder.build(),
-              stageParallelism, stage.getScheduleGroupIndex(), containerType, logicalTaskIdToReadables);
+          new PhysicalStage(stage.getId(), stageInternalDAGBuilder.buildWithoutSourceSinkCheck(),
+              stageParallelism, stage.getScheduleGroupIndex(), containerType, vertexIdToReadables);
 
       physicalDAGBuilder.addVertex(physicalStage);
       runtimeStageIdToPhysicalStageMap.put(stage.getId(), physicalStage);
@@ -300,7 +294,8 @@ public final class PhysicalPlanGenerator
 
           physicalDAGBuilder.connectVertices(new PhysicalStageEdge(stageEdge.getId(),
               stageEdge.getExecutionProperties(),
-              stageEdge.getSrcVertex(), stageEdge.getDstVertex(),
+              stageEdge.getSrcVertex(),
+              stageEdge.getDstVertex(),
               srcStage, dstStage,
               stageEdge.getCoder(),
               stageEdge.isSideInput()));
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java
index ded0a8e..a58cfba 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java
@@ -18,6 +18,7 @@ package edu.snu.nemo.runtime.common.plan.physical;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.Vertex;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.commons.lang3.SerializationUtils;
@@ -30,50 +31,50 @@ import java.util.Map;
  * PhysicalStage.
  */
 public final class PhysicalStage extends Vertex {
-  private final DAG<Task, RuntimeEdge<Task>> taskDag;
+  private final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag;
   private final int parallelism;
   private final int scheduleGroupIndex;
   private final String containerType;
-  private final byte[] serializedTaskDag;
-  private final List<Map<String, Readable>> logicalTaskIdToReadables;
+  private final byte[] serializedIRDag;
+  private final List<Map<String, Readable>> vertexIdToReadables;
 
   /**
    * Constructor.
    *
-   * @param stageId                  ID of the stage.
-   * @param taskDag                  the DAG of the task in this stage.
-   * @param parallelism              how many tasks will be executed in this stage.
-   * @param scheduleGroupIndex       the schedule group index.
-   * @param containerType            the type of container to execute the task on.
-   * @param logicalTaskIdToReadables the list of maps between logical task ID and {@link Readable}.
+   * @param stageId             ID of the stage.
+   * @param irDag               the DAG of IR vertices in this stage.
+   * @param parallelism         how many tasks will be executed in this stage.
+   * @param scheduleGroupIndex  the schedule group index.
+   * @param containerType       the type of container to execute the task on.
+   * @param vertexIdToReadables the list of maps between vertex ID and {@link Readable}.
    */
   public PhysicalStage(final String stageId,
-                       final DAG<Task, RuntimeEdge<Task>> taskDag,
+                       final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag,
                        final int parallelism,
                        final int scheduleGroupIndex,
                        final String containerType,
-                       final List<Map<String, Readable>> logicalTaskIdToReadables) {
+                       final List<Map<String, Readable>> vertexIdToReadables) {
     super(stageId);
-    this.taskDag = taskDag;
+    this.irDag = irDag;
     this.parallelism = parallelism;
     this.scheduleGroupIndex = scheduleGroupIndex;
     this.containerType = containerType;
-    this.serializedTaskDag = SerializationUtils.serialize(taskDag);
-    this.logicalTaskIdToReadables = logicalTaskIdToReadables;
+    this.serializedIRDag = SerializationUtils.serialize(irDag);
+    this.vertexIdToReadables = vertexIdToReadables;
   }
 
   /**
-   * @return the task.
+   * @return the IRVertex DAG.
    */
-  public DAG<Task, RuntimeEdge<Task>> getTaskDag() {
-    return taskDag;
+  public DAG<IRVertex, RuntimeEdge<IRVertex>> getIRDAG() {
+    return irDag;
   }
 
   /**
    * @return the serialized DAG of the task.
    */
-  public byte[] getSerializedTaskDag() {
-    return serializedTaskDag;
+  public byte[] getSerializedIRDAG() {
+    return serializedIRDag;
   }
 
   /**
@@ -102,17 +103,17 @@ public final class PhysicalStage extends Vertex {
   }
 
   /**
-   * @return the list of maps between logical task ID and readable.
+   * @return the list of maps between vertex ID and readables.
    */
-  public List<Map<String, Readable>> getLogicalTaskIdToReadables() {
-    return logicalTaskIdToReadables;
+  public List<Map<String, Readable>> getVertexIdToReadables() {
+    return vertexIdToReadables;
   }
 
   @Override
   public String propertiesToJSON() {
     final StringBuilder sb = new StringBuilder();
     sb.append("{\"scheduleGroupIndex\": ").append(scheduleGroupIndex);
-    sb.append(", \"taskDag\": ").append(taskDag);
+    sb.append(", \"irDag\": ").append(irDag);
     sb.append(", \"parallelism\": ").append(parallelism);
     sb.append(", \"containerType\": \"").append(containerType).append("\"");
     sb.append('}');
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/Task.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/Task.java
deleted file mode 100644
index 8d241e1..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/Task.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.physical;
-
-import edu.snu.nemo.common.dag.Vertex;
-
-/**
- * Task.
- * The index value is identical to the Task's index it belongs to.
- */
-public abstract class Task extends Vertex {
-  private final String irVertexId;
-
-  /**
-   * Constructor.
-   *
-   * @param taskId      id of the task.
-   * @param irVertexId  id for the IR vertex.
-   */
-  public Task(final String taskId,
-              final String irVertexId) {
-    super(taskId);
-    this.irVertexId = irVertexId;
-  }
-
-  /**
-   * @return the id of the runtime vertex.
-   */
-  public final String getIrVertexId() {
-    return irVertexId;
-  }
-
-  @Override
-  public final String propertiesToJSON() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("{\"taskId\": \"").append(getId()).append("\"}");
-    return sb.toString();
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/UnboundedSourceTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/UnboundedSourceTask.java
deleted file mode 100644
index 51f0d31..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/UnboundedSourceTask.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.physical;
-
-/**
- * UnboundedSourceTask.
- */
-public final class UnboundedSourceTask extends Task {
-  /**
-   * Constructor.
-   * @param taskId the id of the task.
-   * @param runtimeVertexId id of the runtime vertex.
-   */
-  public UnboundedSourceTask(final String taskId,
-                             final String runtimeVertexId) {
-    super(taskId, runtimeVertexId);
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
index 08a9a66..31f56d5 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
@@ -18,8 +18,7 @@ package edu.snu.nemo.runtime.common.state;
 import edu.snu.nemo.common.StateMachine;
 
 /**
- * Represents the states and their transitions of a
- * {@link edu.snu.nemo.runtime.common.plan.physical.ScheduledTask}.
+ * Represents the states and their transitions of a task.
  */
 public final class TaskState {
   private final StateMachine stateMachine;
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index c266bab..343b8e5 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -62,7 +62,7 @@ message TaskStateChangedMsg {
     required string executorId = 1;
     required string taskId = 2;
     required TaskStateFromExecutor state = 3;
-    optional string taskPutOnHoldId = 4;
+    optional string vertexPutOnHoldId = 4;
     optional RecoverableFailureCause failureCause = 5;
     required int32 attemptIdx = 6;
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 8fd5281..7a7b523 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -17,6 +17,7 @@ package edu.snu.nemo.runtime.executor;
 
 import com.google.protobuf.ByteString;
 import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.exception.IllegalMessageException;
 import edu.snu.nemo.common.exception.UnknownFailureCauseException;
@@ -27,8 +28,7 @@ import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageListener;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
-import edu.snu.nemo.runtime.common.plan.physical.Task;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.executor.data.SerializerManager;
 import edu.snu.nemo.runtime.executor.datatransfer.DataTransferFactory;
 import org.apache.commons.lang3.SerializationUtils;
@@ -87,35 +87,34 @@ public final class Executor {
     return executorId;
   }
 
-  private synchronized void onTaskReceived(final ScheduledTask scheduledTask) {
+  private synchronized void onTaskReceived(final ExecutableTask executableTask) {
     LOG.debug("Executor [{}] received Task [{}] to execute.",
-        new Object[]{executorId, scheduledTask.getTaskId()});
-    executorService.execute(() -> launchTask(scheduledTask));
+        new Object[]{executorId, executableTask.getTaskId()});
+    executorService.execute(() -> launchTask(executableTask));
   }
 
   /**
    * Launches the Task, and keeps track of the execution state with taskStateManager.
-   * @param scheduledTask to launch.
+   * @param executableTask to launch.
    */
-  private void launchTask(final ScheduledTask scheduledTask) {
+  private void launchTask(final ExecutableTask executableTask) {
     try {
-      final DAG<Task, RuntimeEdge<Task>> taskDag =
-          SerializationUtils.deserialize(scheduledTask.getSerializedTaskDag());
+      final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag =
+          SerializationUtils.deserialize(executableTask.getSerializedIRDag());
       final TaskStateManager taskStateManager =
-          new TaskStateManager(scheduledTask, taskDag, executorId,
-              persistentConnectionToMasterMap, metricMessageSender);
+          new TaskStateManager(executableTask, executorId, persistentConnectionToMasterMap, metricMessageSender);
 
-      scheduledTask.getTaskIncomingEdges()
+      executableTask.getTaskIncomingEdges()
           .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
-      scheduledTask.getTaskOutgoingEdges()
+      executableTask.getTaskOutgoingEdges()
           .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
-      taskDag.getVertices().forEach(v -> {
-        taskDag.getOutgoingEdgesOf(v)
+      irDag.getVertices().forEach(v -> {
+        irDag.getOutgoingEdgesOf(v)
             .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
       });
 
       new TaskExecutor(
-          scheduledTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender).execute();
+          executableTask, irDag, taskStateManager, dataTransferFactory, metricMessageSender).execute();
     } catch (final Exception e) {
       persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
           ControlMessage.Message.newBuilder()
@@ -150,9 +149,9 @@ public final class Executor {
       switch (message.getType()) {
       case ScheduleTask:
         final ControlMessage.ScheduleTaskMsg scheduleTaskMsg = message.getScheduleTaskMsg();
-        final ScheduledTask scheduledTask =
+        final ExecutableTask executableTask =
             SerializationUtils.deserialize(scheduleTaskMsg.getTask().toByteArray());
-        onTaskReceived(scheduledTask);
+        onTaskReceived(executableTask);
         break;
       default:
         throw new IllegalMessageException(
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
index c68215a..2349ab7 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
@@ -21,9 +21,8 @@ import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.BlockWriteException;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.vertex.*;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.common.ir.vertex.OperatorVertex;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.physical.*;
 import edu.snu.nemo.runtime.common.state.TaskState;
@@ -41,79 +40,88 @@ import org.slf4j.LoggerFactory;
  * Executes a task.
  */
 public final class TaskExecutor {
+  // Static variables
   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class.getName());
+  private static final String ITERATORID_PREFIX = "ITERATOR_";
+  private static final AtomicInteger ITERATORID_GENERATOR = new AtomicInteger(0);
 
-  private final DAG<Task, RuntimeEdge<Task>> taskDag;
+  // From ExecutableTask
+  private final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag;
   private final String taskId;
   private final int taskIdx;
   private final TaskStateManager taskStateManager;
   private final List<PhysicalStageEdge> stageIncomingEdges;
   private final List<PhysicalStageEdge> stageOutgoingEdges;
+  private Map<String, Readable> irVertexIdToReadable;
+
+  // Other parameters
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  private final List<InputReader> inputReaders;
-  private final Map<InputReader, List<TaskDataHandler>> inputReaderToDataHandlersMap;
+  // Data structures
+  private final Map<InputReader, List<IRVertexDataHandler>> inputReaderToDataHandlersMap;
   private final Map<String, Iterator> idToSrcIteratorMap;
-  private final Map<String, List<TaskDataHandler>> srcIteratorIdToDataHandlersMap;
-  private final Map<String, List<TaskDataHandler>> iteratorIdToDataHandlersMap;
+  private final Map<String, List<IRVertexDataHandler>> srcIteratorIdToDataHandlersMap;
+  private final Map<String, List<IRVertexDataHandler>> iteratorIdToDataHandlersMap;
   private final LinkedBlockingQueue<Pair<String, DataUtil.IteratorWithNumBytes>> partitionQueue;
-  private List<TaskDataHandler> taskDataHandlers;
-  private Map<OutputCollectorImpl, List<TaskDataHandler>> outputToChildrenDataHandlersMap;
-  private final Set<String> finishedTaskIds;
-  private int numPartitions;
-  private Map<String, Readable> logicalTaskIdToReadable;
+  private List<IRVertexDataHandler> irVertexDataHandlers;
+  private Map<OutputCollectorImpl, List<IRVertexDataHandler>> outputToChildrenDataHandlersMap;
+  private final Set<String> finishedVertexIds;
 
   // For metrics
   private long serBlockSize;
   private long encodedBlockSize;
 
-  private boolean isExecutionRequested;
-  private String logicalTaskIdPutOnHold;
+  // Misc
+  private boolean isExecuted;
+  private String irVertexIdPutOnHold;
+  private int numPartitions;
 
-  private static final String ITERATORID_PREFIX = "ITERATOR_";
-  private static final AtomicInteger ITERATORID_GENERATOR = new AtomicInteger(0);
 
   /**
    * Constructor.
-   * @param scheduledTask Task with information needed during execution.
-   * @param taskDag A DAG of Tasks.
+   * @param executableTask Task with information needed during execution.
+   * @param irVertexDag A DAG of vertices.
    * @param taskStateManager State manager for this Task.
    * @param channelFactory For reading from/writing to data to other Stages.
    * @param metricMessageSender For sending metric with execution stats to Master.
    */
-  public TaskExecutor(final ScheduledTask scheduledTask,
-                      final DAG<Task, RuntimeEdge<Task>> taskDag,
+  public TaskExecutor(final ExecutableTask executableTask,
+                      final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
                       final TaskStateManager taskStateManager,
                       final DataTransferFactory channelFactory,
                       final MetricMessageSender metricMessageSender) {
-    this.taskDag = taskDag;
-    this.taskId = scheduledTask.getTaskId();
-    this.taskIdx = scheduledTask.getTaskIdx();
+    // Information from the ExecutableTask.
+    this.irVertexDag = irVertexDag;
+    this.taskId = executableTask.getTaskId();
+    this.taskIdx = executableTask.getTaskIdx();
+    this.stageIncomingEdges = executableTask.getTaskIncomingEdges();
+    this.stageOutgoingEdges = executableTask.getTaskOutgoingEdges();
+    this.irVertexIdToReadable = executableTask.getIrVertexIdToReadable();
+
+    // Other parameters.
     this.taskStateManager = taskStateManager;
-    this.stageIncomingEdges = scheduledTask.getTaskIncomingEdges();
-    this.stageOutgoingEdges = scheduledTask.getTaskOutgoingEdges();
-    this.logicalTaskIdToReadable = scheduledTask.getLogicalTaskIdToReadable();
     this.channelFactory = channelFactory;
     this.metricCollector = new MetricCollector(metricMessageSender);
-    this.logicalTaskIdPutOnHold = null;
-    this.isExecutionRequested = false;
 
-    this.inputReaders = new ArrayList<>();
+    // Initialize data structures.
     this.inputReaderToDataHandlersMap = new ConcurrentHashMap<>();
     this.idToSrcIteratorMap = new HashMap<>();
     this.srcIteratorIdToDataHandlersMap = new HashMap<>();
     this.iteratorIdToDataHandlersMap = new ConcurrentHashMap<>();
     this.partitionQueue = new LinkedBlockingQueue<>();
     this.outputToChildrenDataHandlersMap = new HashMap<>();
-    this.taskDataHandlers = new ArrayList<>();
-
-    this.finishedTaskIds = new HashSet<>();
-    this.numPartitions = 0;
+    this.irVertexDataHandlers = new ArrayList<>();
+    this.finishedVertexIds = new HashSet<>();
 
+    // Metrics
     this.serBlockSize = 0;
     this.encodedBlockSize = 0;
 
+    // Misc
+    this.isExecuted = false;
+    this.irVertexIdPutOnHold = null;
+    this.numPartitions = 0;
 
     initialize();
   }
@@ -124,29 +132,23 @@ public final class TaskExecutor {
    * 2) Prepares Transforms if needed.
    */
   private void initialize() {
-    // Initialize data read of SourceVertex.
-    taskDag.getTopologicalSort().stream()
-        .filter(task -> task instanceof BoundedSourceTask)
-        .forEach(boundedSourceTask -> ((BoundedSourceTask) boundedSourceTask).setReadable(
-            logicalTaskIdToReadable.get(boundedSourceTask.getId())));
-
-    // Initialize data handlers for each Task.
-    taskDag.topologicalDo(task -> taskDataHandlers.add(new TaskDataHandler(task)));
+    // Initialize data handlers for each IRVertex.
+    irVertexDag.topologicalDo(irVertex -> irVertexDataHandlers.add(new IRVertexDataHandler(irVertex)));
 
     // Initialize data transfer.
-    // Construct a pointer-based DAG of TaskDataHandlers that are used for data transfer.
+    // Construct a pointer-based DAG of irVertexDataHandlers that are used for data transfer.
     // 'Pointer-based' means that it isn't Map/List-based in getting the data structure or parent/children
     // to avoid element-wise extra overhead of calculating hash values(HashMap) or iterating Lists.
-    taskDag.topologicalDo(task -> {
-      final Set<PhysicalStageEdge> inEdgesFromOtherStages = getInEdgesFromOtherStages(task);
-      final Set<PhysicalStageEdge> outEdgesToOtherStages = getOutEdgesToOtherStages(task);
-      final TaskDataHandler dataHandler = getTaskDataHandler(task);
-
-      // Set data handlers of children tasks.
-      // This forms a pointer-based DAG of TaskDataHandlers.
-      final List<TaskDataHandler> childrenDataHandlers = new ArrayList<>();
-      taskDag.getChildren(task.getId()).forEach(child ->
-          childrenDataHandlers.add(getTaskDataHandler(child)));
+    irVertexDag.topologicalDo(irVertex -> {
+      final Set<PhysicalStageEdge> inEdgesFromOtherStages = getInEdgesFromOtherStages(irVertex);
+      final Set<PhysicalStageEdge> outEdgesToOtherStages = getOutEdgesToOtherStages(irVertex);
+      final IRVertexDataHandler dataHandler = getIRVertexDataHandler(irVertex);
+
+      // Set data handlers of children irVertices.
+      // This forms a pointer-based DAG of irVertexDataHandlers.
+      final List<IRVertexDataHandler> childrenDataHandlers = new ArrayList<>();
+      irVertexDag.getChildren(irVertex.getId()).forEach(child ->
+          childrenDataHandlers.add(getIRVertexDataHandler(child)));
       dataHandler.setChildrenDataHandler(childrenDataHandlers);
 
       // Add InputReaders for inter-stage data transfer
@@ -158,7 +160,6 @@ public final class TaskExecutor {
         if (inputReader.isSideInputReader()) {
           dataHandler.addSideInputFromOtherStages(inputReader);
         } else {
-          inputReaders.add(inputReader);
           inputReaderToDataHandlersMap.putIfAbsent(inputReader, new ArrayList<>());
           inputReaderToDataHandlersMap.get(inputReader).add(dataHandler);
         }
@@ -167,29 +168,29 @@ public final class TaskExecutor {
       // Add OutputWriters for inter-stage data transfer
       outEdgesToOtherStages.forEach(physicalStageEdge -> {
         final OutputWriter outputWriter = channelFactory.createWriter(
-            task, taskIdx, physicalStageEdge.getDstVertex(), physicalStageEdge);
+            irVertex, taskIdx, physicalStageEdge.getDstVertex(), physicalStageEdge);
         dataHandler.addOutputWriter(outputWriter);
       });
 
       // Add InputPipes for intra-stage data transfer
-      addInputFromThisStage(task, dataHandler);
+      addInputFromThisStage(irVertex, dataHandler);
 
       // Add OutputPipe for intra-stage data transfer
-      setOutputCollector(task, dataHandler);
+      setOutputCollector(irVertex, dataHandler);
     });
 
     // Prepare Transforms if needed.
-    taskDag.topologicalDo(task -> {
-      if (task instanceof OperatorTask) {
-        final Transform transform = ((OperatorTask) task).getTransform();
+    irVertexDag.topologicalDo(irVertex -> {
+      if (irVertex instanceof OperatorVertex) {
+        final Transform transform = ((OperatorVertex) irVertex).getTransform();
         final Map<Transform, Object> sideInputMap = new HashMap<>();
-        final TaskDataHandler dataHandler = getTaskDataHandler(task);
+        final IRVertexDataHandler dataHandler = getIRVertexDataHandler(irVertex);
         // Check and collect side inputs.
         if (!dataHandler.getSideInputFromOtherStages().isEmpty()) {
-          sideInputFromOtherStages(task, sideInputMap);
+          sideInputFromOtherStages(irVertex, sideInputMap);
         }
         if (!dataHandler.getSideInputFromThisStage().isEmpty()) {
-          sideInputFromThisStage(task, sideInputMap);
+          sideInputFromThisStage(irVertex, sideInputMap);
         }
 
         final Transform.Context transformContext = new ContextImpl(sideInputMap);
@@ -200,43 +201,41 @@ public final class TaskExecutor {
   }
 
   /**
-   * Collect all inter-stage incoming edges of this task.
+   * Collect all inter-stage incoming edges of this vertex.
    *
-   * @param task the Task whose inter-stage incoming edges to be collected.
+   * @param irVertex the IRVertex whose inter-stage incoming edges to be collected.
    * @return the collected incoming edges.
    */
-  private Set<PhysicalStageEdge> getInEdgesFromOtherStages(final Task task) {
+  private Set<PhysicalStageEdge> getInEdgesFromOtherStages(final IRVertex irVertex) {
     return stageIncomingEdges.stream().filter(
-        stageInEdge -> stageInEdge.getDstVertex().getId().equals(task.getIrVertexId()))
+        stageInEdge -> stageInEdge.getDstVertex().getId().equals(irVertex.getId()))
         .collect(Collectors.toSet());
   }
 
   /**
-   * Collect all inter-stage outgoing edges of this task.
+   * Collect all inter-stage outgoing edges of this vertex.
    *
-   * @param task the Task whose inter-stage outgoing edges to be collected.
+   * @param irVertex the IRVertex whose inter-stage outgoing edges to be collected.
    * @return the collected outgoing edges.
    */
-  private Set<PhysicalStageEdge> getOutEdgesToOtherStages(final Task task) {
+  private Set<PhysicalStageEdge> getOutEdgesToOtherStages(final IRVertex irVertex) {
     return stageOutgoingEdges.stream().filter(
-        stageInEdge -> stageInEdge.getSrcVertex().getId().equals(task.getIrVertexId()))
+        stageInEdge -> stageInEdge.getSrcVertex().getId().equals(irVertex.getId()))
         .collect(Collectors.toSet());
   }
 
   /**
-   * Add input OutputCollectors to each {@link Task}.
-   * Input OutputCollector denotes all the OutputCollectors of intra-Stage parent tasks of this task.
+   * Add input OutputCollectors to each {@link IRVertex}.
+   * Input OutputCollector denotes all the OutputCollectors of intra-Stage dependencies.
    *
-   * @param task the Task to add input OutputCollectors to.
+   * @param irVertex the IRVertex to add input OutputCollectors to.
    */
-  private void addInputFromThisStage(final Task task, final TaskDataHandler dataHandler) {
-    List<Task> parentTasks = taskDag.getParents(task.getId());
-    final String physicalTaskId = getPhysicalTaskId(task.getId());
-
-    if (parentTasks != null) {
-      parentTasks.forEach(parent -> {
-        final OutputCollectorImpl parentOutputCollector = getTaskDataHandler(parent).getOutputCollector();
-        if (parentOutputCollector.hasSideInputFor(physicalTaskId)) {
+  private void addInputFromThisStage(final IRVertex irVertex, final IRVertexDataHandler dataHandler) {
+    List<IRVertex> parentVertices = irVertexDag.getParents(irVertex.getId());
+    if (parentVertices != null) {
+      parentVertices.forEach(parent -> {
+        final OutputCollectorImpl parentOutputCollector = getIRVertexDataHandler(parent).getOutputCollector();
+        if (parentOutputCollector.hasSideInputFor(irVertex.getId())) {
           dataHandler.addSideInputFromThisStage(parentOutputCollector);
         } else {
           dataHandler.addInputFromThisStages(parentOutputCollector);
@@ -246,21 +245,15 @@ public final class TaskExecutor {
   }
 
   /**
-   * Add output outputCollectors to each {@link Task}.
-   * Output outputCollector denotes the one and only one outputCollector of this task.
-   * Check the outgoing edges that will use this outputCollector,
-   * and set this outputCollector as side input if any one of the edges uses this outputCollector as side input.
-   *
-   * @param task the Task to add output outputCollectors to.
+   * Add outputCollectors to each {@link IRVertex}.
+   * @param irVertex the IRVertex to add outputCollectors to.
    */
-  private void setOutputCollector(final Task task, final TaskDataHandler dataHandler) {
+  private void setOutputCollector(final IRVertex irVertex, final IRVertexDataHandler dataHandler) {
     final OutputCollectorImpl outputCollector = new OutputCollectorImpl();
-    final String physicalTaskId = getPhysicalTaskId(task.getId());
-
-    taskDag.getOutgoingEdgesOf(task).forEach(outEdge -> {
+    irVertexDag.getOutgoingEdgesOf(irVertex).forEach(outEdge -> {
       if (outEdge.isSideInput()) {
         outputCollector.setSideInputRuntimeEdge(outEdge);
-        outputCollector.setAsSideInputFor(physicalTaskId);
+        outputCollector.setAsSideInputFor(irVertex.getId());
       }
     });
 
@@ -268,25 +261,17 @@ public final class TaskExecutor {
   }
 
   /**
-   * Check that this task has OutputWriter for inter-stage data.
+   * Check that this irVertex has OutputWriter for inter-stage data.
    *
-   * @param task the task to check whether it has OutputWriters.
-   * @return true if the task has OutputWriters.
+   * @param irVertex the irVertex to check whether it has OutputWriters.
+   * @return true if the irVertex has OutputWriters.
    */
-  private boolean hasOutputWriter(final Task task) {
-    return !getTaskDataHandler(task).getOutputWriters().isEmpty();
+  private boolean hasOutputWriter(final IRVertex irVertex) {
+    return !getIRVertexDataHandler(irVertex).getOutputWriters().isEmpty();
   }
 
-  /**
-   * If the given task is MetricCollectionBarrierTask,
-   * set task as put on hold and use it to decide Task state when Task finishes.
-   *
-   * @param task the task to check whether it has OutputWriters.
-   * @return true if the task has OutputWriters.
-   */
-  private void setTaskPutOnHold(final MetricCollectionBarrierTask task) {
-    final String physicalTaskId = getPhysicalTaskId(task.getId());
-    logicalTaskIdPutOnHold = RuntimeIdGenerator.getLogicalTaskIdIdFromPhysicalTaskId(physicalTaskId);
+  private void setIRVertexPutOnHold(final MetricCollectionBarrierVertex irVertex) {
+    irVertexIdPutOnHold = irVertex.getId();
   }
 
   /**
@@ -294,16 +279,15 @@ public final class TaskExecutor {
    * As element-wise output write is done and the block is in memory,
    * flush the block into the designated data store and commit it.
    *
-   * @param task the task with OutputWriter to flush and commit output block.
+   * @param irVertex the IRVertex with OutputWriter to flush and commit output block.
    */
-  private void writeAndCloseOutputWriters(final Task task) {
-    final String physicalTaskId = getPhysicalTaskId(task.getId());
+  private void writeAndCloseOutputWriters(final IRVertex irVertex) {
     final List<Long> writtenBytesList = new ArrayList<>();
     final Map<String, Object> metric = new HashMap<>();
-    metricCollector.beginMeasurement(physicalTaskId, metric);
+    metricCollector.beginMeasurement(irVertex.getId(), metric);
     final long writeStartTime = System.currentTimeMillis();
 
-    getTaskDataHandler(task).getOutputWriters().forEach(outputWriter -> {
+    getIRVertexDataHandler(irVertex).getOutputWriters().forEach(outputWriter -> {
       outputWriter.close();
       final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
       writtenBytes.ifPresent(writtenBytesList::add);
@@ -312,21 +296,25 @@ public final class TaskExecutor {
     final long writeEndTime = System.currentTimeMillis();
     metric.put("OutputWriteTime(ms)", writeEndTime - writeStartTime);
     putWrittenBytesMetric(writtenBytesList, metric);
-    metricCollector.endMeasurement(physicalTaskId, metric);
+    metricCollector.endMeasurement(irVertex.getId(), metric);
   }
 
   /**
    * Get input iterator from BoundedSource and bind it with id.
    */
   private void prepareInputFromSource() {
-    taskDag.topologicalDo(task -> {
-      if (task instanceof BoundedSourceTask) {
+    irVertexDag.topologicalDo(irVertex -> {
+      if (irVertex instanceof SourceVertex) {
         try {
           final String iteratorId = generateIteratorId();
-          final Iterator iterator = ((BoundedSourceTask) task).getReadable().read().iterator();
+          final Readable readable = irVertexIdToReadable.get(irVertex.getId());
+          if (readable == null) {
+            throw new RuntimeException(irVertex.toString());
+          }
+          final Iterator iterator = readable.read().iterator();
           idToSrcIteratorMap.putIfAbsent(iteratorId, iterator);
           srcIteratorIdToDataHandlersMap.putIfAbsent(iteratorId, new ArrayList<>());
-          srcIteratorIdToDataHandlersMap.get(iteratorId).add(getTaskDataHandler(task));
+          srcIteratorIdToDataHandlersMap.get(iteratorId).add(getIRVertexDataHandler(irVertex));
         } catch (final BlockFetchException ex) {
           taskStateManager.onTaskStateChanged(TaskState.State.FAILED_RECOVERABLE,
               Optional.empty(), Optional.of(TaskState.RecoverableFailureCause.INPUT_READ_FAILURE));
@@ -374,22 +362,21 @@ public final class TaskExecutor {
   }
 
   /**
-   * Check whether all tasks in this Task are finished.
+   * Check whether all vertices in this Task are finished.
    *
-   * @return true if all tasks are finished.
+   * @return true if all vertices are finished.
    */
-  private boolean finishedAllTasks() {
+  private boolean finishedAllVertices() {
     // Total number of Tasks
-    int taskNum = taskDataHandlers.size();
-    int finishedTaskNum = finishedTaskIds.size();
-
-    return finishedTaskNum == taskNum;
+    int vertexNum = irVertexDataHandlers.size();
+    int finishedVertexNum = finishedVertexIds.size();
+    return finishedVertexNum == vertexNum;
   }
 
   /**
-   * Initialize the very first map of OutputCollector-children task DAG.
+   * Initialize the very first map of OutputCollector-children irVertex DAG.
    * In each map entry, the OutputCollector contains input data to be propagated through
-   * the children task DAG.
+   * the children irVertex DAG.
    */
   private void initializeOutputToChildrenDataHandlersMap() {
     srcIteratorIdToDataHandlersMap.values().forEach(dataHandlers ->
@@ -403,11 +390,11 @@ public final class TaskExecutor {
   }
 
   /**
-   * Update the map of OutputCollector-children task DAG.
+   * Update the map of OutputCollector-children irVertex DAG.
    */
   private void updateOutputToChildrenDataHandlersMap() {
-    Map<OutputCollectorImpl, List<TaskDataHandler>> currentMap = outputToChildrenDataHandlersMap;
-    Map<OutputCollectorImpl, List<TaskDataHandler>> updatedMap = new HashMap<>();
+    Map<OutputCollectorImpl, List<IRVertexDataHandler>> currentMap = outputToChildrenDataHandlersMap;
+    Map<OutputCollectorImpl, List<IRVertexDataHandler>> updatedMap = new HashMap<>();
 
     currentMap.values().forEach(dataHandlers ->
         dataHandlers.forEach(dataHandler -> {
@@ -419,13 +406,13 @@ public final class TaskExecutor {
   }
 
   /**
-   * Update the map of OutputCollector-children task DAG.
+   * Close the transform of the given IRVertex, if it is an OperatorVertex.
    *
-   * @param task the Task with the transform to close.
+   * @param irVertex the IRVertex with the transform to close.
    */
-  private void closeTransform(final Task task) {
-    if (task instanceof OperatorTask) {
-      Transform transform = ((OperatorTask) task).getTransform();
+  private void closeTransform(final IRVertex irVertex) {
+    if (irVertex instanceof OperatorVertex) {
+      Transform transform = ((OperatorVertex) irVertex).getTransform();
       transform.close();
     }
   }
@@ -434,11 +421,11 @@ public final class TaskExecutor {
    * As a preprocessing of side input data, get inter stage side input
    * and form a map of source transform-side input.
    *
-   * @param task the task which receives side input from other stages.
+   * @param irVertex the IRVertex which receives side input from other stages.
    * @param sideInputMap the map of source transform-side input to build.
    */
-  private void sideInputFromOtherStages(final Task task, final Map<Transform, Object> sideInputMap) {
-    getTaskDataHandler(task).getSideInputFromOtherStages().forEach(sideInputReader -> {
+  private void sideInputFromOtherStages(final IRVertex irVertex, final Map<Transform, Object> sideInputMap) {
+    getIRVertexDataHandler(irVertex).getSideInputFromOtherStages().forEach(sideInputReader -> {
       try {
         final DataUtil.IteratorWithNumBytes sideInputIterator = sideInputReader.read().get(0).get();
         final Object sideInput = getSideInput(sideInputIterator);
@@ -447,7 +434,7 @@ public final class TaskExecutor {
         if (inEdge instanceof PhysicalStageEdge) {
           srcTransform = ((OperatorVertex) ((PhysicalStageEdge) inEdge).getSrcVertex()).getTransform();
         } else {
-          srcTransform = ((OperatorTask) inEdge.getSrc()).getTransform();
+          srcTransform = ((OperatorVertex) inEdge.getSrc()).getTransform();
         }
         sideInputMap.put(srcTransform, sideInput);
 
@@ -477,11 +464,11 @@ public final class TaskExecutor {
    * Assumption:  intra stage side input denotes a data element initially received
    *              via side input reader from other stages.
    *
-   * @param task the task which receives the data element marked as side input.
+   * @param irVertex the IRVertex which receives the data element marked as side input.
    * @param sideInputMap the map of source transform-side input to build.
    */
-  private void sideInputFromThisStage(final Task task, final Map<Transform, Object> sideInputMap) {
-    getTaskDataHandler(task).getSideInputFromThisStage().forEach(input -> {
+  private void sideInputFromThisStage(final IRVertex irVertex, final Map<Transform, Object> sideInputMap) {
+    getIRVertexDataHandler(irVertex).getSideInputFromThisStage().forEach(input -> {
       // because sideInput is only 1 element in the outputCollector
       Object sideInput = input.remove();
       final RuntimeEdge inEdge = input.getSideInputRuntimeEdge();
@@ -489,7 +476,7 @@ public final class TaskExecutor {
       if (inEdge instanceof PhysicalStageEdge) {
         srcTransform = ((OperatorVertex) ((PhysicalStageEdge) inEdge).getSrcVertex()).getTransform();
       } else {
-        srcTransform = ((OperatorTask) inEdge.getSrc()).getTransform();
+        srcTransform = ((OperatorVertex) inEdge.getSrc()).getTransform();
       }
       sideInputMap.put(srcTransform, sideInput);
     });
@@ -505,11 +492,10 @@ public final class TaskExecutor {
     long boundedSrcReadEndTime = 0;
     long inputReadStartTime = 0;
     long inputReadEndTime = 0;
-    if (isExecutionRequested) {
+    if (isExecuted) {
       throw new RuntimeException("Task {" + taskId + "} execution called again!");
-    } else {
-      isExecutionRequested = true;
     }
+    isExecuted = true;
     taskStateManager.onTaskStateChanged(TaskState.State.EXECUTING, Optional.empty(), Optional.empty());
     LOG.info("{} Executing!", taskId);
 
@@ -523,12 +509,12 @@ public final class TaskExecutor {
     inputReadStartTime = System.currentTimeMillis();
     prepareInputFromOtherStages();
 
-    // Execute the Task DAG.
+    // Execute the IRVertex DAG.
     try {
       srcIteratorIdToDataHandlersMap.forEach((srcIteratorId, dataHandlers) -> {
         Iterator iterator = idToSrcIteratorMap.get(srcIteratorId);
         iterator.forEachRemaining(element -> {
-          for (final TaskDataHandler dataHandler : dataHandlers) {
+          for (final IRVertexDataHandler dataHandler : dataHandlers) {
             runTask(dataHandler, element);
           }
         });
@@ -539,9 +525,9 @@ public final class TaskExecutor {
         Pair<String, DataUtil.IteratorWithNumBytes> idToIteratorPair = partitionQueue.take();
         final String iteratorId = idToIteratorPair.left();
         final DataUtil.IteratorWithNumBytes iterator = idToIteratorPair.right();
-        List<TaskDataHandler> dataHandlers = iteratorIdToDataHandlersMap.get(iteratorId);
+        List<IRVertexDataHandler> dataHandlers = iteratorIdToDataHandlersMap.get(iteratorId);
         iterator.forEachRemaining(element -> {
-          for (final TaskDataHandler dataHandler : dataHandlers) {
+          for (final IRVertexDataHandler dataHandler : dataHandlers) {
             runTask(dataHandler, element);
           }
         });
@@ -566,44 +552,44 @@ public final class TaskExecutor {
       metric.put("InputReadTime(ms)", inputReadEndTime - inputReadStartTime);
 
       // Process intra-Task data.
-      // Intra-Task data comes from outputCollectors of this Task's Tasks.
+      // Intra-Task data comes from outputCollectors of this Task's vertices.
       initializeOutputToChildrenDataHandlersMap();
-      while (!finishedAllTasks()) {
+      while (!finishedAllVertices()) {
         outputToChildrenDataHandlersMap.forEach((outputCollector, childrenDataHandlers) -> {
-          // Get the task that has this outputCollector as its output outputCollector
-          Task outputCollectorOwnerTask = taskDataHandlers.stream()
+          // Get the vertex that has this outputCollector as its output outputCollector
+          final IRVertex outputProducer = irVertexDataHandlers.stream()
               .filter(dataHandler -> dataHandler.getOutputCollector() == outputCollector)
-              .findFirst().get().getTask();
+              .findFirst().get().getIRVertex();
 
-          // Before consuming the output of outputCollectorOwnerTask as input,
+          // Before consuming the output of outputProducer as input,
           // close transform if it is OperatorTransform.
-          closeTransform(outputCollectorOwnerTask);
+          closeTransform(outputProducer);
 
-          // Set outputCollectorOwnerTask as finished.
-          finishedTaskIds.add(getPhysicalTaskId(outputCollectorOwnerTask.getId()));
+          // Set outputProducer as finished.
+          finishedVertexIds.add(outputProducer.getId());
 
           while (!outputCollector.isEmpty()) {
             final Object element = outputCollector.remove();
 
-            // Pass outputCollectorOwnerTask's output to its children tasks recursively.
+            // Pass outputProducer's output to its children vertices recursively.
             if (!childrenDataHandlers.isEmpty()) {
-              for (final TaskDataHandler childDataHandler : childrenDataHandlers) {
+              for (final IRVertexDataHandler childDataHandler : childrenDataHandlers) {
                 runTask(childDataHandler, element);
               }
             }
 
             // Write element-wise to OutputWriters if any and close the OutputWriters.
-            if (hasOutputWriter(outputCollectorOwnerTask)) {
+            if (hasOutputWriter(outputProducer)) {
               // If outputCollector isn't empty(if closeTransform produced some output),
               // write them element-wise to OutputWriters.
               List<OutputWriter> outputWritersOfTask =
-                  getTaskDataHandler(outputCollectorOwnerTask).getOutputWriters();
+                  getIRVertexDataHandler(outputProducer).getOutputWriters();
               outputWritersOfTask.forEach(outputWriter -> outputWriter.write(element));
             }
           }
 
-          if (hasOutputWriter(outputCollectorOwnerTask)) {
-            writeAndCloseOutputWriters(outputCollectorOwnerTask);
+          if (hasOutputWriter(outputProducer)) {
+            writeAndCloseOutputWriters(outputProducer);
           }
         });
         updateOutputToChildrenDataHandlersMap();
@@ -625,47 +611,47 @@ public final class TaskExecutor {
     final boolean available = serBlockSize >= 0;
     putReadBytesMetric(available, serBlockSize, encodedBlockSize, metric);
     metricCollector.endMeasurement(taskId, metric);
-    if (logicalTaskIdPutOnHold == null) {
+    if (irVertexIdPutOnHold == null) {
       taskStateManager.onTaskStateChanged(TaskState.State.COMPLETE, Optional.empty(), Optional.empty());
     } else {
       taskStateManager.onTaskStateChanged(TaskState.State.ON_HOLD,
-          Optional.of(logicalTaskIdPutOnHold),
+          Optional.of(irVertexIdPutOnHold),
           Optional.empty());
     }
     LOG.info("{} Complete!", taskId);
   }
 
   /**
-   * Recursively executes a task with the input data element.
+   * Recursively executes a vertex with the input data element.
    *
-   * @param dataHandler TaskDataHandler of a task to execute.
+   * @param dataHandler IRVertexDataHandler of a vertex to execute.
    * @param dataElement input data element to process.
    */
-  private void runTask(final TaskDataHandler dataHandler, final Object dataElement) {
-    final Task task = dataHandler.getTask();
+  private void runTask(final IRVertexDataHandler dataHandler, final Object dataElement) {
+    final IRVertex irVertex = dataHandler.getIRVertex();
     final OutputCollectorImpl outputCollector = dataHandler.getOutputCollector();
 
-    // Process element-wise depending on the Task type
-    if (task instanceof BoundedSourceTask) {
+    // Process element-wise depending on the vertex type
+    if (irVertex instanceof SourceVertex) {
       if (dataElement == null) { // null used for Beam VoidCoders
         final List<Object> nullForVoidCoder = Collections.singletonList(dataElement);
         outputCollector.emit(nullForVoidCoder);
       } else {
         outputCollector.emit(dataElement);
       }
-    } else if (task instanceof OperatorTask) {
-      final Transform transform = ((OperatorTask) task).getTransform();
+    } else if (irVertex instanceof OperatorVertex) {
+      final Transform transform = ((OperatorVertex) irVertex).getTransform();
       transform.onData(dataElement);
-    } else if (task instanceof MetricCollectionBarrierTask) {
+    } else if (irVertex instanceof MetricCollectionBarrierVertex) {
       if (dataElement == null) { // null used for Beam VoidCoders
         final List<Object> nullForVoidCoder = Collections.singletonList(dataElement);
         outputCollector.emit(nullForVoidCoder);
       } else {
         outputCollector.emit(dataElement);
       }
-      setTaskPutOnHold((MetricCollectionBarrierTask) task);
+      setIRVertexPutOnHold((MetricCollectionBarrierVertex) irVertex);
     } else {
-      throw new UnsupportedOperationException("This type  of Task is not supported");
+      throw new UnsupportedOperationException("This type of IRVertex is not supported");
     }
 
     // For the produced output
@@ -673,15 +659,15 @@ public final class TaskExecutor {
       final Object element = outputCollector.remove();
 
       // Pass output to its children recursively.
-      List<TaskDataHandler> childrenDataHandlers = dataHandler.getChildren();
+      List<IRVertexDataHandler> childrenDataHandlers = dataHandler.getChildren();
       if (!childrenDataHandlers.isEmpty()) {
-        for (final TaskDataHandler childDataHandler : childrenDataHandlers) {
+        for (final IRVertexDataHandler childDataHandler : childrenDataHandlers) {
           runTask(childDataHandler, element);
         }
       }
 
       // Write element-wise to OutputWriters if any
-      if (hasOutputWriter(task)) {
+      if (hasOutputWriter(irVertex)) {
         List<OutputWriter> outputWritersOfTask = dataHandler.getOutputWriters();
         outputWritersOfTask.forEach(outputWriter -> outputWriter.write(element));
       }
@@ -689,16 +675,6 @@ public final class TaskExecutor {
   }
 
   /**
-   * Get the matching physical task id of the given logical task id.
-   *
-   * @param logicalTaskId the logical task id.
-   * @return the physical task id.
-   */
-  private String getPhysicalTaskId(final String logicalTaskId) {
-    return RuntimeIdGenerator.generatePhysicalTaskId(taskIdx, logicalTaskId);
-  }
-
-  /**
    * Generate a unique iterator id.
    *
    * @return the iterator id.
@@ -707,9 +683,9 @@ public final class TaskExecutor {
     return ITERATORID_PREFIX + ITERATORID_GENERATOR.getAndIncrement();
   }
 
-  private TaskDataHandler getTaskDataHandler(final Task task) {
-    return taskDataHandlers.stream()
-        .filter(dataHandler -> dataHandler.getTask() == task)
+  private IRVertexDataHandler getIRVertexDataHandler(final IRVertex irVertex) {
+    return irVertexDataHandlers.stream()
+        .filter(dataHandler -> dataHandler.getIRVertex() == irVertex)
         .findFirst().get();
   }
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
index 77d5136..e2a31d6 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
@@ -15,16 +15,13 @@
  */
 package edu.snu.nemo.runtime.executor;
 
-import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.exception.UnknownExecutionStateException;
 import edu.snu.nemo.common.exception.UnknownFailureCauseException;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
-import edu.snu.nemo.runtime.common.plan.physical.Task;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 
 import java.util.*;
 
@@ -47,13 +44,12 @@ public final class TaskStateManager {
   private final MetricCollector metricCollector;
   private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
 
-  public TaskStateManager(final ScheduledTask scheduledTask,
-                               final DAG<Task, RuntimeEdge<Task>> taskDag,
-                               final String executorId,
-                               final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
-                               final MetricMessageSender metricMessageSender) {
-    this.taskId = scheduledTask.getTaskId();
-    this.attemptIdx = scheduledTask.getAttemptIdx();
+  public TaskStateManager(final ExecutableTask executableTask,
+                          final String executorId,
+                          final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
+                          final MetricMessageSender metricMessageSender) {
+    this.taskId = executableTask.getTaskId();
+    this.attemptIdx = executableTask.getAttemptIdx();
     this.executorId = executorId;
     this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
     this.metricCollector = new MetricCollector(metricMessageSender);
@@ -62,11 +58,11 @@ public final class TaskStateManager {
   /**
    * Updates the state of the task.
    * @param newState of the task.
-   * @param taskPutOnHold the logical ID of the tasks put on hold, empty otherwise.
+   * @param vertexPutOnHold the ID of the vertex put on hold, empty otherwise.
    * @param cause only provided as non-empty upon recoverable failures.
    */
   public synchronized void onTaskStateChanged(final TaskState.State newState,
-                                              final Optional<String> taskPutOnHold,
+                                              final Optional<String> vertexPutOnHold,
                                               final Optional<TaskState.RecoverableFailureCause> cause) {
     final Map<String, Object> metric = new HashMap<>();
 
@@ -98,7 +94,7 @@ public final class TaskStateManager {
         break;
       case ON_HOLD:
         LOG.debug("Task ID {} put on hold.", this.taskId);
-        notifyTaskStateToMaster(newState, taskPutOnHold, cause);
+        notifyTaskStateToMaster(newState, vertexPutOnHold, cause);
         break;
       default:
         throw new IllegalStateException("Illegal state at this point");
@@ -108,20 +104,20 @@ public final class TaskStateManager {
   /**
    * Notifies the change in task state to master.
    * @param newState of the task.
-   * @param taskPutOnHold the logical ID of the tasks put on hold, empty otherwise.
+   * @param vertexPutOnHold the ID of the vertex put on hold, empty otherwise.
    * @param cause only provided as non-empty upon recoverable failures.
    */
   private void notifyTaskStateToMaster(final TaskState.State newState,
-                                            final Optional<String> taskPutOnHold,
-                                            final Optional<TaskState.RecoverableFailureCause> cause) {
+                                       final Optional<String> vertexPutOnHold,
+                                       final Optional<TaskState.RecoverableFailureCause> cause) {
     final ControlMessage.TaskStateChangedMsg.Builder msgBuilder =
         ControlMessage.TaskStateChangedMsg.newBuilder()
-          .setExecutorId(executorId)
-          .setTaskId(taskId)
-          .setAttemptIdx(attemptIdx)
-          .setState(convertState(newState));
-    if (taskPutOnHold.isPresent()) {
-          msgBuilder.setTaskPutOnHoldId(taskPutOnHold.get());
+            .setExecutorId(executorId)
+            .setTaskId(taskId)
+            .setAttemptIdx(attemptIdx)
+            .setState(convertState(newState));
+    if (vertexPutOnHold.isPresent()) {
+      msgBuilder.setVertexPutOnHoldId(vertexPutOnHold.get());
     }
     if (cause.isPresent()) {
       msgBuilder.setFailureCause(convertFailureCause(cause.get()));
@@ -139,33 +135,33 @@ public final class TaskStateManager {
 
   private ControlMessage.TaskStateFromExecutor convertState(final TaskState.State state) {
     switch (state) {
-    case READY:
-      return ControlMessage.TaskStateFromExecutor.READY;
-    case EXECUTING:
-      return ControlMessage.TaskStateFromExecutor.EXECUTING;
-    case COMPLETE:
-      return ControlMessage.TaskStateFromExecutor.COMPLETE;
-    case FAILED_RECOVERABLE:
-      return ControlMessage.TaskStateFromExecutor.FAILED_RECOVERABLE;
-    case FAILED_UNRECOVERABLE:
-      return ControlMessage.TaskStateFromExecutor.FAILED_UNRECOVERABLE;
-    case ON_HOLD:
-      return ControlMessage.TaskStateFromExecutor.ON_HOLD;
-    default:
-      throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + state));
+      case READY:
+        return ControlMessage.TaskStateFromExecutor.READY;
+      case EXECUTING:
+        return ControlMessage.TaskStateFromExecutor.EXECUTING;
+      case COMPLETE:
+        return ControlMessage.TaskStateFromExecutor.COMPLETE;
+      case FAILED_RECOVERABLE:
+        return ControlMessage.TaskStateFromExecutor.FAILED_RECOVERABLE;
+      case FAILED_UNRECOVERABLE:
+        return ControlMessage.TaskStateFromExecutor.FAILED_UNRECOVERABLE;
+      case ON_HOLD:
+        return ControlMessage.TaskStateFromExecutor.ON_HOLD;
+      default:
+        throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + state));
     }
   }
 
   private ControlMessage.RecoverableFailureCause convertFailureCause(
-    final TaskState.RecoverableFailureCause cause) {
+      final TaskState.RecoverableFailureCause cause) {
     switch (cause) {
-    case INPUT_READ_FAILURE:
-      return ControlMessage.RecoverableFailureCause.InputReadFailure;
-    case OUTPUT_WRITE_FAILURE:
-      return ControlMessage.RecoverableFailureCause.OutputWriteFailure;
-    default:
-      throw new UnknownFailureCauseException(
-          new Throwable("The failure cause for the recoverable failure is unknown"));
+      case INPUT_READ_FAILURE:
+        return ControlMessage.RecoverableFailureCause.InputReadFailure;
+      case OUTPUT_WRITE_FAILURE:
+        return ControlMessage.RecoverableFailureCause.OutputWriteFailure;
+      default:
+        throw new UnknownFailureCauseException(
+            new Throwable("The failure cause for the recoverable failure is unknown"));
     }
   }
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index d7b13f2..9385545 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -159,6 +159,7 @@ public final class BlockManagerWorker {
             numSerializedBytes += partition.getNumSerializedBytes();
             numEncodedBytes += partition.getNumEncodedBytes();
           }
+
           return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator, numSerializedBytes,
               numEncodedBytes));
         } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
@@ -207,13 +208,16 @@ public final class BlockManagerWorker {
                       .build());
           return responseFromMasterFuture;
         });
-    blockLocationFuture.whenComplete((message, throwable) -> pendingBlockLocationRequest.remove(blockId));
+    blockLocationFuture.whenComplete((message, throwable) -> {
+      pendingBlockLocationRequest.remove(blockId);
+    });
 
     // Using thenCompose so that fetching block data starts after getting response from master.
     return blockLocationFuture.thenCompose(responseFromMaster -> {
       if (responseFromMaster.getType() != ControlMessage.MessageType.BlockLocationInfo) {
         throw new RuntimeException("Response message type mismatch!");
       }
+
       final ControlMessage.BlockLocationInfoMsg blockLocationInfoMsg =
           responseFromMaster.getBlockLocationInfoMsg();
       if (!blockLocationInfoMsg.hasOwnerExecutorId()) {
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
index 13531c2..7a4068e 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
@@ -18,7 +18,6 @@ package edu.snu.nemo.runtime.executor.datatransfer;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.Task;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
 import org.apache.reef.tang.annotations.Parameter;
 
@@ -42,18 +41,18 @@ public final class DataTransferFactory {
   /**
    * Creates an {@link OutputWriter} between two stages.
    *
-   * @param srcTask     the {@link Task} that outputs the data to be written.
+   * @param srcIRVertex the {@link IRVertex} that outputs the data to be written.
    * @param srcTaskIdx  the index of the source task.
    * @param dstIRVertex the {@link IRVertex} that will take the output data as its input.
    * @param runtimeEdge that connects the srcTask to the tasks belonging to dstIRVertex.
    * @return the {@link OutputWriter} created.
    */
-  public OutputWriter createWriter(final Task srcTask,
+  public OutputWriter createWriter(final IRVertex srcIRVertex,
                                    final int srcTaskIdx,
                                    final IRVertex dstIRVertex,
                                    final RuntimeEdge<?> runtimeEdge) {
     return new OutputWriter(hashRangeMultiplier, srcTaskIdx,
-        srcTask.getIrVertexId(), dstIRVertex, runtimeEdge, blockManagerWorker);
+        srcIRVertex.getId(), dstIRVertex, runtimeEdge, blockManagerWorker);
   }
 
   /**
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/IRVertexDataHandler.java
similarity index 72%
rename from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
rename to runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/IRVertexDataHandler.java
index 56f5e5b..84b7a8e 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/IRVertexDataHandler.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.executor.datatransfer;
 
-import edu.snu.nemo.runtime.common.plan.physical.Task;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -26,9 +26,9 @@ import java.util.List;
  * As Task input is processed element-wise, Task output element percolates down
  * through the DAG of children TaskDataHandlers.
  */
-public final class TaskDataHandler {
-  private final Task task;
-  private List<TaskDataHandler> children;
+public final class IRVertexDataHandler {
+  private final IRVertex irVertex;
+  private List<IRVertexDataHandler> children;
   private final List<OutputCollectorImpl> inputFromThisStage;
   private final List<InputReader> sideInputFromOtherStages;
   private final List<OutputCollectorImpl> sideInputFromThisStage;
@@ -36,12 +36,12 @@ public final class TaskDataHandler {
   private final List<OutputWriter> outputWriters;
 
   /**
-   * TaskDataHandler Constructor.
+   * IRVertexDataHandler Constructor.
    *
-   * @param task Task of this TaskDataHandler.
+   * @param irVertex IRVertex of this IRVertexDataHandler.
    */
-  public TaskDataHandler(final Task task) {
-    this.task = task;
+  public IRVertexDataHandler(final IRVertex irVertex) {
+    this.irVertex = irVertex;
     this.children = new ArrayList<>();
     this.inputFromThisStage = new ArrayList<>();
     this.sideInputFromOtherStages = new ArrayList<>();
@@ -51,12 +51,12 @@ public final class TaskDataHandler {
   }
 
   /**
-   * Get the task that owns this TaskDataHandler.
+   * Get the irVertex that owns this IRVertexDataHandler.
    *
-   * @return task of this TaskDataHandler.
+   * @return irVertex of this IRVertexDataHandler.
    */
-  public Task getTask() {
-    return task;
+  public IRVertex getIRVertex() {
+    return irVertex;
   }
 
   /**
@@ -64,7 +64,7 @@ public final class TaskDataHandler {
    *
    * @return DAG of children tasks' TaskDataHandlers.
    */
-  public List<TaskDataHandler> getChildren() {
+  public List<IRVertexDataHandler> getChildren() {
     return children;
   }
 
@@ -89,18 +89,18 @@ public final class TaskDataHandler {
   }
 
   /**
-   * Get OutputCollector of this task.
+   * Get OutputCollector of this irVertex.
    *
-   * @return OutputCollector of this task.
+   * @return OutputCollector of this irVertex.
    */
   public OutputCollectorImpl getOutputCollector() {
     return outputCollector;
   }
 
   /**
-   * Get OutputWriters of this task.
+   * Get OutputWriters of this irVertex.
    *
-   * @return OutputWriters of this task.
+   * @return OutputWriters of this irVertex.
    */
   public List<OutputWriter> getOutputWriters() {
     return outputWriters;
@@ -111,14 +111,14 @@ public final class TaskDataHandler {
    *
    * @param childrenDataHandler list of children TaskDataHandlers.
    */
-  public void setChildrenDataHandler(final List<TaskDataHandler> childrenDataHandler) {
+  public void setChildrenDataHandler(final List<IRVertexDataHandler> childrenDataHandler) {
     children = childrenDataHandler;
   }
 
   /**
-   * Add OutputCollector of a parent task that will provide intra-stage input.
+   * Add OutputCollector of a parent irVertex that will provide intra-stage input.
    *
-   * @param input OutputCollector of a parent task.
+   * @param input OutputCollector of a parent irVertex.
    */
   public void addInputFromThisStages(final OutputCollectorImpl input) {
     inputFromThisStage.add(input);
@@ -134,27 +134,27 @@ public final class TaskDataHandler {
   }
 
   /**
-   * Add OutputCollector of a parent task that will provide intra-stage side input.
+   * Add OutputCollector of a parent irVertex that will provide intra-stage side input.
    *
-   * @param ocAsSideInput OutputCollector of a parent task with side input.
+   * @param ocAsSideInput OutputCollector of a parent irVertex with side input.
    */
   public void addSideInputFromThisStage(final OutputCollectorImpl ocAsSideInput) {
     sideInputFromThisStage.add(ocAsSideInput);
   }
 
   /**
-   * Set OutputCollector of this task.
+   * Set OutputCollector of this irVertex.
    *
-   * @param oc OutputCollector of this task.
+   * @param oc OutputCollector of this irVertex.
    */
   public void setOutputCollector(final OutputCollectorImpl oc) {
     outputCollector = oc;
   }
 
   /**
-   * Add OutputWriter of this task.
+   * Add OutputWriter of this irVertex.
    *
-   * @param outputWriter OutputWriter of this task.
+   * @param outputWriter OutputWriter of this irVertex.
    */
   public void addOutputWriter(final OutputWriter outputWriter) {
     outputWriters.add(outputWriter);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
index 80ab1ea..49401aa 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
@@ -20,6 +20,7 @@ import edu.snu.nemo.common.exception.SchedulingException;
 import edu.snu.nemo.common.exception.UnknownExecutionStateException;
 import edu.snu.nemo.common.StateMachine;
 import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.metric.MetricDataBuilder;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
@@ -162,9 +163,9 @@ public final class JobStateManager {
 
       // Initialize states for blocks of stage internal edges
       taskIdsForStage.forEach(taskId -> {
-        final DAG<Task, RuntimeEdge<Task>> taskInternalDag = physicalStage.getTaskDag();
+        final DAG<IRVertex, RuntimeEdge<IRVertex>> taskInternalDag = physicalStage.getIRDAG();
         taskInternalDag.getVertices().forEach(task -> {
-          final List<RuntimeEdge<Task>> internalOutgoingEdges = taskInternalDag.getOutgoingEdgesOf(task);
+          final List<RuntimeEdge<IRVertex>> internalOutgoingEdges = taskInternalDag.getOutgoingEdgesOf(task);
           internalOutgoingEdges.forEach(taskRuntimeEdge -> {
             final int srcTaskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
             final String blockId = RuntimeIdGenerator.generateBlockId(taskRuntimeEdge.getId(), srcTaskIdx);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 8d22424..51c505d 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -119,7 +119,7 @@ public final class RuntimeMaster {
   public Pair<JobStateManager, ScheduledExecutorService> execute(final PhysicalPlan plan,
                                                                  final int maxScheduleAttempt) {
     final Callable<Pair<JobStateManager, ScheduledExecutorService>> jobExecutionCallable = () -> {
-      this.irVertices.addAll(plan.getTaskIRVertexMap().values());
+      this.irVertices.addAll(plan.getIdToIRVertex().values());
       try {
         final JobStateManager jobStateManager =
             new JobStateManager(plan, blockManagerMaster, metricMessageHandler, maxScheduleAttempt);
@@ -268,7 +268,7 @@ public final class RuntimeMaster {
             taskStateChangedMsg.getTaskId(),
             taskStateChangedMsg.getAttemptIdx(),
             convertTaskState(taskStateChangedMsg.getState()),
-            taskStateChangedMsg.getTaskPutOnHoldId(),
+            taskStateChangedMsg.getVertexPutOnHoldId(),
             convertFailureCause(taskStateChangedMsg.getFailureCause()));
         break;
       case ExecutorFailed:
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index caf1606..8536746 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -20,7 +20,7 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.reef.driver.context.ActiveContext;
 
@@ -95,17 +95,17 @@ public final class ExecutorRepresenter {
 
   /**
    * Marks the Task as running, and sends scheduling message to the executor.
-   * @param scheduledTask
+   * @param executableTask the task to mark as running and send to the executor.
    */
-  public void onTaskScheduled(final ScheduledTask scheduledTask) {
-    runningTasks.add(scheduledTask.getTaskId());
-    runningTaskToAttempt.put(scheduledTask.getTaskId(), scheduledTask.getAttemptIdx());
-    failedTasks.remove(scheduledTask.getTaskId());
+  public void onTaskScheduled(final ExecutableTask executableTask) {
+    runningTasks.add(executableTask.getTaskId());
+    runningTaskToAttempt.put(executableTask.getTaskId(), executableTask.getAttemptIdx());
+    failedTasks.remove(executableTask.getTaskId());
 
     serializationExecutorService.submit(new Runnable() {
       @Override
       public void run() {
-        final byte[] serialized = SerializationUtils.serialize(scheduledTask);
+        final byte[] serialized = SerializationUtils.serialize(executableTask);
         sendControlMessage(
             ControlMessage.Message.newBuilder()
                 .setId(RuntimeIdGenerator.generateMessageId())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index a74ff10..f32a441 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -19,6 +19,7 @@ import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
@@ -136,14 +137,14 @@ public final class BatchSingleJobScheduler implements Scheduler {
    * @param taskId whose state has changed
    * @param taskAttemptIndex of the task whose state has changed
    * @param newState the state to change to
-   * @param taskPutOnHold the ID of task that are put on hold. It is null otherwise.
+   * @param vertexPutOnHold the ID of the vertex that is put on hold, or null if no vertex is on hold.
    */
   @Override
   public void onTaskStateChanged(final String executorId,
                                  final String taskId,
                                  final int taskAttemptIndex,
                                  final TaskState.State newState,
-                                 @Nullable final String taskPutOnHold,
+                                 @Nullable final String vertexPutOnHold,
                                  final TaskState.RecoverableFailureCause failureCause) {
     final int currentTaskAttemptIndex = jobStateManager.getCurrentAttemptIndexForTask(taskId);
     if (taskAttemptIndex == currentTaskAttemptIndex) {
@@ -158,7 +159,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
           break;
         case ON_HOLD:
           jobStateManager.onTaskStateChanged(taskId, newState);
-          onTaskExecutionOnHold(executorId, taskId, taskPutOnHold);
+          onTaskExecutionOnHold(executorId, taskId, vertexPutOnHold);
           break;
         case FAILED_UNRECOVERABLE:
           throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The job failed on Task #")
@@ -392,7 +393,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
     LOG.info("Scheduling Stage {}", stageToSchedule.getId());
 
     // each readable and source task will be bounded in executor.
-    final List<Map<String, Readable>> logicalTaskIdToReadables = stageToSchedule.getLogicalTaskIdToReadables();
+    final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
 
     taskIdsToSchedule.forEach(taskId -> {
       blockManagerMaster.onProducerTaskScheduled(taskId);
@@ -400,23 +401,27 @@ public final class BatchSingleJobScheduler implements Scheduler {
       final int attemptIdx = jobStateManager.getCurrentAttemptIndexForTask(taskId);
 
       LOG.debug("Enqueueing {}", taskId);
-      pendingTaskCollection.add(new ScheduledTask(physicalPlan.getId(),
-          stageToSchedule.getSerializedTaskDag(), taskId, stageIncomingEdges, stageOutgoingEdges, attemptIdx,
-          stageToSchedule.getContainerType(), logicalTaskIdToReadables.get(taskIdx)));
+      pendingTaskCollection.add(new ExecutableTask(
+          physicalPlan.getId(),
+          taskId,
+          attemptIdx,
+          stageToSchedule.getContainerType(),
+          stageToSchedule.getSerializedIRDAG(),
+          stageIncomingEdges,
+          stageOutgoingEdges,
+          vertexIdToReadables.get(taskIdx)));
     });
     schedulerRunner.onATaskAvailable();
   }
 
   /**
-   * Gets the DAG of a task from it's ID.
-   *
-   * @param taskId the ID of the task to get.
-   * @return the DAG of the task.
+   * @param taskId the ID of the task whose stage is looked up in the physical plan.
+   * @return the IR DAG of the stage that the task belongs to.
    */
-  private DAG<Task, RuntimeEdge<Task>> getTaskDagById(final String taskId) {
+  private DAG<IRVertex, RuntimeEdge<IRVertex>> getVertexDagById(final String taskId) {
     for (final PhysicalStage physicalStage : physicalPlan.getStageDAG().getVertices()) {
       if (physicalStage.getId().equals(RuntimeIdGenerator.getStageIdFromTaskId(taskId))) {
-        return physicalStage.getTaskDag();
+        return physicalStage.getIRDAG();
       }
     }
     throw new RuntimeException(new Throwable("This taskId does not exist in the plan"));
@@ -471,13 +476,13 @@ public final class BatchSingleJobScheduler implements Scheduler {
 
   /**
    * Action for after task execution is put on hold.
-   * @param executorId     the ID of the executor.
-   * @param taskId    the ID of the task.
-   * @param taskPutOnHold  the ID of task that is put on hold.
+   * @param executorId       the ID of the executor.
+   * @param taskId           the ID of the task.
+   * @param vertexPutOnHold  the ID of the vertex that is put on hold.
    */
   private void onTaskExecutionOnHold(final String executorId,
                                      final String taskId,
-                                     final String taskPutOnHold) {
+                                     final String vertexPutOnHold) {
     LOG.info("{} put on hold in {}", new Object[]{taskId, executorId});
     executorRegistry.updateExecutor(executorId, (executor, state) -> {
       executor.onTaskExecutionComplete(taskId);
@@ -491,15 +496,14 @@ public final class BatchSingleJobScheduler implements Scheduler {
     if (stageComplete) {
       // get optimization vertex from the task.
       final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
-          getTaskDagById(taskId).getVertices().stream() // get tasks list
-              .filter(task -> task.getId().equals(taskPutOnHold)) // find it
-              .map(physicalPlan::getIRVertexOf) // get the corresponding IRVertex, the MetricCollectionBarrierVertex
+          getVertexDagById(taskId).getVertices().stream() // get vertex list
+              .filter(irVertex -> irVertex.getId().equals(vertexPutOnHold)) // find it
               .filter(irVertex -> irVertex instanceof MetricCollectionBarrierVertex)
               .distinct()
               .map(irVertex -> (MetricCollectionBarrierVertex) irVertex) // convert types
               .findFirst().orElseThrow(() -> new RuntimeException(ON_HOLD.name() // get it
               + " called with failed task ids by some other task than "
-              + MetricCollectionBarrierTask.class.getSimpleName()));
+              + MetricCollectionBarrierVertex.class.getSimpleName()));
       // and we will use this vertex to perform metric collection and dynamic optimization.
 
       pubSubEventHandlerWrapper.getPubSubEventHandler().onNext(
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
index 1c38e35..ab5081c 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
@@ -46,10 +46,10 @@ public final class CompositeSchedulingPolicy implements SchedulingPolicy {
 
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ScheduledTask scheduledTask) {
+                                                             final ExecutableTask executableTask) {
     Set<ExecutorRepresenter> candidates = executorRepresenterSet;
     for (final SchedulingPolicy schedulingPolicy : schedulingPolicies) {
-      candidates = schedulingPolicy.filterExecutorRepresenters(candidates, scheduledTask);
+      candidates = schedulingPolicy.filterExecutorRepresenters(candidates, executableTask);
     }
     return candidates;
   }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
index d64fffb..f23a1f0 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
@@ -37,20 +37,20 @@ public final class ContainerTypeAwareSchedulingPolicy implements SchedulingPolic
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the container type.
    *                               If the container type of target Task is NONE, it will return the original set.
-   * @param scheduledTask {@link ScheduledTask} to be scheduled.
+   * @param executableTask {@link ExecutableTask} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ScheduledTask scheduledTask) {
+                                                             final ExecutableTask executableTask) {
 
-    if (scheduledTask.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
+    if (executableTask.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
       return executorRepresenterSet;
     }
 
     final Set<ExecutorRepresenter> candidateExecutors =
         executorRepresenterSet.stream()
-            .filter(executor -> executor.getContainerType().equals(scheduledTask.getContainerType()))
+            .filter(executor -> executor.getContainerType().equals(executableTask.getContainerType()))
             .collect(Collectors.toSet());
 
     return candidateExecutors;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
index 2f598d2..e72f486 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
@@ -35,12 +35,12 @@ public final class FreeSlotSchedulingPolicy implements SchedulingPolicy {
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the free slot of executors.
    *                               Executors that do not have any free slots will be filtered by this policy.
-   * @param scheduledTask {@link ScheduledTask} to be scheduled.
+   * @param executableTask {@link ExecutableTask} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ScheduledTask scheduledTask) {
+                                                             final ExecutableTask executableTask) {
     final Set<ExecutorRepresenter> candidateExecutors =
         executorRepresenterSet.stream()
             .filter(executor -> executor.getRunningTasks().size() < executor.getExecutorCapacity())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
index d6696e9..361307f 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 
 import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
@@ -38,9 +38,9 @@ public interface PendingTaskCollection {
 
   /**
    * Adds a Task to this collection.
-   * @param scheduledTask to add.
+   * @param executableTask to add.
    */
-  void add(final ScheduledTask scheduledTask);
+  void add(final ExecutableTask executableTask);
 
   /**
    * Removes the specified Task to be scheduled.
@@ -49,14 +49,14 @@ public interface PendingTaskCollection {
    * @throws NoSuchElementException if the specified Task is not in the queue,
    *                                or removing this Task breaks scheduling order
    */
-  ScheduledTask remove(final String taskId) throws NoSuchElementException;
+  ExecutableTask remove(final String taskId) throws NoSuchElementException;
 
   /**
    * Peeks stage that can be scheduled according to job dependency priority.
    * Changes to the queue must not reflected to the returned collection to avoid concurrent modification.
    * @return stage that can be scheduled, or {@link Optional#empty()} if the queue is empty
    */
-  Optional<Collection<ScheduledTask>> peekSchedulableStage();
+  Optional<Collection<ExecutableTask>> peekSchedulableStage();
 
   /**
    * Registers a job to this queue in case the queue needs to understand the topology of the job DAG.
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
index 61077fa..a232a79 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 
@@ -47,12 +47,12 @@ public final class RoundRobinSchedulingPolicy implements SchedulingPolicy {
 
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by round robin behaviour.
-   * @param scheduledTask {@link ScheduledTask} to be scheduled.
+   * @param executableTask {@link ExecutableTask} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ScheduledTask scheduledTask) {
+                                                             final ExecutableTask executableTask) {
     final OptionalInt minOccupancy =
         executorRepresenterSet.stream()
         .map(executor -> executor.getRunningTasks().size())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index f9bb73a..d34eb56 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -30,9 +30,6 @@ import javax.annotation.Nullable;
  * RMT and ST meet only at two points: {@link ExecutorRegistry}, and {@link PendingTaskCollection},
  * which are synchronized(ThreadSafe).
  * Other scheduler-related classes that are accessed by only one of the two threads are not synchronized(NotThreadSafe).
- *
- * Receives jobs to execute and schedules
- * {@link edu.snu.nemo.runtime.common.plan.physical.ScheduledTask} to executors.
  */
 @DriverSide
 @DefaultImplementation(BatchSingleJobScheduler.class)
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index fa98e39..f411c1d 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.common.state.JobState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
@@ -112,7 +112,7 @@ public final class SchedulerRunner {
   }
 
   void doScheduleStage() {
-    final Collection<ScheduledTask> stageToSchedule = pendingTaskCollection.peekSchedulableStage().orElse(null);
+    final Collection<ExecutableTask> stageToSchedule = pendingTaskCollection.peekSchedulableStage().orElse(null);
     if (stageToSchedule == null) {
       // Task queue is empty
       LOG.debug("PendingTaskCollection is empty. Awaiting for more Tasks...");
@@ -120,7 +120,7 @@ public final class SchedulerRunner {
     }
 
     final AtomicInteger numScheduledTasks = new AtomicInteger(0); // to be incremented in lambda
-    for (final ScheduledTask schedulableTask : stageToSchedule) {
+    for (final ExecutableTask schedulableTask : stageToSchedule) {
       final JobStateManager jobStateManager = jobStateManagers.get(schedulableTask.getJobId());
       LOG.debug("Trying to schedule {}...", schedulableTask.getTaskId());
 
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index ab9e251..1e71882 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
@@ -32,5 +32,5 @@ import java.util.Set;
 @DefaultImplementation(CompositeSchedulingPolicy.class)
 public interface SchedulingPolicy {
   Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                      final ScheduledTask scheduledTask);
+                                                      final ExecutableTask executableTask);
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
index a5f5736..0375e01 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
@@ -20,7 +20,7 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
 
@@ -41,7 +41,7 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
   /**
    * Pending Tasks awaiting to be scheduled for each stage.
    */
-  private final ConcurrentMap<String, Map<String, ScheduledTask>> stageIdToPendingTasks;
+  private final ConcurrentMap<String, Map<String, ExecutableTask>> stageIdToPendingTasks;
 
   /**
    * Stages with Tasks that have not yet been scheduled.
@@ -55,17 +55,17 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
   }
 
   @Override
-  public synchronized void add(final ScheduledTask scheduledTask) {
-    final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(scheduledTask.getTaskId());
+  public synchronized void add(final ExecutableTask executableTask) {
+    final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(executableTask.getTaskId());
 
     stageIdToPendingTasks.compute(stageId, (s, taskIdToTask) -> {
       if (taskIdToTask == null) {
-        final Map<String, ScheduledTask> taskIdToTaskMap = new HashMap<>();
-        taskIdToTaskMap.put(scheduledTask.getTaskId(), scheduledTask);
-        updateSchedulableStages(stageId, scheduledTask.getContainerType());
+        final Map<String, ExecutableTask> taskIdToTaskMap = new HashMap<>();
+        taskIdToTaskMap.put(executableTask.getTaskId(), executableTask);
+        updateSchedulableStages(stageId, executableTask.getContainerType());
         return taskIdToTaskMap;
       } else {
-        taskIdToTask.put(scheduledTask.getTaskId(), scheduledTask);
+        taskIdToTask.put(executableTask.getTaskId(), executableTask);
         return taskIdToTask;
       }
     });
@@ -81,18 +81,18 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
    *                                (i.e. does not belong to the collection from {@link #peekSchedulableStage()}.
    */
   @Override
-  public synchronized ScheduledTask remove(final String taskId) throws NoSuchElementException {
+  public synchronized ExecutableTask remove(final String taskId) throws NoSuchElementException {
     final String stageId = schedulableStages.peekFirst();
     if (stageId == null) {
       throw new NoSuchElementException("No schedulable stage in Task queue");
     }
 
-    final Map<String, ScheduledTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
+    final Map<String, ExecutableTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
 
     if (pendingTasksForStage == null) {
       throw new RuntimeException(String.format("Stage %s not found in Task queue", stageId));
     }
-    final ScheduledTask taskToSchedule = pendingTasksForStage.remove(taskId);
+    final ExecutableTask taskToSchedule = pendingTasksForStage.remove(taskId);
     if (taskToSchedule == null) {
       throw new NoSuchElementException(String.format("Task %s not found in Task queue", taskId));
     }
@@ -115,13 +115,13 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
    *         or {@link Optional#empty} if the queue is empty
    */
   @Override
-  public synchronized Optional<Collection<ScheduledTask>> peekSchedulableStage() {
+  public synchronized Optional<Collection<ExecutableTask>> peekSchedulableStage() {
     final String stageId = schedulableStages.peekFirst();
     if (stageId == null) {
       return Optional.empty();
     }
 
-    final Map<String, ScheduledTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
+    final Map<String, ExecutableTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
     if (pendingTasksForStage == null) {
       throw new RuntimeException(String.format("Stage %s not found in stageIdToPendingTasks map", stageId));
     }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
index 5432f0b..f7056bf 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
@@ -59,15 +59,15 @@ public final class SourceLocationAwareSchedulingPolicy implements SchedulingPoli
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by source location.
    *                               If there is no source locations, will return original set.
-   * @param scheduledTask {@link ScheduledTask} to be scheduled.
+   * @param executableTask {@link ExecutableTask} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ScheduledTask scheduledTask) {
+                                                             final ExecutableTask executableTask) {
     final Set<String> sourceLocations;
     try {
-      sourceLocations = getSourceLocations(scheduledTask.getLogicalTaskIdToReadable().values());
+      sourceLocations = getSourceLocations(executableTask.getIrVertexIdToReadable().values());
     } catch (final UnsupportedOperationException e) {
       return executorRepresenterSet;
     } catch (final Exception e) {
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
index 07d36a2..3150475 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
@@ -16,10 +16,8 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.scheduler.ContainerTypeAwareSchedulingPolicy;
-import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -34,7 +32,7 @@ import static org.mockito.Mockito.*;
  * Tests {@link ContainerTypeAwareSchedulingPolicy}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ScheduledTask.class})
+@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class})
 public final class ContainerTypeAwareSchedulingPolicyTest {
 
   private static ExecutorRepresenter mockExecutorRepresenter(final String containerType) {
@@ -50,24 +48,24 @@ public final class ContainerTypeAwareSchedulingPolicyTest {
     final ExecutorRepresenter a1 = mockExecutorRepresenter(ExecutorPlacementProperty.RESERVED);
     final ExecutorRepresenter a2 = mockExecutorRepresenter(ExecutorPlacementProperty.NONE);
 
-    final ScheduledTask scheduledTask1 = mock(ScheduledTask.class);
-    when(scheduledTask1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
+    final ExecutableTask executableTask1 = mock(ExecutableTask.class);
+    when(executableTask1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
 
     final Set<ExecutorRepresenter> executorRepresenterList1 = new HashSet<>(Arrays.asList(a0, a1, a2));
 
     final Set<ExecutorRepresenter> candidateExecutors1 =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, scheduledTask1);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, executableTask1);
 
     final Set<ExecutorRepresenter> expectedExecutors1 = new HashSet<>(Arrays.asList(a1));
     assertEquals(expectedExecutors1, candidateExecutors1);
 
-    final ScheduledTask scheduledTask2 = mock(ScheduledTask.class);
-    when(scheduledTask2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
+    final ExecutableTask executableTask2 = mock(ExecutableTask.class);
+    when(executableTask2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
 
     final Set<ExecutorRepresenter> executorRepresenterList2 = new HashSet<>(Arrays.asList(a0, a1, a2));
 
     final Set<ExecutorRepresenter> candidateExecutors2 =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, scheduledTask2);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, executableTask2);
 
     final Set<ExecutorRepresenter> expectedExecutors2 = new HashSet<>(Arrays.asList(a0, a1, a2));
     assertEquals(expectedExecutors2, candidateExecutors2);
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java
index cb100f0..0071b9f 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java
@@ -15,10 +15,8 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.scheduler.FreeSlotSchedulingPolicy;
-import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -34,7 +32,7 @@ import static org.mockito.Mockito.*;
  * Tests {@link FreeSlotSchedulingPolicy}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ScheduledTask.class})
+@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class})
 public final class FreeSlotSchedulingPolicyTest {
 
   private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks,
@@ -53,12 +51,12 @@ public final class FreeSlotSchedulingPolicyTest {
     final ExecutorRepresenter a0 = mockExecutorRepresenter(1, 1);
     final ExecutorRepresenter a1 = mockExecutorRepresenter(2, 3);
 
-    final ScheduledTask scheduledTask = mock(ScheduledTask.class);
+    final ExecutableTask executableTask = mock(ExecutableTask.class);
 
     final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1));
 
     final Set<ExecutorRepresenter> candidateExecutors =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, scheduledTask);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, executableTask);
 
     final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a1));
     assertEquals(expectedExecutors, candidateExecutors);
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
index 10cced0..5edd885 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -32,7 +32,7 @@ import static org.mockito.Mockito.*;
  * Tests {@link RoundRobinSchedulingPolicy}
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ScheduledTask.class})
+@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class})
 public final class RoundRobinSchedulingPolicyTest {
 
   private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks) {
@@ -50,12 +50,12 @@ public final class RoundRobinSchedulingPolicyTest {
     final ExecutorRepresenter a1 = mockExecutorRepresenter(2);
     final ExecutorRepresenter a2 = mockExecutorRepresenter(2);
 
-    final ScheduledTask scheduledTask = mock(ScheduledTask.class);
+    final ExecutableTask executableTask = mock(ExecutableTask.class);
 
     final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1, a2));
 
     final Set<ExecutorRepresenter> candidateExecutors =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, scheduledTask);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, executableTask);
 
     final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a0));
     assertEquals(expectedExecutors, candidateExecutors);
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java
index 5aecc3d..0ae5998 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java
@@ -83,7 +83,7 @@ public final class SingleTaskQueueTest {
     executorService.submit(() -> {
       try {
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-        final ScheduledTask dequeuedTask = dequeue();
+        final ExecutableTask dequeuedTask = dequeue();
         assertEquals(RuntimeIdGenerator.getStageIdFromTaskId(dequeuedTask.getTaskId()),
             dagOf2Stages.get(1).getId());
 
@@ -135,7 +135,7 @@ public final class SingleTaskQueueTest {
     executorService.submit(() -> {
       try {
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-        final ScheduledTask dequeuedTask = dequeue();
+        final ExecutableTask dequeuedTask = dequeue();
         assertEquals(RuntimeIdGenerator.getStageIdFromTaskId(dequeuedTask.getTaskId()),
             dagOf2Stages.get(0).getId());
 
@@ -211,9 +211,15 @@ public final class SingleTaskQueueTest {
    */
   private void scheduleStage(final PhysicalStage stage) {
     stage.getTaskIds().forEach(taskId ->
-        pendingTaskPriorityQueue.add(new ScheduledTask(
-            "TestPlan", stage.getSerializedTaskDag(), taskId, Collections.emptyList(),
-            Collections.emptyList(), 0, stage.getContainerType(), Collections.emptyMap())));
+        pendingTaskPriorityQueue.add(new ExecutableTask(
+            "TestPlan",
+            taskId,
+            0,
+            stage.getContainerType(),
+            stage.getSerializedIRDAG(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyMap())));
   }
 
   /**
@@ -221,17 +227,17 @@ public final class SingleTaskQueueTest {
    * @return the stage name of the dequeued task.
    */
   private String dequeueAndGetStageId() {
-    final ScheduledTask scheduledTask = dequeue();
-    return RuntimeIdGenerator.getStageIdFromTaskId(scheduledTask.getTaskId());
+    final ExecutableTask executableTask = dequeue();
+    return RuntimeIdGenerator.getStageIdFromTaskId(executableTask.getTaskId());
   }
 
   /**
    * Dequeues a scheduled task from the task priority queue.
    * @return the Task dequeued
    */
-  private ScheduledTask dequeue() {
-    final Collection<ScheduledTask> scheduledTasks
+  private ExecutableTask dequeue() {
+    final Collection<ExecutableTask> executableTasks
         = pendingTaskPriorityQueue.peekSchedulableStage().get();
-    return pendingTaskPriorityQueue.remove(scheduledTasks.iterator().next().getTaskId());
+    return pendingTaskPriorityQueue.remove(executableTasks.iterator().next().getTaskId());
   }
 }
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
index 87aa123..65f9d98 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
@@ -33,7 +33,7 @@ import static org.mockito.Mockito.*;
  * Test cases for {@link SourceLocationAwareSchedulingPolicy}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ScheduledTask.class, Readable.class})
+@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class, Readable.class})
 public final class SourceLocationAwareSchedulingPolicyTest {
   private static final String SITE_0 = "SEOUL";
   private static final String SITE_1 = "JINJU";
@@ -46,7 +46,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
   }
 
   /**
-   * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule a {@link ScheduledTask} when
+   * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule an {@link ExecutableTask} when
    * there are no executors in appropriate location(s).
    */
   @Test
@@ -54,7 +54,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
     final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
 
     // Prepare test scenario
-    final ScheduledTask task = CreateScheduledTask.withReadablesWithSourceLocations(
+    final ExecutableTask task = CreateExecutableTask.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_0)));
     final ExecutorRepresenter e0 = mockExecutorRepresenter(SITE_1);
     final ExecutorRepresenter e1 = mockExecutorRepresenter(SITE_1);
@@ -70,19 +70,19 @@ public final class SourceLocationAwareSchedulingPolicyTest {
   public void testSourceLocationAwareSchedulingWithMultiSource() {
     final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
     // Prepare test scenario
-    final ScheduledTask task0 = CreateScheduledTask.withReadablesWithSourceLocations(
+    final ExecutableTask task0 = CreateExecutableTask.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_1)));
-    final ScheduledTask task1 = CreateScheduledTask.withReadablesWithSourceLocations(
+    final ExecutableTask task1 = CreateExecutableTask.withReadablesWithSourceLocations(
         Collections.singletonList(Arrays.asList(SITE_0, SITE_1, SITE_2)));
-    final ScheduledTask task2 = CreateScheduledTask.withReadablesWithSourceLocations(
+    final ExecutableTask task2 = CreateExecutableTask.withReadablesWithSourceLocations(
         Arrays.asList(Collections.singletonList(SITE_0), Collections.singletonList(SITE_1),
             Arrays.asList(SITE_1, SITE_2)));
-    final ScheduledTask task3 = CreateScheduledTask.withReadablesWithSourceLocations(
+    final ExecutableTask task3 = CreateExecutableTask.withReadablesWithSourceLocations(
         Arrays.asList(Collections.singletonList(SITE_1), Collections.singletonList(SITE_0),
             Arrays.asList(SITE_0, SITE_2)));
 
     final ExecutorRepresenter e = mockExecutorRepresenter(SITE_1);
-    for (final ScheduledTask task : new HashSet<>(Arrays.asList(task0, task1, task2, task3))) {
+    for (final ExecutableTask task : new HashSet<>(Arrays.asList(task0, task1, task2, task3))) {
       assertEquals(new HashSet<>(Collections.singletonList(e)), schedulingPolicy.filterExecutorRepresenters(
           new HashSet<>(Collections.singletonList(e)), task));
     }
@@ -90,23 +90,23 @@ public final class SourceLocationAwareSchedulingPolicyTest {
 
 
   /**
-   * Utility for creating {@link ScheduledTask}.
+   * Utility for creating {@link ExecutableTask}.
    */
-  private static final class CreateScheduledTask {
+  private static final class CreateExecutableTask {
     private static final AtomicInteger taskIndex = new AtomicInteger(0);
     private static final AtomicInteger intraTaskIndex = new AtomicInteger(0);
 
-    private static ScheduledTask doCreate(final Collection<Readable> readables) {
-      final ScheduledTask mockInstance = mock(ScheduledTask.class);
+    private static ExecutableTask doCreate(final Collection<Readable> readables) {
+      final ExecutableTask mockInstance = mock(ExecutableTask.class);
       final Map<String, Readable> readableMap = new HashMap<>();
       readables.forEach(readable -> readableMap.put(String.format("TASK-%d", intraTaskIndex.getAndIncrement()),
           readable));
       when(mockInstance.getTaskId()).thenReturn(String.format("T-%d", taskIndex.getAndIncrement()));
-      when(mockInstance.getLogicalTaskIdToReadable()).thenReturn(readableMap);
+      when(mockInstance.getIrVertexIdToReadable()).thenReturn(readableMap);
       return mockInstance;
     }
 
-    static ScheduledTask withReadablesWithSourceLocations(final Collection<List<String>> sourceLocation) {
+    static ExecutableTask withReadablesWithSourceLocations(final Collection<List<String>> sourceLocation) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (final List<String> locations : sourceLocation) {
@@ -120,7 +120,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
       }
     }
 
-    static ScheduledTask withReadablesWithoutSourceLocations(final int numReadables) {
+    static ExecutableTask withReadablesWithoutSourceLocations(final int numReadables) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (int i = 0; i < numReadables; i++) {
@@ -134,7 +134,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
       }
     }
 
-    static ScheduledTask withReadablesWhichThrowException(final int numReadables) {
+    static ExecutableTask withReadablesWhichThrowException(final int numReadables) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (int i = 0; i < numReadables; i++) {
@@ -148,7 +148,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
       }
     }
 
-    static ScheduledTask withoutReadables() {
+    static ExecutableTask withoutReadables() {
       return doCreate(Collections.emptyList());
     }
   }
diff --git a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
index d856dd0..631f837 100644
--- a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
+++ b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
@@ -101,7 +101,7 @@ public final class TestPlanGenerator {
                                                   final Policy policy) throws Exception {
     final DAG<IRVertex, IREdge> optimized = CompiletimeOptimizer.optimize(irDAG, policy, EMPTY_DAG_DIRECTORY);
     final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = optimized.convert(PLAN_GENERATOR);
-    return new PhysicalPlan("Plan", physicalDAG, PLAN_GENERATOR.getTaskIRVertexMap());
+    return new PhysicalPlan("Plan", physicalDAG, PLAN_GENERATOR.getIdToIRVertex());
   }
 
   /**
diff --git a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
index 51b753e..f257926 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
@@ -82,7 +82,7 @@ public class ClientEndpointTest {
     injector.bindVolatileInstance(MessageEnvironment.class, messageEnvironment);
     final BlockManagerMaster pmm = injector.getInstance(BlockManagerMaster.class);
     final JobStateManager jobStateManager = new JobStateManager(
-        new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap()),
+        new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()),
         pmm, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
 
     final DriverEndpoint driverEndpoint = new DriverEndpoint(jobStateManager, clientEndpoint);
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
index affcbed..a6df1d4 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
@@ -20,10 +20,12 @@ import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.physical.*;
@@ -63,7 +65,7 @@ public final class TaskExecutorTest {
   private static final String CONTAINER_TYPE = "CONTAINER_TYPE";
   private static final int SOURCE_PARALLELISM = 5;
   private List elements;
-  private Map<String, List<Object>> taskIdToOutputData;
+  private Map<String, List<Object>> vertexIdToOutputData;
   private DataTransferFactory dataTransferFactory;
   private TaskStateManager taskStateManager;
   private MetricMessageSender metricMessageSender;
@@ -76,7 +78,7 @@ public final class TaskExecutorTest {
     taskStateManager = mock(TaskStateManager.class);
 
     // Mock a DataTransferFactory.
-    taskIdToOutputData = new HashMap<>();
+    vertexIdToOutputData = new HashMap<>();
     dataTransferFactory = mock(DataTransferFactory.class);
     when(dataTransferFactory.createReader(anyInt(), any(), any())).then(new InterStageReaderAnswer());
     when(dataTransferFactory.createWriter(any(), anyInt(), any(), any())).then(new WriterAnswer());
@@ -88,17 +90,13 @@ public final class TaskExecutorTest {
   }
 
   /**
-   * Test the {@link BoundedSourceTask} processing in {@link TaskExecutor}.
+   * Test the {@link edu.snu.nemo.common.ir.vertex.SourceVertex} processing in {@link TaskExecutor}.
    */
-  @Test(timeout=2000)
-  public void testSourceTask() throws Exception {
-    final IRVertex sourceIRVertex = new SimpleIRVertex();
-    final String sourceIrVertexId = sourceIRVertex.getId();
-
-    final String sourceTaskId = RuntimeIdGenerator.generateLogicalTaskId("Source_IR_Vertex");
+  @Test(timeout=5000)
+  public void testSourceVertex() throws Exception {
+    final IRVertex sourceIRVertex = new EmptyComponents.EmptySourceVertex("empty");
     final String stageId = RuntimeIdGenerator.generateStageId(0);
 
-    final BoundedSourceTask<Integer> boundedSourceTask = new BoundedSourceTask<>(sourceTaskId, sourceIrVertexId);
     final Readable readable = new Readable() {
       @Override
       public Iterable read() throws Exception {
@@ -109,30 +107,37 @@ public final class TaskExecutorTest {
         throw new UnsupportedOperationException();
       }
     };
-    final Map<String, Readable> logicalIdToReadable = new HashMap<>();
-    logicalIdToReadable.put(sourceTaskId, readable);
+    final Map<String, Readable> vertexIdToReadable = new HashMap<>();
+    vertexIdToReadable.put(sourceIRVertex.getId(), readable);
 
-    final DAG<Task, RuntimeEdge<Task>> taskDag =
-        new DAGBuilder<Task, RuntimeEdge<Task>>().addVertex(boundedSourceTask).build();
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
+        new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>().addVertex(sourceIRVertex).buildWithoutSourceSinkCheck();
     final PhysicalStageEdge stageOutEdge = mock(PhysicalStageEdge.class);
     when(stageOutEdge.getSrcVertex()).thenReturn(sourceIRVertex);
     final String taskId = RuntimeIdGenerator.generateTaskId(0, stageId);
-    final ScheduledTask scheduledTask =
-        new ScheduledTask("testSourceTask", new byte[0], taskId, Collections.emptyList(),
-            Collections.singletonList(stageOutEdge), 0, CONTAINER_TYPE, logicalIdToReadable);
+    final ExecutableTask executableTask =
+        new ExecutableTask(
+            "testSourceVertex",
+            taskId,
+            0,
+            CONTAINER_TYPE,
+            new byte[0],
+            Collections.emptyList(),
+            Collections.singletonList(stageOutEdge),
+            vertexIdToReadable);
 
     // Execute the task.
     final TaskExecutor taskExecutor = new TaskExecutor(
-        scheduledTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+        executableTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
     taskExecutor.execute();
 
     // Check the output.
-    assertEquals(100, taskIdToOutputData.get(sourceTaskId).size());
-    assertEquals(elements.get(0), taskIdToOutputData.get(sourceTaskId).get(0));
+    assertEquals(100, vertexIdToOutputData.get(sourceIRVertex.getId()).size());
+    assertEquals(elements.get(0), vertexIdToOutputData.get(sourceIRVertex.getId()).get(0));
   }
 
   /**
-   * Test the {@link OperatorTask} processing in {@link TaskExecutor}.
+   * Test the {@link edu.snu.nemo.common.ir.vertex.OperatorVertex} processing in {@link TaskExecutor}.
    *
    * The DAG of the task to test will looks like:
    * operator task 1 -> operator task 2
@@ -142,47 +147,46 @@ public final class TaskExecutorTest {
    * Because of this, the operator task 1 will process multiple partitions and emit data in multiple times also.
    * On the other hand, operator task 2 will receive the output data once and produce a single output.
    */
-  @Test//(timeout=2000)
-  public void testOperatorTask() throws Exception {
-    final IRVertex operatorIRVertex1 = new SimpleIRVertex();
-    final IRVertex operatorIRVertex2 = new SimpleIRVertex();
-    final String operatorIRVertexId1 = operatorIRVertex1.getId();
-    final String operatorIRVertexId2 = operatorIRVertex2.getId();
+  @Test(timeout=5000)
+  public void testOperatorVertex() throws Exception {
+    final IRVertex operatorIRVertex1 = new OperatorVertex(new SimpleTransform());
+    final IRVertex operatorIRVertex2 = new OperatorVertex(new SimpleTransform());
     final String runtimeIREdgeId = "Runtime edge between operator tasks";
 
-    final String operatorTaskId1 = RuntimeIdGenerator.generateLogicalTaskId("Operator_vertex_1");
-    final String operatorTaskId2 = RuntimeIdGenerator.generateLogicalTaskId("Operator_vertex_2");
     final String stageId = RuntimeIdGenerator.generateStageId(1);
 
-    final OperatorTask operatorTask1 =
-        new OperatorTask(operatorTaskId1, operatorIRVertexId1, new SimpleTransform());
-    final OperatorTask operatorTask2 =
-        new OperatorTask(operatorTaskId2, operatorIRVertexId2, new SimpleTransform());
     final Coder coder = Coder.DUMMY_CODER;
     ExecutionPropertyMap edgeProperties = new ExecutionPropertyMap(runtimeIREdgeId);
     edgeProperties.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
-    final DAG<Task, RuntimeEdge<Task>> taskDag = new DAGBuilder<Task, RuntimeEdge<Task>>()
-        .addVertex(operatorTask1)
-        .addVertex(operatorTask2)
-        .connectVertices(new RuntimeEdge<Task>(
-            runtimeIREdgeId, edgeProperties, operatorTask1, operatorTask2, coder))
-        .build();
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
+        .addVertex(operatorIRVertex1)
+        .addVertex(operatorIRVertex2)
+        .connectVertices(new RuntimeEdge<IRVertex>(
+            runtimeIREdgeId, edgeProperties, operatorIRVertex1, operatorIRVertex2, coder))
+        .buildWithoutSourceSinkCheck();
     final String taskId = RuntimeIdGenerator.generateTaskId(0, stageId);
     final PhysicalStageEdge stageInEdge = mock(PhysicalStageEdge.class);
     when(stageInEdge.getDstVertex()).thenReturn(operatorIRVertex1);
     final PhysicalStageEdge stageOutEdge = mock(PhysicalStageEdge.class);
     when(stageOutEdge.getSrcVertex()).thenReturn(operatorIRVertex2);
-    final ScheduledTask scheduledTask =
-        new ScheduledTask("testSourceTask", new byte[0], taskId, Collections.singletonList(stageInEdge),
-            Collections.singletonList(stageOutEdge), 0, CONTAINER_TYPE, Collections.emptyMap());
+    final ExecutableTask executableTask =
+        new ExecutableTask(
+            "testSourceVertex",
+            taskId,
+            0,
+            CONTAINER_TYPE,
+            new byte[0],
+            Collections.singletonList(stageInEdge),
+            Collections.singletonList(stageOutEdge),
+            Collections.emptyMap());
 
     // Execute the task.
     final TaskExecutor taskExecutor = new TaskExecutor(
-        scheduledTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+        executableTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
     taskExecutor.execute();
 
     // Check the output.
-    assertEquals(100, taskIdToOutputData.get(operatorTaskId2).size());
+    assertEquals(100, vertexIdToOutputData.get(operatorIRVertex2.getId()).size());
   }
 
   /**
@@ -234,15 +238,15 @@ public final class TaskExecutorTest {
     @Override
     public OutputWriter answer(final InvocationOnMock invocationOnMock) throws Throwable {
       final Object[] args = invocationOnMock.getArguments();
-      final Task dstTask = (Task) args[0];
+      final IRVertex vertex = (IRVertex) args[0];
       final OutputWriter outputWriter = mock(OutputWriter.class);
       doAnswer(new Answer() {
         @Override
         public Object answer(final InvocationOnMock invocationOnMock) throws Throwable {
           final Object[] args = invocationOnMock.getArguments();
           final Object dataToWrite = args[0];
-          taskIdToOutputData.computeIfAbsent(dstTask.getId(), emptyTaskId -> new ArrayList<>());
-          taskIdToOutputData.get(dstTask.getId()).add(dataToWrite);
+          vertexIdToOutputData.computeIfAbsent(vertex.getId(), emptyTaskId -> new ArrayList<>());
+          vertexIdToOutputData.get(vertex.getId()).add(dataToWrite);
           return null;
         }
       }).when(outputWriter).write(any());
@@ -251,16 +255,6 @@ public final class TaskExecutorTest {
   }
 
   /**
-   * Simple {@link IRVertex} for testing.
-   */
-  private class SimpleIRVertex extends IRVertex {
-    @Override
-    public IRVertex getClone() {
-      return null; // Not used.
-    }
-  }
-
-  /**
    * Simple {@link Transform} for testing.
    * @param <T> input/output type.
    */
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
index cb9ee78..cfdeb2f 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
@@ -29,7 +29,6 @@ import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.executor.data.*;
 import edu.snu.nemo.runtime.executor.data.block.Block;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
-import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
 import edu.snu.nemo.runtime.executor.data.streamchainer.CompressionStreamChainer;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 import edu.snu.nemo.runtime.executor.data.stores.*;
@@ -81,8 +80,8 @@ public final class BlockStoreTest {
   private BlockManagerMaster blockManagerMaster;
   private LocalMessageDispatcher messageDispatcher;
   // Variables for shuffle test
-  private static final int NUM_WRITE_TASKS = 3;
-  private static final int NUM_READ_TASKS = 3;
+  private static final int NUM_WRITE_VERTICES = 3;
+  private static final int NUM_READ_VERTICES = 3;
   private static final int DATA_SIZE = 1000;
   private List<String> blockIdList;
   private List<List<NonSerializedPartition<Integer>>> partitionsPerBlock;
@@ -115,20 +114,18 @@ public final class BlockStoreTest {
     when(serializerManager.getSerializer(any())).thenReturn(SERIALIZER);
 
     // Following part is for for the shuffle test.
-    final List<String> writeTaskIdList = new ArrayList<>(NUM_WRITE_TASKS);
-    final List<String> readTaskIdList = new ArrayList<>(NUM_READ_TASKS);
-    blockIdList = new ArrayList<>(NUM_WRITE_TASKS);
-    partitionsPerBlock = new ArrayList<>(NUM_WRITE_TASKS);
+    final List<String> writeVertexIdList = new ArrayList<>(NUM_WRITE_VERTICES);
+    final List<String> readTaskIdList = new ArrayList<>(NUM_READ_VERTICES);
+    blockIdList = new ArrayList<>(NUM_WRITE_VERTICES);
+    partitionsPerBlock = new ArrayList<>(NUM_WRITE_VERTICES);
 
     // Generates the ids of the tasks to be used.
-    IntStream.range(0, NUM_WRITE_TASKS).forEach(
-        number -> writeTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("Write_IR_vertex")));
-    IntStream.range(0, NUM_READ_TASKS).forEach(
-        number -> readTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("Read_IR_vertex")));
+    IntStream.range(0, NUM_WRITE_VERTICES).forEach(number -> writeVertexIdList.add("Write_IR_vertex"));
+    IntStream.range(0, NUM_READ_VERTICES).forEach(number -> readTaskIdList.add("Read_IR_vertex"));
 
     // Generates the ids and the data of the blocks to be used.
     final String shuffleEdge = RuntimeIdGenerator.generateRuntimeEdgeId("shuffle_edge");
-    IntStream.range(0, NUM_WRITE_TASKS).forEach(writeTaskIdx -> {
+    IntStream.range(0, NUM_WRITE_VERTICES).forEach(writeTaskIdx -> {
       // Create a block for each writer task.
       final String blockId = RuntimeIdGenerator.generateBlockId(shuffleEdge, writeTaskIdx);
       blockIdList.add(blockId);
@@ -137,27 +134,26 @@ public final class BlockStoreTest {
           blockId, BlockState.State.SCHEDULED, null);
 
       // Create blocks for this block.
-      final List<NonSerializedPartition<Integer>> partitionsForBlock = new ArrayList<>(NUM_READ_TASKS);
+      final List<NonSerializedPartition<Integer>> partitionsForBlock = new ArrayList<>(NUM_READ_VERTICES);
       partitionsPerBlock.add(partitionsForBlock);
-      IntStream.range(0, NUM_READ_TASKS).forEach(readTaskIdx -> {
-        final int partitionsCount = writeTaskIdx * NUM_READ_TASKS + readTaskIdx;
+      IntStream.range(0, NUM_READ_VERTICES).forEach(readTaskIdx -> {
+        final int partitionsCount = writeTaskIdx * NUM_READ_VERTICES + readTaskIdx;
         partitionsForBlock.add(new NonSerializedPartition(
             readTaskIdx, getRangedNumList(partitionsCount * DATA_SIZE, (partitionsCount + 1) * DATA_SIZE), -1, -1));
       });
     });
 
     // Following part is for the concurrent read test.
-    final String writeTaskId = RuntimeIdGenerator.generateLogicalTaskId("conc_write_IR_vertex");
+    final String writeTaskId = "conc_write_IR_vertex";
     final List<String> concReadTaskIdList = new ArrayList<>(NUM_CONC_READ_TASKS);
     final String concEdge = RuntimeIdGenerator.generateRuntimeEdgeId("conc_read_edge");
 
     // Generates the ids and the data to be used.
-    concBlockId = RuntimeIdGenerator.generateBlockId(concEdge, NUM_WRITE_TASKS + NUM_READ_TASKS + 1);
+    concBlockId = RuntimeIdGenerator.generateBlockId(concEdge, NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1);
     blockManagerMaster.initializeState(concBlockId, "unused");
     blockManagerMaster.onBlockStateChanged(
         concBlockId, BlockState.State.SCHEDULED, null);
-    IntStream.range(0, NUM_CONC_READ_TASKS).forEach(
-        number -> concReadTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("conc_read_IR_vertex")));
+    IntStream.range(0, NUM_CONC_READ_TASKS).forEach(number -> concReadTaskIdList.add("conc_read_IR_vertex"));
     concBlockPartition = new NonSerializedPartition(0, getRangedNumList(0, CONC_READ_DATA_SIZE), -1, -1);
 
     // Following part is for the shuffle in hash range test
@@ -170,16 +166,14 @@ public final class BlockStoreTest {
     expectedDataInRange = new ArrayList<>(NUM_READ_HASH_TASKS);
 
     // Generates the ids of the tasks to be used.
-    IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(
-        number -> writeHashTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("hash_write_IR_vertex")));
-    IntStream.range(0, NUM_READ_HASH_TASKS).forEach(
-        number -> readHashTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("hash_read_IR_vertex")));
+    IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(number -> writeHashTaskIdList.add("hash_write_IR_vertex"));
+    IntStream.range(0, NUM_READ_HASH_TASKS).forEach(number -> readHashTaskIdList.add("hash_read_IR_vertex"));
     final String hashEdge = RuntimeIdGenerator.generateRuntimeEdgeId("hash_edge");
 
     // Generates the ids and the data of the blocks to be used.
     IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(writeTaskIdx -> {
       final String blockId = RuntimeIdGenerator.generateBlockId(
-          hashEdge, NUM_WRITE_TASKS + NUM_READ_TASKS + 1 + writeTaskIdx);
+          hashEdge, NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1 + writeTaskIdx);
       hashedBlockIdList.add(blockId);
       blockManagerMaster.initializeState(blockId, "Unused");
       blockManagerMaster.onBlockStateChanged(
@@ -307,14 +301,14 @@ public final class BlockStoreTest {
    */
   private void shuffle(final BlockStore writerSideStore,
                        final BlockStore readerSideStore) {
-    final ExecutorService writeExecutor = Executors.newFixedThreadPool(NUM_WRITE_TASKS);
-    final ExecutorService readExecutor = Executors.newFixedThreadPool(NUM_READ_TASKS);
-    final List<Future<Boolean>> writeFutureList = new ArrayList<>(NUM_WRITE_TASKS);
-    final List<Future<Boolean>> readFutureList = new ArrayList<>(NUM_READ_TASKS);
+    final ExecutorService writeExecutor = Executors.newFixedThreadPool(NUM_WRITE_VERTICES);
+    final ExecutorService readExecutor = Executors.newFixedThreadPool(NUM_READ_VERTICES);
+    final List<Future<Boolean>> writeFutureList = new ArrayList<>(NUM_WRITE_VERTICES);
+    final List<Future<Boolean>> readFutureList = new ArrayList<>(NUM_READ_VERTICES);
     final long startNano = System.nanoTime();
 
     // Write concurrently
-    IntStream.range(0, NUM_WRITE_TASKS).forEach(writeTaskIdx ->
+    IntStream.range(0, NUM_WRITE_VERTICES).forEach(writeTaskIdx ->
         writeFutureList.add(writeExecutor.submit(new Callable<Boolean>() {
           @Override
           public Boolean call() {
@@ -338,7 +332,7 @@ public final class BlockStoreTest {
         })));
 
     // Wait each writer to success
-    IntStream.range(0, NUM_WRITE_TASKS).forEach(writer -> {
+    IntStream.range(0, NUM_WRITE_VERTICES).forEach(writer -> {
       try {
         assertTrue(writeFutureList.get(writer).get());
       } catch (final Exception e) {
@@ -348,12 +342,12 @@ public final class BlockStoreTest {
     final long writeEndNano = System.nanoTime();
 
     // Read concurrently and check whether the result is equal to the input
-    IntStream.range(0, NUM_READ_TASKS).forEach(readTaskIdx ->
+    IntStream.range(0, NUM_READ_VERTICES).forEach(readTaskIdx ->
         readFutureList.add(readExecutor.submit(new Callable<Boolean>() {
           @Override
           public Boolean call() {
             try {
-              for (int writeTaskIdx = 0; writeTaskIdx < NUM_WRITE_TASKS; writeTaskIdx++) {
+              for (int writeTaskIdx = 0; writeTaskIdx < NUM_WRITE_VERTICES; writeTaskIdx++) {
                 readResultCheck(blockIdList.get(writeTaskIdx), HashRange.of(readTaskIdx, readTaskIdx + 1),
                     readerSideStore, partitionsPerBlock.get(writeTaskIdx).get(readTaskIdx).getData());
               }
@@ -366,7 +360,7 @@ public final class BlockStoreTest {
         })));
 
     // Wait each reader to success
-    IntStream.range(0, NUM_READ_TASKS).forEach(reader -> {
+    IntStream.range(0, NUM_READ_VERTICES).forEach(reader -> {
       try {
         assertTrue(readFutureList.get(reader).get());
       } catch (final Exception e) {
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index 998faf5..61ef9b0 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@ -39,7 +39,6 @@ import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
-import edu.snu.nemo.runtime.common.plan.physical.Task;
 import edu.snu.nemo.runtime.executor.Executor;
 import edu.snu.nemo.runtime.executor.MetricManagerWorker;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
@@ -74,7 +73,6 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.awt.*;
 import java.io.File;
 import java.io.IOException;
 import java.util.*;
@@ -544,7 +542,7 @@ public final class DataTransferTest {
   }
 
   private PhysicalStage setupStages(final String stageId) {
-    final DAG<Task, RuntimeEdge<Task>> emptyDag = new DAGBuilder<Task, RuntimeEdge<Task>>().build();
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> emptyDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>().build();
 
     return new PhysicalStage(stageId, emptyDag, PARALLELISM_TEN, 0, "Not_used", Collections.emptyList());
   }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java
index bdc62e5..5af2476 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java
@@ -125,7 +125,7 @@ public final class JobStateManagerTest {
         new TestPolicy(), "");
     final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
     final JobStateManager jobStateManager = new JobStateManager(
-        new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap()),
+        new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()),
         blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
 
     assertEquals(jobStateManager.getJobId(), "TestPlan");
@@ -164,7 +164,7 @@ public final class JobStateManagerTest {
     final DAG<IRVertex, IREdge> irDAG = irDAGBuilder.build();
     final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
     final JobStateManager jobStateManager = new JobStateManager(
-        new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap()),
+        new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()),
         blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
 
     assertFalse(jobStateManager.checkJobTermination());

-- 
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.