You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/06/01 05:17:53 UTC

[GitHub] jeongyooneo closed pull request #24: [NEMO-79] Clean up the legacy Task

jeongyooneo closed pull request #24: [NEMO-79] Clean up the legacy Task
URL: https://github.com/apache/incubator-nemo/pull/24
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not otherwise display the original diff for merged fork PRs):

diff --git a/bin/json2dot.py b/bin/json2dot.py
index 12f0d483..e4c77bf9 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -64,8 +64,8 @@ def __init__(self, data):
         self.id = data['id']
         self.state = data['state']
         self.tasks = {}
-        for task in data['tasks']:
-            self.tasks[task['id']] = TaskState(task)
+        for irVertex in data['tasks']:
+            self.tasks[irVertex['id']] = TaskState(irVertex)
     @classmethod
     def empty(cls):
         return cls({'id': None, 'state': None, 'tasks': []})
@@ -252,7 +252,7 @@ def internalDstFor(self, edgeWithLoopId):
 class PhysicalStage:
     def __init__(self, id, properties, state):
         self.id = id
-        self.task = DAG(properties['taskDag'], JobState.empty())
+        self.irVertex = DAG(properties['irDag'], JobState.empty())
         self.idx = getIdx()
         self.state = state
     @property
@@ -264,12 +264,12 @@ def dot(self):
         dot = 'subgraph cluster_{} {{'.format(self.idx)
         dot += 'label = "{}{}\\n\\n{} Task(s):\\n{}";'.format(self.id, state, len(self.state.tasks), self.state.taskStateSummary)
         dot += 'color=red; bgcolor="{}";'.format(stateToColor(self.state.state))
-        dot += self.task.dot
+        dot += self.irVertex.dot
         dot += '}'
         return dot
     @property
     def oneVertex(self):
-        return next(iter(self.task.vertices.values())).oneVertex
+        return next(iter(self.irVertex.vertices.values())).oneVertex
     @property
     def logicalEnd(self):
         return 'cluster_{}'.format(self.idx)
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
index ec2e469c..df783b4b 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
@@ -21,7 +21,8 @@
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 
 /**
- * The top-most wrapper for a user operation in the IR.
+ * The basic unit of operation in a dataflow program, as well as the most important data structure in Nemo.
+ * An IRVertex is created and modified in the compiler, and executed in the runtime.
  */
 public abstract class IRVertex extends Vertex {
   private final ExecutionPropertyMap executionProperties;
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/InitializedSourceVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java
similarity index 73%
rename from common/src/main/java/edu/snu/nemo/common/ir/vertex/InitializedSourceVertex.java
rename to common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java
index b64d61b3..43fbadc4 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/InitializedSourceVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java
@@ -22,23 +22,23 @@
 import java.util.List;
 
 /**
- * Source vertex with initial data as object.
- * @param <T> type of initial data.
+ * Source vertex with the data in memory.
+ * @param <T> type of data.
  */
-public final class InitializedSourceVertex<T> extends SourceVertex<T> {
-  private final Iterable<T> initializedSourceData;
+public final class InMemorySourceVertex<T> extends SourceVertex<T> {
+  private Iterable<T> initializedSourceData;
 
   /**
    * Constructor.
    * @param initializedSourceData the initial data object.
    */
-  public InitializedSourceVertex(final Iterable<T> initializedSourceData) {
+  public InMemorySourceVertex(final Iterable<T> initializedSourceData) {
     this.initializedSourceData = initializedSourceData;
   }
 
   @Override
-  public InitializedSourceVertex<T> getClone() {
-    final InitializedSourceVertex<T> that = new InitializedSourceVertex<>(this.initializedSourceData);
+  public InMemorySourceVertex<T> getClone() {
+    final InMemorySourceVertex<T> that = new InMemorySourceVertex<>(this.initializedSourceData);
     this.copyExecutionPropertiesTo(that);
     return that;
   }
@@ -61,23 +61,28 @@ public InitializedSourceVertex(final Iterable<T> initializedSourceData) {
         }
       }
 
-      readables.add(new InitializedSourceReadable<>(dataForReader));
+      readables.add(new InMemorySourceReadable<>(dataForReader));
     }
     return readables;
   }
 
+  @Override
+  public void clearInternalStates() {
+    initializedSourceData = null;
+  }
+
   /**
-   * Readable for initialized source vertex. It simply returns the initialized data.
-   * @param <T> type of the initial data.
+   * Simply returns the in-memory data.
+   * @param <T> type of the data.
    */
-  private static final class InitializedSourceReadable<T> implements Readable<T> {
+  private static final class InMemorySourceReadable<T> implements Readable<T> {
     private final Iterable<T> initializedSourceData;
 
     /**
      * Constructor.
      * @param initializedSourceData the source data.
      */
-    private InitializedSourceReadable(final Iterable<T> initializedSourceData) {
+    private InMemorySourceReadable(final Iterable<T> initializedSourceData) {
       this.initializedSourceData = initializedSourceData;
     }
 
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java
index 1597462d..d319b72d 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java
@@ -34,4 +34,13 @@
    * @throws Exception if fail to get.
    */
   public abstract List<Readable<O>> getReadables(int desiredNumOfSplits) throws Exception;
+
+  /**
+   * Clears internal states, must be called after getReadables().
+   * Concretely, this clears the huge list of input splits held by objects like BeamBoundedSourceVertex before
+   * sending the vertex to remote executors.
+   * Between clearing states of an existing vertex, and creating a new vertex, we've chosen the former approach
+   * to ensure consistent use of the same IRVertex object across the compiler, the master, and the executors.
+   */
+  public abstract void clearInternalStates();
 }
diff --git a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 027378cc..4ef453c9 100644
--- a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -59,7 +59,7 @@ public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG,
                               final PhysicalPlanGenerator physicalPlanGenerator) {
     final DAG<PhysicalStage, PhysicalStageEdge> physicalStageDAG = irDAG.convert(physicalPlanGenerator);
     final PhysicalPlan physicalPlan = new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(),
-        physicalStageDAG, physicalPlanGenerator.getTaskIRVertexMap());
+        physicalStageDAG, physicalPlanGenerator.getIdToIRVertex());
     return physicalPlan;
   }
 }
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index f03bc576..1143a1a8 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -35,7 +35,8 @@
  */
 public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
   private static final Logger LOG = LoggerFactory.getLogger(BeamBoundedSourceVertex.class.getName());
-  private final BoundedSource<O> source;
+  private BoundedSource<O> source;
+  private final String sourceDescription;
 
   /**
    * Constructor of BeamBoundedSourceVertex.
@@ -43,6 +44,7 @@
    */
   public BeamBoundedSourceVertex(final BoundedSource<O> source) {
     this.source = source;
+    this.sourceDescription = source.toString();
   }
 
   @Override
@@ -62,13 +64,18 @@ public BeamBoundedSourceVertex getClone() {
     return readables;
   }
 
+  @Override
+  public void clearInternalStates() {
+    source = null;
+  }
+
   @Override
   public String propertiesToJSON() {
     final StringBuilder sb = new StringBuilder();
     sb.append("{");
     sb.append(irVertexPropertiesToString());
     sb.append(", \"source\": \"");
-    sb.append(source);
+    sb.append(sourceDescription);
     sb.append("\"}");
     return sb.toString();
   }
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java
index a8d024c2..a3e3fd6b 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java
@@ -71,7 +71,7 @@
                                   final int parallelism) {
     final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
 
-    final IRVertex initializedSourceVertex = new InitializedSourceVertex<>(initialData);
+    final IRVertex initializedSourceVertex = new InMemorySourceVertex<>(initialData);
     initializedSourceVertex.setProperty(ParallelismProperty.of(parallelism));
     builder.addVertex(initializedSourceVertex);
 
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
index 775faf41..6e83b7f4 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
@@ -30,7 +30,7 @@
  * @param <T> type of data to read.
  */
 public final class SparkDatasetBoundedSourceVertex<T> extends SourceVertex<T> {
-  private final List<Readable<T>> readables;
+  private List<Readable<T>> readables;
 
   /**
    * Constructor.
@@ -72,6 +72,11 @@ public SparkDatasetBoundedSourceVertex getClone() {
     return readables;
   }
 
+  @Override
+  public void clearInternalStates() {
+    readables = null;
+  }
+
   /**
    * A Readable wrapper for Spark Dataset.
    */
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
index a5e78333..cac26749 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
@@ -27,7 +27,7 @@
  * Bounded source vertex for Spark text file.
  */
 public final class SparkTextFileBoundedSourceVertex extends SourceVertex<String> {
-  private final List<Readable<String>> readables;
+  private List<Readable<String>> readables;
 
   /**
    * Constructor.
@@ -72,6 +72,11 @@ public SparkTextFileBoundedSourceVertex getClone() {
     return readables;
   }
 
+  @Override
+  public void clearInternalStates() {
+    readables = null;
+  }
+
   /**
    * A Readable wrapper for Spark text file.
    */
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
index 257b7573..34c66fcc 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
@@ -72,7 +72,7 @@ public void close() {
    * @param <T> type of the data.
    */
   public static final class EmptySourceVertex<T> extends SourceVertex<T> {
-    private final String name;
+    private String name;
 
     /**
      * Constructor.
@@ -96,6 +96,10 @@ public String toString() {
       return Arrays.asList(new EmptyReadable<>());
     }
 
+    @Override
+    public void clearInternalStates() {
+    }
+
     @Override
     public EmptySourceVertex<T> getClone() {
       return new EmptySourceVertex<>(this.name);
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
index dd19b274..b53ff344 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
@@ -29,7 +29,6 @@
   private static final String BLOCK_PREFIX = "Block-";
   private static final String BLOCK_ID_SPLITTER = "_";
   private static final String TASK_INFIX = "-Task-";
-  private static final String PHYSICAL_TASK_ID_SPLITTER = "_";
 
   /**
    * Private constructor which will not be used.
@@ -37,6 +36,9 @@
   private RuntimeIdGenerator() {
   }
 
+
+  //////////////////////////////////////////////////////////////// Generate IDs
+
   /**
    * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan}.
    *
@@ -76,36 +78,13 @@ public static String generateStageId(final Integer stageId) {
   }
 
   /**
-   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.physical.Task}.
-   *
-   * @param irVertexId the ID of the IR vertex.
-   * @return the generated ID
-   */
-  public static String generateLogicalTaskId(final String irVertexId) {
-    return "Task-" + irVertexId;
-  }
-
-  /**
-   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.physical.Task}.
-   *
-   * @param index         the index of the physical task.
-   * @param logicalTaskId the logical ID of the task.
-   * @return the generated ID
-   */
-  public static String generatePhysicalTaskId(final int index,
-                                              final String logicalTaskId) {
-    return logicalTaskId + PHYSICAL_TASK_ID_SPLITTER + index;
-  }
-
-  /**
-   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.physical.ScheduledTask}.
+   * Generates the ID for a task.
    *
    * @param index   the index of this task.
    * @param stageId the ID of the stage.
    * @return the generated ID
    */
-  public static String generateTaskId(final int index,
-                                           final String stageId) {
+  public static String generateTaskId(final int index, final String stageId) {
     return stageId + TASK_INFIX + index;
   }
 
@@ -122,12 +101,12 @@ public static String generateExecutorId() {
    * Generates the ID for a block, whose data is the output of a task.
    *
    * @param runtimeEdgeId of the block
-   * @param taskIndex     of the block
+   * @param producerTaskIndex of the block
    * @return the generated ID
    */
   public static String generateBlockId(final String runtimeEdgeId,
-                                       final int taskIndex) {
-    return BLOCK_PREFIX + runtimeEdgeId + BLOCK_ID_SPLITTER + taskIndex;
+                                       final int producerTaskIndex) {
+    return BLOCK_PREFIX + runtimeEdgeId + BLOCK_ID_SPLITTER + producerTaskIndex;
   }
 
   /**
@@ -148,6 +127,8 @@ public static String generateResourceSpecId() {
     return "ResourceSpec-" + resourceSpecIdGenerator.getAndIncrement();
   }
 
+  //////////////////////////////////////////////////////////////// Parse IDs
+
   /**
    * Extracts runtime edge ID from a block ID.
    *
@@ -210,14 +191,4 @@ public static int getIndexFromTaskId(final String taskId) {
   private static String[] parseTaskId(final String taskId) {
     return taskId.split(TASK_INFIX);
   }
-
-  /**
-   * Extracts logical task ID from a physical task ID.
-   *
-   * @param physicalTaskId the physical task ID to extract.
-   * @return the logical task ID.
-   */
-  public static String getLogicalTaskIdIdFromPhysicalTaskId(final String physicalTaskId) {
-    return physicalTaskId.split(PHYSICAL_TASK_ID_SPLITTER)[0];
-  }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index b9d263bd..7d3e7f4e 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -87,7 +87,7 @@ public PhysicalPlan apply(final PhysicalPlan originalPlan, final Map<String, Lis
       optimizationEdge.setTaskIdxToKeyRange(taskIdxToHashRange);
     });
 
-    return new PhysicalPlan(originalPlan.getId(), physicalDAGBuilder.build(), originalPlan.getTaskIRVertexMap());
+    return new PhysicalPlan(originalPlan.getId(), physicalDAGBuilder.build(), originalPlan.getIdToIRVertex());
   }
 
   /**
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/BoundedSourceTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/BoundedSourceTask.java
deleted file mode 100644
index 94e1332d..00000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/BoundedSourceTask.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.physical;
-
-import edu.snu.nemo.common.ir.Readable;
-
-/**
- * BoundedSourceTask.
- * @param <O> the output type.
- */
-public final class BoundedSourceTask<O> extends Task {
-  private Readable<O> readable;
-
-  /**
-   * Constructor.
-   * @param taskId id of the task.
-   * @param irVertexId id of the IR vertex.
-   */
-  public BoundedSourceTask(final String taskId,
-                           final String irVertexId) {
-    super(taskId, irVertexId);
-  }
-
-  /**
-   * Sets the readable for this task.
-   * @param readableToSet the readable to set.
-   */
-  public void setReadable(final Readable<O> readableToSet) {
-    this.readable = readableToSet;
-  }
-
-  /**
-   * @return the readable of source data.
-   */
-  public Readable<O> getReadable() {
-    return readable;
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ScheduledTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ExecutableTask.java
similarity index 56%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ScheduledTask.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ExecutableTask.java
index d5f19a71..e7e37efa 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ScheduledTask.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ExecutableTask.java
@@ -23,16 +23,9 @@
 import java.util.Map;
 
 /**
- * A ScheduledTask is a grouping of {@link Task}s that belong to a stage.
- * Executors receive units of ScheduledTasks during job execution,
- * and thus the resource type of all tasks of a ScheduledTask must be identical.
- * A stage contains a list of IDs of Tasks whose length corresponds to stage/operator parallelism.
- *
- * This class includes all information which will be required from the executor after scheduled,
- * including the (serialized) DAG of {@link Task}s,
- * the incoming/outgoing edges to/from the stage the Task belongs to, and so on.
+ * An ExecutableTask is a self-contained executable that can be executed in specific types of containers.
  */
-public final class ScheduledTask implements Serializable {
+public final class ExecutableTask implements Serializable {
   private final String jobId;
   private final String taskId;
   private final int taskIdx;
@@ -41,37 +34,37 @@
   private final int attemptIdx;
   private final String containerType;
   private final byte[] serializedTaskDag;
-  private final Map<String, Readable> logicalTaskIdToReadable;
+  private final Map<String, Readable> irVertexIdToReadable;
 
   /**
    * Constructor.
    *
-   * @param jobId                   the id of the job.
-   * @param serializedTaskDag  the serialized DAG of the task.
-   * @param taskId             the ID of the scheduled task.
-   * @param taskIncomingEdges  the incoming edges of the task.
-   * @param taskOutgoingEdges  the outgoing edges of the task.
-   * @param attemptIdx              the attempt index.
-   * @param containerType           the type of container to execute the task on.
-   * @param logicalTaskIdToReadable the map between logical task ID and readable.
+   * @param jobId                the id of the job.
+   * @param taskId               the ID of the scheduled task.
+   * @param attemptIdx           the attempt index.
+   * @param containerType        the type of container to execute the task on.
+   * @param serializedIRDag      the serialized DAG of the task.
+   * @param taskIncomingEdges    the incoming edges of the task.
+   * @param taskOutgoingEdges    the outgoing edges of the task.
+   * @param irVertexIdToReadable the map between logical task ID and readable.
    */
-  public ScheduledTask(final String jobId,
-                            final byte[] serializedTaskDag,
-                            final String taskId,
-                            final List<PhysicalStageEdge> taskIncomingEdges,
-                            final List<PhysicalStageEdge> taskOutgoingEdges,
-                            final int attemptIdx,
-                            final String containerType,
-                            final Map<String, Readable> logicalTaskIdToReadable) {
+  public ExecutableTask(final String jobId,
+                        final String taskId,
+                        final int attemptIdx,
+                        final String containerType,
+                        final byte[] serializedIRDag,
+                        final List<PhysicalStageEdge> taskIncomingEdges,
+                        final List<PhysicalStageEdge> taskOutgoingEdges,
+                        final Map<String, Readable> irVertexIdToReadable) {
     this.jobId = jobId;
     this.taskId = taskId;
     this.taskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
-    this.taskIncomingEdges = taskIncomingEdges;
-    this.taskOutgoingEdges = taskOutgoingEdges;
     this.attemptIdx = attemptIdx;
     this.containerType = containerType;
-    this.serializedTaskDag = serializedTaskDag;
-    this.logicalTaskIdToReadable = logicalTaskIdToReadable;
+    this.serializedTaskDag = serializedIRDag;
+    this.taskIncomingEdges = taskIncomingEdges;
+    this.taskOutgoingEdges = taskOutgoingEdges;
+    this.irVertexIdToReadable = irVertexIdToReadable;
   }
 
   /**
@@ -84,7 +77,7 @@ public String getJobId() {
   /**
    * @return the serialized DAG of the task.
    */
-  public byte[] getSerializedTaskDag() {
+  public byte[] getSerializedIRDag() {
     return serializedTaskDag;
   }
 
@@ -133,7 +126,7 @@ public String getContainerType() {
   /**
    * @return the map between logical task ID and readable.
    */
-  public Map<String, Readable> getLogicalTaskIdToReadable() {
-    return logicalTaskIdToReadable;
+  public Map<String, Readable> getIrVertexIdToReadable() {
+    return irVertexIdToReadable;
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/MetricCollectionBarrierTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/MetricCollectionBarrierTask.java
deleted file mode 100644
index 249adff8..00000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/MetricCollectionBarrierTask.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.physical;
-
-/**
- * MetricCollectionBarrierTask.
- */
-public final class MetricCollectionBarrierTask extends Task {
-  /**
-   * Constructor.
-   * @param taskId id of the task.
-   * @param irVertexId id of the IR vertex.
-   */
-  MetricCollectionBarrierTask(final String taskId,
-                              final String irVertexId) {
-    super(taskId, irVertexId);
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/OperatorTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/OperatorTask.java
deleted file mode 100644
index 61fa8105..00000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/OperatorTask.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.physical;
-
-import edu.snu.nemo.common.ir.vertex.transform.Transform;
-
-/**
- * OperatorTask.
- */
-public final class OperatorTask extends Task {
-  private final Transform transform;
-
-  /**
-   * Constructor.
-   * @param taskId id of the task.
-   * @param runtimeVertexId id of the runtime vertex.
-   * @param transform transform to perform.
-   */
-  public OperatorTask(final String taskId,
-                      final String runtimeVertexId,
-                      final Transform transform) {
-    super(taskId, runtimeVertexId);
-    this.transform = transform;
-  }
-
-  /**
-   * @return the transform to perform.
-   */
-  public Transform getTransform() {
-    return transform;
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java
index 85950fa3..b25a0bad 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java
@@ -25,26 +25,23 @@
  * A job's physical plan, to be executed by the Runtime.
  */
 public final class PhysicalPlan implements Serializable {
-
   private final String id;
-
   private final DAG<PhysicalStage, PhysicalStageEdge> stageDAG;
-
-  private final Map<Task, IRVertex> taskIRVertexMap;
+  private final Map<String, IRVertex> idToIRVertex;
 
   /**
    * Constructor.
    *
    * @param id              ID of the plan.
    * @param stageDAG        the DAG of stages.
-   * @param taskIRVertexMap map from task to IR vertex.
+   * @param idToIRVertex map from task to IR vertex.
    */
   public PhysicalPlan(final String id,
                       final DAG<PhysicalStage, PhysicalStageEdge> stageDAG,
-                      final Map<Task, IRVertex> taskIRVertexMap) {
+                      final Map<String, IRVertex> idToIRVertex) {
     this.id = id;
     this.stageDAG = stageDAG;
-    this.taskIRVertexMap = taskIRVertexMap;
+    this.idToIRVertex = idToIRVertex;
   }
 
   /**
@@ -61,21 +58,11 @@ public String getId() {
     return stageDAG;
   }
 
-  /**
-   * Get an IR vertex of the given task.
-   *
-   * @param task task to find the IR vertex of.
-   * @return the corresponding IR vertex of the given task.
-   */
-  public IRVertex getIRVertexOf(final Task task) {
-    return taskIRVertexMap.get(task);
-  }
-
   /**
    * @return the map from task to IR vertex.
    */
-  public Map<Task, IRVertex> getTaskIRVertexMap() {
-    return taskIRVertexMap;
+  public Map<String, IRVertex> getIdToIRVertex() {
+    return idToIRVertex;
   }
 
   @Override
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java
index 2ae72710..27991e3a 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java
@@ -24,7 +24,6 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.stage.*;
 import edu.snu.nemo.common.exception.IllegalVertexOperationException;
@@ -70,6 +69,7 @@ private PhysicalPlanGenerator(@Parameter(JobConf.DAGDirectory.class) final Strin
     // for debugging purposes.
     dagOfStages.storeJSON(dagDirectory, "plan-logical", "logical execution plan");
     // then create tasks and make it into a physical execution plan.
+
     return stagesIntoPlan(dagOfStages);
   }
 
@@ -197,16 +197,14 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
     return dagOfStagesBuilder.build();
   }
 
-  // Map that keeps track of the IRVertex of each tasks
-  private final Map<Task, IRVertex> taskIRVertexMap = new HashMap<>();
+
+  private final Map<String, IRVertex> idToIRVertex = new HashMap<>();
 
   /**
-   * Getter for taskIRVertexMap.
-   *
-   * @return the taskIRVertexMap.
+   * @return the idToIRVertex map.
    */
-  public Map<Task, IRVertex> getTaskIRVertexMap() {
-    return taskIRVertexMap;
+  public Map<String, IRVertex> getIdToIRVertex() {
+    return idToIRVertex;
   }
 
   /**
@@ -221,72 +219,68 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
     final DAGBuilder<PhysicalStage, PhysicalStageEdge> physicalDAGBuilder = new DAGBuilder<>();
 
     for (final Stage stage : dagOfStages.getVertices()) {
-      final Map<IRVertex, Task> irVertexTaskMap = new HashMap<>();
       final List<IRVertex> stageVertices = stage.getStageInternalDAG().getVertices();
 
       final ExecutionPropertyMap firstVertexProperties = stageVertices.iterator().next().getExecutionProperties();
       final int stageParallelism = firstVertexProperties.get(ExecutionProperty.Key.Parallelism);
       final String containerType = firstVertexProperties.get(ExecutionProperty.Key.ExecutorPlacement);
 
-      // Only one task DAG will be created and reused.
-      final DAGBuilder<Task, RuntimeEdge<Task>> stageInternalDAGBuilder = new DAGBuilder<>();
-      // Collect split source readables in advance and bind to each scheduled task to avoid extra source split.
-      final List<Map<String, Readable>> logicalTaskIdToReadables = new ArrayList<>(stageParallelism);
+      // Only one DAG will be created and reused.
+      final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder = new DAGBuilder<>();
+      // Collect split source readables in advance and bind to each irvertex to avoid extra source split.
+      final List<Map<String, Readable>> vertexIdToReadables = new ArrayList<>(stageParallelism);
       for (int i = 0; i < stageParallelism; i++) {
-        logicalTaskIdToReadables.add(new HashMap<>());
+        vertexIdToReadables.add(new HashMap<>());
       }
 
-      // Iterate over the vertices contained in this stage to convert to tasks.
+      // Iterate over the vertices contained in this stage
       stageVertices.forEach(irVertex -> {
-        final Task newTaskToAdd;
+        if (!(irVertex instanceof  SourceVertex
+            || irVertex instanceof OperatorVertex
+            || irVertex instanceof MetricCollectionBarrierVertex)) {
+          // Sanity check
+          throw new IllegalStateException(irVertex.toString());
+        }
+
         if (irVertex instanceof SourceVertex) {
           final SourceVertex sourceVertex = (SourceVertex) irVertex;
 
+          // Take care of the Readables
           try {
             final List<Readable> readables = sourceVertex.getReadables(stageParallelism);
-            final String irVertexId = sourceVertex.getId();
-            final String logicalTaskId = RuntimeIdGenerator.generateLogicalTaskId(irVertexId);
             for (int i = 0; i < stageParallelism; i++) {
-              logicalTaskIdToReadables.get(i).put(logicalTaskId, readables.get(i));
+              vertexIdToReadables.get(i).put(irVertex.getId(), readables.get(i));
             }
-            newTaskToAdd = new BoundedSourceTask(logicalTaskId, irVertexId);
           } catch (Exception e) {
             throw new PhysicalPlanGenerationException(e);
           }
-        } else if (irVertex instanceof OperatorVertex) {
-          final OperatorVertex operatorVertex = (OperatorVertex) irVertex;
-          final String operatorVertexId = operatorVertex.getId();
-          newTaskToAdd = new OperatorTask(RuntimeIdGenerator.generateLogicalTaskId(operatorVertexId),
-              operatorVertexId, operatorVertex.getTransform());
-
-        } else if (irVertex instanceof MetricCollectionBarrierVertex) {
-          final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
-              (MetricCollectionBarrierVertex) irVertex;
-          final String metricVertexId = metricCollectionBarrierVertex.getId();
-          newTaskToAdd = new MetricCollectionBarrierTask(RuntimeIdGenerator.generateLogicalTaskId(metricVertexId),
-              metricVertexId);
-        } else {
-          throw new IllegalVertexOperationException("This vertex type is not supported");
+
+          // Clear internal metadata
+          sourceVertex.clearInternalStates();
         }
-        stageInternalDAGBuilder.addVertex(newTaskToAdd);
-        irVertexTaskMap.put(irVertex, newTaskToAdd);
-        taskIRVertexMap.put(newTaskToAdd, irVertex);
+
+        stageInternalDAGBuilder.addVertex(irVertex);
+        idToIRVertex.put(irVertex.getId(), irVertex);
       });
 
-      // connect internal edges in the task. It suffices to iterate over only the stage internal inEdges.
+      // connect internal edges in the stage. It suffices to iterate over only the stage internal inEdges.
       final DAG<IRVertex, IREdge> stageInternalDAG = stage.getStageInternalDAG();
       stageInternalDAG.getVertices().forEach(irVertex -> {
         final List<IREdge> inEdges = stageInternalDAG.getIncomingEdgesOf(irVertex);
         inEdges.forEach(edge ->
-            stageInternalDAGBuilder.connectVertices(new RuntimeEdge<>(edge.getId(), edge.getExecutionProperties(),
-                irVertexTaskMap.get(edge.getSrc()), irVertexTaskMap.get(edge.getDst()),
-                edge.getCoder(), edge.isSideInput())));
+            stageInternalDAGBuilder.connectVertices(new RuntimeEdge<>(
+                edge.getId(),
+                edge.getExecutionProperties(),
+                edge.getSrc(),
+                edge.getDst(),
+                edge.getCoder(),
+                edge.isSideInput())));
       });
 
-      // Create the task to add for this stage.
+      // Create the physical stage.
       final PhysicalStage physicalStage =
-          new PhysicalStage(stage.getId(), stageInternalDAGBuilder.build(),
-              stageParallelism, stage.getScheduleGroupIndex(), containerType, logicalTaskIdToReadables);
+          new PhysicalStage(stage.getId(), stageInternalDAGBuilder.buildWithoutSourceSinkCheck(),
+              stageParallelism, stage.getScheduleGroupIndex(), containerType, vertexIdToReadables);
 
       physicalDAGBuilder.addVertex(physicalStage);
       runtimeStageIdToPhysicalStageMap.put(stage.getId(), physicalStage);
@@ -300,7 +294,8 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
 
           physicalDAGBuilder.connectVertices(new PhysicalStageEdge(stageEdge.getId(),
               stageEdge.getExecutionProperties(),
-              stageEdge.getSrcVertex(), stageEdge.getDstVertex(),
+              stageEdge.getSrcVertex(),
+              stageEdge.getDstVertex(),
               srcStage, dstStage,
               stageEdge.getCoder(),
               stageEdge.isSideInput()));
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java
index ded0a8e0..a58cfba4 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java
@@ -18,6 +18,7 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.Vertex;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.commons.lang3.SerializationUtils;
@@ -30,50 +31,50 @@
  * PhysicalStage.
  */
 public final class PhysicalStage extends Vertex {
-  private final DAG<Task, RuntimeEdge<Task>> taskDag;
+  private final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag;
   private final int parallelism;
   private final int scheduleGroupIndex;
   private final String containerType;
-  private final byte[] serializedTaskDag;
-  private final List<Map<String, Readable>> logicalTaskIdToReadables;
+  private final byte[] serializedIRDag;
+  private final List<Map<String, Readable>> vertexIdToReadables;
 
   /**
    * Constructor.
    *
-   * @param stageId                  ID of the stage.
-   * @param taskDag                  the DAG of the task in this stage.
-   * @param parallelism              how many tasks will be executed in this stage.
-   * @param scheduleGroupIndex       the schedule group index.
-   * @param containerType            the type of container to execute the task on.
-   * @param logicalTaskIdToReadables the list of maps between logical task ID and {@link Readable}.
+   * @param stageId             ID of the stage.
+   * @param irDag               the DAG of the task in this stage.
+   * @param parallelism         how many tasks will be executed in this stage.
+   * @param scheduleGroupIndex  the schedule group index.
+   * @param containerType       the type of container to execute the task on.
+   * @param vertexIdToReadables the list of maps between vertex ID and {@link Readable}.
    */
   public PhysicalStage(final String stageId,
-                       final DAG<Task, RuntimeEdge<Task>> taskDag,
+                       final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag,
                        final int parallelism,
                        final int scheduleGroupIndex,
                        final String containerType,
-                       final List<Map<String, Readable>> logicalTaskIdToReadables) {
+                       final List<Map<String, Readable>> vertexIdToReadables) {
     super(stageId);
-    this.taskDag = taskDag;
+    this.irDag = irDag;
     this.parallelism = parallelism;
     this.scheduleGroupIndex = scheduleGroupIndex;
     this.containerType = containerType;
-    this.serializedTaskDag = SerializationUtils.serialize(taskDag);
-    this.logicalTaskIdToReadables = logicalTaskIdToReadables;
+    this.serializedIRDag = SerializationUtils.serialize(irDag);
+    this.vertexIdToReadables = vertexIdToReadables;
   }
 
   /**
-   * @return the task.
+   * @return the IRVertex DAG.
    */
-  public DAG<Task, RuntimeEdge<Task>> getTaskDag() {
-    return taskDag;
+  public DAG<IRVertex, RuntimeEdge<IRVertex>> getIRDAG() {
+    return irDag;
   }
 
   /**
    * @return the serialized DAG of the task.
    */
-  public byte[] getSerializedTaskDag() {
-    return serializedTaskDag;
+  public byte[] getSerializedIRDAG() {
+    return serializedIRDag;
   }
 
   /**
@@ -102,17 +103,17 @@ public String getContainerType() {
   }
 
   /**
-   * @return the list of maps between logical task ID and readable.
+   * @return the list of maps between vertex ID and readables.
    */
-  public List<Map<String, Readable>> getLogicalTaskIdToReadables() {
-    return logicalTaskIdToReadables;
+  public List<Map<String, Readable>> getVertexIdToReadables() {
+    return vertexIdToReadables;
   }
 
   @Override
   public String propertiesToJSON() {
     final StringBuilder sb = new StringBuilder();
     sb.append("{\"scheduleGroupIndex\": ").append(scheduleGroupIndex);
-    sb.append(", \"taskDag\": ").append(taskDag);
+    sb.append(", \"irDag\": ").append(irDag);
     sb.append(", \"parallelism\": ").append(parallelism);
     sb.append(", \"containerType\": \"").append(containerType).append("\"");
     sb.append('}');
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/Task.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/Task.java
deleted file mode 100644
index 8d241e1f..00000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/Task.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.physical;
-
-import edu.snu.nemo.common.dag.Vertex;
-
-/**
- * Task.
- * The index value is identical to the Task's index it belongs to.
- */
-public abstract class Task extends Vertex {
-  private final String irVertexId;
-
-  /**
-   * Constructor.
-   *
-   * @param taskId      id of the task.
-   * @param irVertexId  id for the IR vertex.
-   */
-  public Task(final String taskId,
-              final String irVertexId) {
-    super(taskId);
-    this.irVertexId = irVertexId;
-  }
-
-  /**
-   * @return the id of the runtime vertex.
-   */
-  public final String getIrVertexId() {
-    return irVertexId;
-  }
-
-  @Override
-  public final String propertiesToJSON() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("{\"taskId\": \"").append(getId()).append("\"}");
-    return sb.toString();
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/UnboundedSourceTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/UnboundedSourceTask.java
deleted file mode 100644
index 51f0d31e..00000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/UnboundedSourceTask.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.physical;
-
-/**
- * UnboundedSourceTask.
- */
-public final class UnboundedSourceTask extends Task {
-  /**
-   * Constructor.
-   * @param taskId the id of the task.
-   * @param runtimeVertexId id of the runtime vertex.
-   */
-  public UnboundedSourceTask(final String taskId,
-                             final String runtimeVertexId) {
-    super(taskId, runtimeVertexId);
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
index 08a9a66b..31f56d5e 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
@@ -18,8 +18,7 @@
 import edu.snu.nemo.common.StateMachine;
 
 /**
- * Represents the states and their transitions of a
- * {@link edu.snu.nemo.runtime.common.plan.physical.ScheduledTask}.
+ * Represents the states and their transitions of a task.
  */
 public final class TaskState {
   private final StateMachine stateMachine;
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index c266bab0..343b8e54 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -62,7 +62,7 @@ message TaskStateChangedMsg {
     required string executorId = 1;
     required string taskId = 2;
     required TaskStateFromExecutor state = 3;
-    optional string taskPutOnHoldId = 4;
+    optional string vertexPutOnHoldId = 4;
     optional RecoverableFailureCause failureCause = 5;
     required int32 attemptIdx = 6;
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 8fd52816..7a7b5231 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -17,6 +17,7 @@
 
 import com.google.protobuf.ByteString;
 import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.exception.IllegalMessageException;
 import edu.snu.nemo.common.exception.UnknownFailureCauseException;
@@ -27,8 +28,7 @@
 import edu.snu.nemo.runtime.common.message.MessageListener;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
-import edu.snu.nemo.runtime.common.plan.physical.Task;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.executor.data.SerializerManager;
 import edu.snu.nemo.runtime.executor.datatransfer.DataTransferFactory;
 import org.apache.commons.lang3.SerializationUtils;
@@ -87,35 +87,34 @@ public String getExecutorId() {
     return executorId;
   }
 
-  private synchronized void onTaskReceived(final ScheduledTask scheduledTask) {
+  private synchronized void onTaskReceived(final ExecutableTask executableTask) {
     LOG.debug("Executor [{}] received Task [{}] to execute.",
-        new Object[]{executorId, scheduledTask.getTaskId()});
-    executorService.execute(() -> launchTask(scheduledTask));
+        new Object[]{executorId, executableTask.getTaskId()});
+    executorService.execute(() -> launchTask(executableTask));
   }
 
   /**
    * Launches the Task, and keeps track of the execution state with taskStateManager.
-   * @param scheduledTask to launch.
+   * @param executableTask to launch.
    */
-  private void launchTask(final ScheduledTask scheduledTask) {
+  private void launchTask(final ExecutableTask executableTask) {
     try {
-      final DAG<Task, RuntimeEdge<Task>> taskDag =
-          SerializationUtils.deserialize(scheduledTask.getSerializedTaskDag());
+      final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag =
+          SerializationUtils.deserialize(executableTask.getSerializedIRDag());
       final TaskStateManager taskStateManager =
-          new TaskStateManager(scheduledTask, taskDag, executorId,
-              persistentConnectionToMasterMap, metricMessageSender);
+          new TaskStateManager(executableTask, executorId, persistentConnectionToMasterMap, metricMessageSender);
 
-      scheduledTask.getTaskIncomingEdges()
+      executableTask.getTaskIncomingEdges()
           .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
-      scheduledTask.getTaskOutgoingEdges()
+      executableTask.getTaskOutgoingEdges()
           .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
-      taskDag.getVertices().forEach(v -> {
-        taskDag.getOutgoingEdgesOf(v)
+      irDag.getVertices().forEach(v -> {
+        irDag.getOutgoingEdgesOf(v)
             .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
       });
 
       new TaskExecutor(
-          scheduledTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender).execute();
+          executableTask, irDag, taskStateManager, dataTransferFactory, metricMessageSender).execute();
     } catch (final Exception e) {
       persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
           ControlMessage.Message.newBuilder()
@@ -150,9 +149,9 @@ public void onMessage(final ControlMessage.Message message) {
       switch (message.getType()) {
       case ScheduleTask:
         final ControlMessage.ScheduleTaskMsg scheduleTaskMsg = message.getScheduleTaskMsg();
-        final ScheduledTask scheduledTask =
+        final ExecutableTask executableTask =
             SerializationUtils.deserialize(scheduleTaskMsg.getTask().toByteArray());
-        onTaskReceived(scheduledTask);
+        onTaskReceived(executableTask);
         break;
       default:
         throw new IllegalMessageException(
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
index c68215ad..2349ab76 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
@@ -21,9 +21,8 @@
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.BlockWriteException;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.vertex.*;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.common.ir.vertex.OperatorVertex;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.physical.*;
 import edu.snu.nemo.runtime.common.state.TaskState;
@@ -41,79 +40,88 @@
  * Executes a task.
  */
 public final class TaskExecutor {
+  // Static variables
   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class.getName());
+  private static final String ITERATORID_PREFIX = "ITERATOR_";
+  private static final AtomicInteger ITERATORID_GENERATOR = new AtomicInteger(0);
 
-  private final DAG<Task, RuntimeEdge<Task>> taskDag;
+  // From ExecutableTask
+  private final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag;
   private final String taskId;
   private final int taskIdx;
   private final TaskStateManager taskStateManager;
   private final List<PhysicalStageEdge> stageIncomingEdges;
   private final List<PhysicalStageEdge> stageOutgoingEdges;
+  private Map<String, Readable> irVertexIdToReadable;
+
+  // Other parameters
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  private final List<InputReader> inputReaders;
-  private final Map<InputReader, List<TaskDataHandler>> inputReaderToDataHandlersMap;
+  // Data structures
+  private final Map<InputReader, List<IRVertexDataHandler>> inputReaderToDataHandlersMap;
   private final Map<String, Iterator> idToSrcIteratorMap;
-  private final Map<String, List<TaskDataHandler>> srcIteratorIdToDataHandlersMap;
-  private final Map<String, List<TaskDataHandler>> iteratorIdToDataHandlersMap;
+  private final Map<String, List<IRVertexDataHandler>> srcIteratorIdToDataHandlersMap;
+  private final Map<String, List<IRVertexDataHandler>> iteratorIdToDataHandlersMap;
   private final LinkedBlockingQueue<Pair<String, DataUtil.IteratorWithNumBytes>> partitionQueue;
-  private List<TaskDataHandler> taskDataHandlers;
-  private Map<OutputCollectorImpl, List<TaskDataHandler>> outputToChildrenDataHandlersMap;
-  private final Set<String> finishedTaskIds;
-  private int numPartitions;
-  private Map<String, Readable> logicalTaskIdToReadable;
+  private List<IRVertexDataHandler> irVertexDataHandlers;
+  private Map<OutputCollectorImpl, List<IRVertexDataHandler>> outputToChildrenDataHandlersMap;
+  private final Set<String> finishedVertexIds;
 
   // For metrics
   private long serBlockSize;
   private long encodedBlockSize;
 
-  private boolean isExecutionRequested;
-  private String logicalTaskIdPutOnHold;
+  // Misc
+  private boolean isExecuted;
+  private String irVertexIdPutOnHold;
+  private int numPartitions;
 
-  private static final String ITERATORID_PREFIX = "ITERATOR_";
-  private static final AtomicInteger ITERATORID_GENERATOR = new AtomicInteger(0);
 
   /**
    * Constructor.
-   * @param scheduledTask Task with information needed during execution.
-   * @param taskDag A DAG of Tasks.
+   * @param executableTask Task with information needed during execution.
+   * @param irVertexDag A DAG of vertices.
    * @param taskStateManager State manager for this Task.
    * @param channelFactory For reading from/writing to data to other Stages.
    * @param metricMessageSender For sending metric with execution stats to Master.
    */
-  public TaskExecutor(final ScheduledTask scheduledTask,
-                      final DAG<Task, RuntimeEdge<Task>> taskDag,
+  public TaskExecutor(final ExecutableTask executableTask,
+                      final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
                       final TaskStateManager taskStateManager,
                       final DataTransferFactory channelFactory,
                       final MetricMessageSender metricMessageSender) {
-    this.taskDag = taskDag;
-    this.taskId = scheduledTask.getTaskId();
-    this.taskIdx = scheduledTask.getTaskIdx();
+    // Information from the ExecutableTask.
+    this.irVertexDag = irVertexDag;
+    this.taskId = executableTask.getTaskId();
+    this.taskIdx = executableTask.getTaskIdx();
+    this.stageIncomingEdges = executableTask.getTaskIncomingEdges();
+    this.stageOutgoingEdges = executableTask.getTaskOutgoingEdges();
+    this.irVertexIdToReadable = executableTask.getIrVertexIdToReadable();
+
+    // Other parameters.
     this.taskStateManager = taskStateManager;
-    this.stageIncomingEdges = scheduledTask.getTaskIncomingEdges();
-    this.stageOutgoingEdges = scheduledTask.getTaskOutgoingEdges();
-    this.logicalTaskIdToReadable = scheduledTask.getLogicalTaskIdToReadable();
     this.channelFactory = channelFactory;
     this.metricCollector = new MetricCollector(metricMessageSender);
-    this.logicalTaskIdPutOnHold = null;
-    this.isExecutionRequested = false;
 
-    this.inputReaders = new ArrayList<>();
+    // Initialize data structures.
     this.inputReaderToDataHandlersMap = new ConcurrentHashMap<>();
     this.idToSrcIteratorMap = new HashMap<>();
     this.srcIteratorIdToDataHandlersMap = new HashMap<>();
     this.iteratorIdToDataHandlersMap = new ConcurrentHashMap<>();
     this.partitionQueue = new LinkedBlockingQueue<>();
     this.outputToChildrenDataHandlersMap = new HashMap<>();
-    this.taskDataHandlers = new ArrayList<>();
-
-    this.finishedTaskIds = new HashSet<>();
-    this.numPartitions = 0;
+    this.irVertexDataHandlers = new ArrayList<>();
+    this.finishedVertexIds = new HashSet<>();
 
+    // Metrics
     this.serBlockSize = 0;
     this.encodedBlockSize = 0;
 
+    // Misc
+    this.isExecuted = false;
+    this.irVertexIdPutOnHold = null;
+    this.numPartitions = 0;
 
     initialize();
   }
@@ -124,29 +132,23 @@ public TaskExecutor(final ScheduledTask scheduledTask,
    * 2) Prepares Transforms if needed.
    */
   private void initialize() {
-    // Initialize data read of SourceVertex.
-    taskDag.getTopologicalSort().stream()
-        .filter(task -> task instanceof BoundedSourceTask)
-        .forEach(boundedSourceTask -> ((BoundedSourceTask) boundedSourceTask).setReadable(
-            logicalTaskIdToReadable.get(boundedSourceTask.getId())));
-
-    // Initialize data handlers for each Task.
-    taskDag.topologicalDo(task -> taskDataHandlers.add(new TaskDataHandler(task)));
+    // Initialize data handlers for each IRVertex.
+    irVertexDag.topologicalDo(irVertex -> irVertexDataHandlers.add(new IRVertexDataHandler(irVertex)));
 
     // Initialize data transfer.
-    // Construct a pointer-based DAG of TaskDataHandlers that are used for data transfer.
+    // Construct a pointer-based DAG of irVertexDataHandlers that are used for data transfer.
     // 'Pointer-based' means that it isn't Map/List-based in getting the data structure or parent/children
     // to avoid element-wise extra overhead of calculating hash values(HashMap) or iterating Lists.
-    taskDag.topologicalDo(task -> {
-      final Set<PhysicalStageEdge> inEdgesFromOtherStages = getInEdgesFromOtherStages(task);
-      final Set<PhysicalStageEdge> outEdgesToOtherStages = getOutEdgesToOtherStages(task);
-      final TaskDataHandler dataHandler = getTaskDataHandler(task);
-
-      // Set data handlers of children tasks.
-      // This forms a pointer-based DAG of TaskDataHandlers.
-      final List<TaskDataHandler> childrenDataHandlers = new ArrayList<>();
-      taskDag.getChildren(task.getId()).forEach(child ->
-          childrenDataHandlers.add(getTaskDataHandler(child)));
+    irVertexDag.topologicalDo(irVertex -> {
+      final Set<PhysicalStageEdge> inEdgesFromOtherStages = getInEdgesFromOtherStages(irVertex);
+      final Set<PhysicalStageEdge> outEdgesToOtherStages = getOutEdgesToOtherStages(irVertex);
+      final IRVertexDataHandler dataHandler = getIRVertexDataHandler(irVertex);
+
+      // Set data handlers of children irVertices.
+      // This forms a pointer-based DAG of irVertexDataHandlers.
+      final List<IRVertexDataHandler> childrenDataHandlers = new ArrayList<>();
+      irVertexDag.getChildren(irVertex.getId()).forEach(child ->
+          childrenDataHandlers.add(getIRVertexDataHandler(child)));
       dataHandler.setChildrenDataHandler(childrenDataHandlers);
 
       // Add InputReaders for inter-stage data transfer
@@ -158,7 +160,6 @@ private void initialize() {
         if (inputReader.isSideInputReader()) {
           dataHandler.addSideInputFromOtherStages(inputReader);
         } else {
-          inputReaders.add(inputReader);
           inputReaderToDataHandlersMap.putIfAbsent(inputReader, new ArrayList<>());
           inputReaderToDataHandlersMap.get(inputReader).add(dataHandler);
         }
@@ -167,29 +168,29 @@ private void initialize() {
       // Add OutputWriters for inter-stage data transfer
       outEdgesToOtherStages.forEach(physicalStageEdge -> {
         final OutputWriter outputWriter = channelFactory.createWriter(
-            task, taskIdx, physicalStageEdge.getDstVertex(), physicalStageEdge);
+            irVertex, taskIdx, physicalStageEdge.getDstVertex(), physicalStageEdge);
         dataHandler.addOutputWriter(outputWriter);
       });
 
       // Add InputPipes for intra-stage data transfer
-      addInputFromThisStage(task, dataHandler);
+      addInputFromThisStage(irVertex, dataHandler);
 
       // Add OutputPipe for intra-stage data transfer
-      setOutputCollector(task, dataHandler);
+      setOutputCollector(irVertex, dataHandler);
     });
 
     // Prepare Transforms if needed.
-    taskDag.topologicalDo(task -> {
-      if (task instanceof OperatorTask) {
-        final Transform transform = ((OperatorTask) task).getTransform();
+    irVertexDag.topologicalDo(irVertex -> {
+      if (irVertex instanceof OperatorVertex) {
+        final Transform transform = ((OperatorVertex) irVertex).getTransform();
         final Map<Transform, Object> sideInputMap = new HashMap<>();
-        final TaskDataHandler dataHandler = getTaskDataHandler(task);
+        final IRVertexDataHandler dataHandler = getIRVertexDataHandler(irVertex);
         // Check and collect side inputs.
         if (!dataHandler.getSideInputFromOtherStages().isEmpty()) {
-          sideInputFromOtherStages(task, sideInputMap);
+          sideInputFromOtherStages(irVertex, sideInputMap);
         }
         if (!dataHandler.getSideInputFromThisStage().isEmpty()) {
-          sideInputFromThisStage(task, sideInputMap);
+          sideInputFromThisStage(irVertex, sideInputMap);
         }
 
         final Transform.Context transformContext = new ContextImpl(sideInputMap);
@@ -200,43 +201,41 @@ private void initialize() {
   }
 
   /**
-   * Collect all inter-stage incoming edges of this task.
+   * Collect all inter-stage incoming edges of this vertex.
    *
-   * @param task the Task whose inter-stage incoming edges to be collected.
+   * @param irVertex the IRVertex whose inter-stage incoming edges to be collected.
    * @return the collected incoming edges.
    */
-  private Set<PhysicalStageEdge> getInEdgesFromOtherStages(final Task task) {
+  private Set<PhysicalStageEdge> getInEdgesFromOtherStages(final IRVertex irVertex) {
     return stageIncomingEdges.stream().filter(
-        stageInEdge -> stageInEdge.getDstVertex().getId().equals(task.getIrVertexId()))
+        stageInEdge -> stageInEdge.getDstVertex().getId().equals(irVertex.getId()))
         .collect(Collectors.toSet());
   }
 
   /**
-   * Collect all inter-stage outgoing edges of this task.
+   * Collect all inter-stage outgoing edges of this vertex.
    *
-   * @param task the Task whose inter-stage outgoing edges to be collected.
+   * @param irVertex the IRVertex whose inter-stage outgoing edges to be collected.
    * @return the collected outgoing edges.
    */
-  private Set<PhysicalStageEdge> getOutEdgesToOtherStages(final Task task) {
+  private Set<PhysicalStageEdge> getOutEdgesToOtherStages(final IRVertex irVertex) {
     return stageOutgoingEdges.stream().filter(
-        stageInEdge -> stageInEdge.getSrcVertex().getId().equals(task.getIrVertexId()))
+        stageInEdge -> stageInEdge.getSrcVertex().getId().equals(irVertex.getId()))
         .collect(Collectors.toSet());
   }
 
   /**
-   * Add input OutputCollectors to each {@link Task}.
-   * Input OutputCollector denotes all the OutputCollectors of intra-Stage parent tasks of this task.
+   * Add input OutputCollectors to each {@link IRVertex}.
+   * Input OutputCollector denotes all the OutputCollectors of intra-Stage dependencies.
    *
-   * @param task the Task to add input OutputCollectors to.
+   * @param irVertex the IRVertex to add input OutputCollectors to.
    */
-  private void addInputFromThisStage(final Task task, final TaskDataHandler dataHandler) {
-    List<Task> parentTasks = taskDag.getParents(task.getId());
-    final String physicalTaskId = getPhysicalTaskId(task.getId());
-
-    if (parentTasks != null) {
-      parentTasks.forEach(parent -> {
-        final OutputCollectorImpl parentOutputCollector = getTaskDataHandler(parent).getOutputCollector();
-        if (parentOutputCollector.hasSideInputFor(physicalTaskId)) {
+  private void addInputFromThisStage(final IRVertex irVertex, final IRVertexDataHandler dataHandler) {
+    List<IRVertex> parentVertices = irVertexDag.getParents(irVertex.getId());
+    if (parentVertices != null) {
+      parentVertices.forEach(parent -> {
+        final OutputCollectorImpl parentOutputCollector = getIRVertexDataHandler(parent).getOutputCollector();
+        if (parentOutputCollector.hasSideInputFor(irVertex.getId())) {
           dataHandler.addSideInputFromThisStage(parentOutputCollector);
         } else {
           dataHandler.addInputFromThisStages(parentOutputCollector);
@@ -246,21 +245,15 @@ private void addInputFromThisStage(final Task task, final TaskDataHandler dataHa
   }
 
   /**
-   * Add output outputCollectors to each {@link Task}.
-   * Output outputCollector denotes the one and only one outputCollector of this task.
-   * Check the outgoing edges that will use this outputCollector,
-   * and set this outputCollector as side input if any one of the edges uses this outputCollector as side input.
-   *
-   * @param task the Task to add output outputCollectors to.
+   * Add outputCollectors to each {@link IRVertex}.
+   * @param irVertex the IRVertex to add output outputCollectors to.
    */
-  private void setOutputCollector(final Task task, final TaskDataHandler dataHandler) {
+  private void setOutputCollector(final IRVertex irVertex, final IRVertexDataHandler dataHandler) {
     final OutputCollectorImpl outputCollector = new OutputCollectorImpl();
-    final String physicalTaskId = getPhysicalTaskId(task.getId());
-
-    taskDag.getOutgoingEdgesOf(task).forEach(outEdge -> {
+    irVertexDag.getOutgoingEdgesOf(irVertex).forEach(outEdge -> {
       if (outEdge.isSideInput()) {
         outputCollector.setSideInputRuntimeEdge(outEdge);
-        outputCollector.setAsSideInputFor(physicalTaskId);
+        outputCollector.setAsSideInputFor(irVertex.getId());
       }
     });
 
@@ -268,25 +261,17 @@ private void setOutputCollector(final Task task, final TaskDataHandler dataHandl
   }
 
   /**
-   * Check that this task has OutputWriter for inter-stage data.
+   * Check that this irVertex has OutputWriter for inter-stage data.
    *
-   * @param task the task to check whether it has OutputWriters.
-   * @return true if the task has OutputWriters.
+   * @param irVertex the irVertex to check whether it has OutputWriters.
+   * @return true if the irVertex has OutputWriters.
    */
-  private boolean hasOutputWriter(final Task task) {
-    return !getTaskDataHandler(task).getOutputWriters().isEmpty();
+  private boolean hasOutputWriter(final IRVertex irVertex) {
+    return !getIRVertexDataHandler(irVertex).getOutputWriters().isEmpty();
   }
 
-  /**
-   * If the given task is MetricCollectionBarrierTask,
-   * set task as put on hold and use it to decide Task state when Task finishes.
-   *
-   * @param task the task to check whether it has OutputWriters.
-   * @return true if the task has OutputWriters.
-   */
-  private void setTaskPutOnHold(final MetricCollectionBarrierTask task) {
-    final String physicalTaskId = getPhysicalTaskId(task.getId());
-    logicalTaskIdPutOnHold = RuntimeIdGenerator.getLogicalTaskIdIdFromPhysicalTaskId(physicalTaskId);
+  private void setIRVertexPutOnHold(final MetricCollectionBarrierVertex irVertex) {
+    irVertexIdPutOnHold = irVertex.getId();
   }
 
   /**
@@ -294,16 +279,15 @@ private void setTaskPutOnHold(final MetricCollectionBarrierTask task) {
    * As element-wise output write is done and the block is in memory,
    * flush the block into the designated data store and commit it.
    *
-   * @param task the task with OutputWriter to flush and commit output block.
+   * @param irVertex the IRVertex with OutputWriter to flush and commit output block.
    */
-  private void writeAndCloseOutputWriters(final Task task) {
-    final String physicalTaskId = getPhysicalTaskId(task.getId());
+  private void writeAndCloseOutputWriters(final IRVertex irVertex) {
     final List<Long> writtenBytesList = new ArrayList<>();
     final Map<String, Object> metric = new HashMap<>();
-    metricCollector.beginMeasurement(physicalTaskId, metric);
+    metricCollector.beginMeasurement(irVertex.getId(), metric);
     final long writeStartTime = System.currentTimeMillis();
 
-    getTaskDataHandler(task).getOutputWriters().forEach(outputWriter -> {
+    getIRVertexDataHandler(irVertex).getOutputWriters().forEach(outputWriter -> {
       outputWriter.close();
       final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
       writtenBytes.ifPresent(writtenBytesList::add);
@@ -312,21 +296,25 @@ private void writeAndCloseOutputWriters(final Task task) {
     final long writeEndTime = System.currentTimeMillis();
     metric.put("OutputWriteTime(ms)", writeEndTime - writeStartTime);
     putWrittenBytesMetric(writtenBytesList, metric);
-    metricCollector.endMeasurement(physicalTaskId, metric);
+    metricCollector.endMeasurement(irVertex.getId(), metric);
   }
 
   /**
    * Get input iterator from BoundedSource and bind it with id.
    */
   private void prepareInputFromSource() {
-    taskDag.topologicalDo(task -> {
-      if (task instanceof BoundedSourceTask) {
+    irVertexDag.topologicalDo(irVertex -> {
+      if (irVertex instanceof SourceVertex) {
         try {
           final String iteratorId = generateIteratorId();
-          final Iterator iterator = ((BoundedSourceTask) task).getReadable().read().iterator();
+          final Readable readable = irVertexIdToReadable.get(irVertex.getId());
+          if (readable == null) {
+            throw new RuntimeException(irVertex.toString());
+          }
+          final Iterator iterator = readable.read().iterator();
           idToSrcIteratorMap.putIfAbsent(iteratorId, iterator);
           srcIteratorIdToDataHandlersMap.putIfAbsent(iteratorId, new ArrayList<>());
-          srcIteratorIdToDataHandlersMap.get(iteratorId).add(getTaskDataHandler(task));
+          srcIteratorIdToDataHandlersMap.get(iteratorId).add(getIRVertexDataHandler(irVertex));
         } catch (final BlockFetchException ex) {
           taskStateManager.onTaskStateChanged(TaskState.State.FAILED_RECOVERABLE,
               Optional.empty(), Optional.of(TaskState.RecoverableFailureCause.INPUT_READ_FAILURE));
@@ -374,22 +362,21 @@ private void prepareInputFromOtherStages() {
   }
 
   /**
-   * Check whether all tasks in this Task are finished.
+   * Check whether all vertices in this Task are finished.
    *
-   * @return true if all tasks are finished.
+   * @return true if all vertices are finished.
    */
-  private boolean finishedAllTasks() {
+  private boolean finishedAllVertices() {
     // Total number of Tasks
-    int taskNum = taskDataHandlers.size();
-    int finishedTaskNum = finishedTaskIds.size();
-
-    return finishedTaskNum == taskNum;
+    int vertexNum = irVertexDataHandlers.size();
+    int finishedVertexNum = finishedVertexIds.size();
+    return finishedVertexNum == vertexNum;
   }
 
   /**
-   * Initialize the very first map of OutputCollector-children task DAG.
+   * Initialize the very first map of OutputCollector-children irVertex DAG.
    * In each map entry, the OutputCollector contains input data to be propagated through
-   * the children task DAG.
+   * the children irVertex DAG.
    */
   private void initializeOutputToChildrenDataHandlersMap() {
     srcIteratorIdToDataHandlersMap.values().forEach(dataHandlers ->
@@ -403,11 +390,11 @@ private void initializeOutputToChildrenDataHandlersMap() {
   }
 
   /**
-   * Update the map of OutputCollector-children task DAG.
+   * Update the map of OutputCollector-children irVertex DAG.
    */
   private void updateOutputToChildrenDataHandlersMap() {
-    Map<OutputCollectorImpl, List<TaskDataHandler>> currentMap = outputToChildrenDataHandlersMap;
-    Map<OutputCollectorImpl, List<TaskDataHandler>> updatedMap = new HashMap<>();
+    Map<OutputCollectorImpl, List<IRVertexDataHandler>> currentMap = outputToChildrenDataHandlersMap;
+    Map<OutputCollectorImpl, List<IRVertexDataHandler>> updatedMap = new HashMap<>();
 
     currentMap.values().forEach(dataHandlers ->
         dataHandlers.forEach(dataHandler -> {
@@ -419,13 +406,13 @@ private void updateOutputToChildrenDataHandlersMap() {
   }
 
   /**
-   * Update the map of OutputCollector-children task DAG.
+   * Update the map of OutputCollector-children irVertex DAG.
    *
-   * @param task the Task with the transform to close.
+   * @param irVertex the IRVertex with the transform to close.
    */
-  private void closeTransform(final Task task) {
-    if (task instanceof OperatorTask) {
-      Transform transform = ((OperatorTask) task).getTransform();
+  private void closeTransform(final IRVertex irVertex) {
+    if (irVertex instanceof OperatorVertex) {
+      Transform transform = ((OperatorVertex) irVertex).getTransform();
       transform.close();
     }
   }
@@ -434,11 +421,11 @@ private void closeTransform(final Task task) {
    * As a preprocessing of side input data, get inter stage side input
    * and form a map of source transform-side input.
    *
-   * @param task the task which receives side input from other stages.
+   * @param irVertex the IRVertex which receives side input from other stages.
    * @param sideInputMap the map of source transform-side input to build.
    */
-  private void sideInputFromOtherStages(final Task task, final Map<Transform, Object> sideInputMap) {
-    getTaskDataHandler(task).getSideInputFromOtherStages().forEach(sideInputReader -> {
+  private void sideInputFromOtherStages(final IRVertex irVertex, final Map<Transform, Object> sideInputMap) {
+    getIRVertexDataHandler(irVertex).getSideInputFromOtherStages().forEach(sideInputReader -> {
       try {
         final DataUtil.IteratorWithNumBytes sideInputIterator = sideInputReader.read().get(0).get();
         final Object sideInput = getSideInput(sideInputIterator);
@@ -447,7 +434,7 @@ private void sideInputFromOtherStages(final Task task, final Map<Transform, Obje
         if (inEdge instanceof PhysicalStageEdge) {
           srcTransform = ((OperatorVertex) ((PhysicalStageEdge) inEdge).getSrcVertex()).getTransform();
         } else {
-          srcTransform = ((OperatorTask) inEdge.getSrc()).getTransform();
+          srcTransform = ((OperatorVertex) inEdge.getSrc()).getTransform();
         }
         sideInputMap.put(srcTransform, sideInput);
 
@@ -477,11 +464,11 @@ private void sideInputFromOtherStages(final Task task, final Map<Transform, Obje
    * Assumption:  intra stage side input denotes a data element initially received
    *              via side input reader from other stages.
    *
-   * @param task the task which receives the data element marked as side input.
+   * @param irVertex the IRVertex which receives the data element marked as side input.
    * @param sideInputMap the map of source transform-side input to build.
    */
-  private void sideInputFromThisStage(final Task task, final Map<Transform, Object> sideInputMap) {
-    getTaskDataHandler(task).getSideInputFromThisStage().forEach(input -> {
+  private void sideInputFromThisStage(final IRVertex irVertex, final Map<Transform, Object> sideInputMap) {
+    getIRVertexDataHandler(irVertex).getSideInputFromThisStage().forEach(input -> {
       // because sideInput is only 1 element in the outputCollector
       Object sideInput = input.remove();
       final RuntimeEdge inEdge = input.getSideInputRuntimeEdge();
@@ -489,7 +476,7 @@ private void sideInputFromThisStage(final Task task, final Map<Transform, Object
       if (inEdge instanceof PhysicalStageEdge) {
         srcTransform = ((OperatorVertex) ((PhysicalStageEdge) inEdge).getSrcVertex()).getTransform();
       } else {
-        srcTransform = ((OperatorTask) inEdge.getSrc()).getTransform();
+        srcTransform = ((OperatorVertex) inEdge.getSrc()).getTransform();
       }
       sideInputMap.put(srcTransform, sideInput);
     });
@@ -505,11 +492,10 @@ public void execute() {
     long boundedSrcReadEndTime = 0;
     long inputReadStartTime = 0;
     long inputReadEndTime = 0;
-    if (isExecutionRequested) {
+    if (isExecuted) {
       throw new RuntimeException("Task {" + taskId + "} execution called again!");
-    } else {
-      isExecutionRequested = true;
     }
+    isExecuted = true;
     taskStateManager.onTaskStateChanged(TaskState.State.EXECUTING, Optional.empty(), Optional.empty());
     LOG.info("{} Executing!", taskId);
 
@@ -523,12 +509,12 @@ public void execute() {
     inputReadStartTime = System.currentTimeMillis();
     prepareInputFromOtherStages();
 
-    // Execute the Task DAG.
+    // Execute the IRVertex DAG.
     try {
       srcIteratorIdToDataHandlersMap.forEach((srcIteratorId, dataHandlers) -> {
         Iterator iterator = idToSrcIteratorMap.get(srcIteratorId);
         iterator.forEachRemaining(element -> {
-          for (final TaskDataHandler dataHandler : dataHandlers) {
+          for (final IRVertexDataHandler dataHandler : dataHandlers) {
             runTask(dataHandler, element);
           }
         });
@@ -539,9 +525,9 @@ public void execute() {
         Pair<String, DataUtil.IteratorWithNumBytes> idToIteratorPair = partitionQueue.take();
         final String iteratorId = idToIteratorPair.left();
         final DataUtil.IteratorWithNumBytes iterator = idToIteratorPair.right();
-        List<TaskDataHandler> dataHandlers = iteratorIdToDataHandlersMap.get(iteratorId);
+        List<IRVertexDataHandler> dataHandlers = iteratorIdToDataHandlersMap.get(iteratorId);
         iterator.forEachRemaining(element -> {
-          for (final TaskDataHandler dataHandler : dataHandlers) {
+          for (final IRVertexDataHandler dataHandler : dataHandlers) {
             runTask(dataHandler, element);
           }
         });
@@ -566,44 +552,44 @@ public void execute() {
       metric.put("InputReadTime(ms)", inputReadEndTime - inputReadStartTime);
 
       // Process intra-Task data.
-      // Intra-Task data comes from outputCollectors of this Task's Tasks.
+      // Intra-Task data comes from outputCollectors of this Task's vertices.
       initializeOutputToChildrenDataHandlersMap();
-      while (!finishedAllTasks()) {
+      while (!finishedAllVertices()) {
         outputToChildrenDataHandlersMap.forEach((outputCollector, childrenDataHandlers) -> {
-          // Get the task that has this outputCollector as its output outputCollector
-          Task outputCollectorOwnerTask = taskDataHandlers.stream()
+          // Get the vertex that has this outputCollector as its output outputCollector
+          final IRVertex outputProducer = irVertexDataHandlers.stream()
               .filter(dataHandler -> dataHandler.getOutputCollector() == outputCollector)
-              .findFirst().get().getTask();
+              .findFirst().get().getIRVertex();
 
-          // Before consuming the output of outputCollectorOwnerTask as input,
+          // Before consuming the output of outputProducer as input,
           // close transform if it is OperatorTransform.
-          closeTransform(outputCollectorOwnerTask);
+          closeTransform(outputProducer);
 
-          // Set outputCollectorOwnerTask as finished.
-          finishedTaskIds.add(getPhysicalTaskId(outputCollectorOwnerTask.getId()));
+          // Set outputProducer as finished.
+          finishedVertexIds.add(outputProducer.getId());
 
           while (!outputCollector.isEmpty()) {
             final Object element = outputCollector.remove();
 
-            // Pass outputCollectorOwnerTask's output to its children tasks recursively.
+            // Pass outputProducer's output to its children tasks recursively.
             if (!childrenDataHandlers.isEmpty()) {
-              for (final TaskDataHandler childDataHandler : childrenDataHandlers) {
+              for (final IRVertexDataHandler childDataHandler : childrenDataHandlers) {
                 runTask(childDataHandler, element);
               }
             }
 
             // Write element-wise to OutputWriters if any and close the OutputWriters.
-            if (hasOutputWriter(outputCollectorOwnerTask)) {
+            if (hasOutputWriter(outputProducer)) {
               // If outputCollector isn't empty(if closeTransform produced some output),
               // write them element-wise to OutputWriters.
               List<OutputWriter> outputWritersOfTask =
-                  getTaskDataHandler(outputCollectorOwnerTask).getOutputWriters();
+                  getIRVertexDataHandler(outputProducer).getOutputWriters();
               outputWritersOfTask.forEach(outputWriter -> outputWriter.write(element));
             }
           }
 
-          if (hasOutputWriter(outputCollectorOwnerTask)) {
-            writeAndCloseOutputWriters(outputCollectorOwnerTask);
+          if (hasOutputWriter(outputProducer)) {
+            writeAndCloseOutputWriters(outputProducer);
           }
         });
         updateOutputToChildrenDataHandlersMap();
@@ -625,47 +611,47 @@ public void execute() {
     final boolean available = serBlockSize >= 0;
     putReadBytesMetric(available, serBlockSize, encodedBlockSize, metric);
     metricCollector.endMeasurement(taskId, metric);
-    if (logicalTaskIdPutOnHold == null) {
+    if (irVertexIdPutOnHold == null) {
       taskStateManager.onTaskStateChanged(TaskState.State.COMPLETE, Optional.empty(), Optional.empty());
     } else {
       taskStateManager.onTaskStateChanged(TaskState.State.ON_HOLD,
-          Optional.of(logicalTaskIdPutOnHold),
+          Optional.of(irVertexIdPutOnHold),
           Optional.empty());
     }
     LOG.info("{} Complete!", taskId);
   }
 
   /**
-   * Recursively executes a task with the input data element.
+   * Recursively executes a vertex with the input data element.
    *
-   * @param dataHandler TaskDataHandler of a task to execute.
+   * @param dataHandler IRVertexDataHandler of a vertex to execute.
    * @param dataElement input data element to process.
    */
-  private void runTask(final TaskDataHandler dataHandler, final Object dataElement) {
-    final Task task = dataHandler.getTask();
+  private void runTask(final IRVertexDataHandler dataHandler, final Object dataElement) {
+    final IRVertex irVertex = dataHandler.getIRVertex();
     final OutputCollectorImpl outputCollector = dataHandler.getOutputCollector();
 
-    // Process element-wise depending on the Task type
-    if (task instanceof BoundedSourceTask) {
+    // Process element-wise depending on the vertex type
+    if (irVertex instanceof SourceVertex) {
       if (dataElement == null) { // null used for Beam VoidCoders
         final List<Object> nullForVoidCoder = Collections.singletonList(dataElement);
         outputCollector.emit(nullForVoidCoder);
       } else {
         outputCollector.emit(dataElement);
       }
-    } else if (task instanceof OperatorTask) {
-      final Transform transform = ((OperatorTask) task).getTransform();
+    } else if (irVertex instanceof OperatorVertex) {
+      final Transform transform = ((OperatorVertex) irVertex).getTransform();
       transform.onData(dataElement);
-    } else if (task instanceof MetricCollectionBarrierTask) {
+    } else if (irVertex instanceof MetricCollectionBarrierVertex) {
       if (dataElement == null) { // null used for Beam VoidCoders
         final List<Object> nullForVoidCoder = Collections.singletonList(dataElement);
         outputCollector.emit(nullForVoidCoder);
       } else {
         outputCollector.emit(dataElement);
       }
-      setTaskPutOnHold((MetricCollectionBarrierTask) task);
+      setIRVertexPutOnHold((MetricCollectionBarrierVertex) irVertex);
     } else {
-      throw new UnsupportedOperationException("This type  of Task is not supported");
+      throw new UnsupportedOperationException("This type of IRVertex is not supported");
     }
 
     // For the produced output
@@ -673,31 +659,21 @@ private void runTask(final TaskDataHandler dataHandler, final Object dataElement
       final Object element = outputCollector.remove();
 
       // Pass output to its children recursively.
-      List<TaskDataHandler> childrenDataHandlers = dataHandler.getChildren();
+      List<IRVertexDataHandler> childrenDataHandlers = dataHandler.getChildren();
       if (!childrenDataHandlers.isEmpty()) {
-        for (final TaskDataHandler childDataHandler : childrenDataHandlers) {
+        for (final IRVertexDataHandler childDataHandler : childrenDataHandlers) {
           runTask(childDataHandler, element);
         }
       }
 
       // Write element-wise to OutputWriters if any
-      if (hasOutputWriter(task)) {
+      if (hasOutputWriter(irVertex)) {
         List<OutputWriter> outputWritersOfTask = dataHandler.getOutputWriters();
         outputWritersOfTask.forEach(outputWriter -> outputWriter.write(element));
       }
     }
   }
 
-  /**
-   * Get the matching physical task id of the given logical task id.
-   *
-   * @param logicalTaskId the logical task id.
-   * @return the physical task id.
-   */
-  private String getPhysicalTaskId(final String logicalTaskId) {
-    return RuntimeIdGenerator.generatePhysicalTaskId(taskIdx, logicalTaskId);
-  }
-
   /**
    * Generate a unique iterator id.
    *
@@ -707,9 +683,9 @@ private String generateIteratorId() {
     return ITERATORID_PREFIX + ITERATORID_GENERATOR.getAndIncrement();
   }
 
-  private TaskDataHandler getTaskDataHandler(final Task task) {
-    return taskDataHandlers.stream()
-        .filter(dataHandler -> dataHandler.getTask() == task)
+  private IRVertexDataHandler getIRVertexDataHandler(final IRVertex irVertex) {
+    return irVertexDataHandlers.stream()
+        .filter(dataHandler -> dataHandler.getIRVertex() == irVertex)
         .findFirst().get();
   }
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
index 77d5136a..e2a31d6f 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
@@ -15,16 +15,13 @@
  */
 package edu.snu.nemo.runtime.executor;
 
-import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.exception.UnknownExecutionStateException;
 import edu.snu.nemo.common.exception.UnknownFailureCauseException;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
-import edu.snu.nemo.runtime.common.plan.physical.Task;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 
 import java.util.*;
 
@@ -47,13 +44,12 @@
   private final MetricCollector metricCollector;
   private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
 
-  public TaskStateManager(final ScheduledTask scheduledTask,
-                               final DAG<Task, RuntimeEdge<Task>> taskDag,
-                               final String executorId,
-                               final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
-                               final MetricMessageSender metricMessageSender) {
-    this.taskId = scheduledTask.getTaskId();
-    this.attemptIdx = scheduledTask.getAttemptIdx();
+  public TaskStateManager(final ExecutableTask executableTask,
+                          final String executorId,
+                          final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
+                          final MetricMessageSender metricMessageSender) {
+    this.taskId = executableTask.getTaskId();
+    this.attemptIdx = executableTask.getAttemptIdx();
     this.executorId = executorId;
     this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
     this.metricCollector = new MetricCollector(metricMessageSender);
@@ -62,11 +58,11 @@ public TaskStateManager(final ScheduledTask scheduledTask,
   /**
    * Updates the state of the task.
    * @param newState of the task.
-   * @param taskPutOnHold the logical ID of the tasks put on hold, empty otherwise.
+   * @param vertexPutOnHold the vertex put on hold.
    * @param cause only provided as non-empty upon recoverable failures.
    */
   public synchronized void onTaskStateChanged(final TaskState.State newState,
-                                              final Optional<String> taskPutOnHold,
+                                              final Optional<String> vertexPutOnHold,
                                               final Optional<TaskState.RecoverableFailureCause> cause) {
     final Map<String, Object> metric = new HashMap<>();
 
@@ -98,7 +94,7 @@ public synchronized void onTaskStateChanged(final TaskState.State newState,
         break;
       case ON_HOLD:
         LOG.debug("Task ID {} put on hold.", this.taskId);
-        notifyTaskStateToMaster(newState, taskPutOnHold, cause);
+        notifyTaskStateToMaster(newState, vertexPutOnHold, cause);
         break;
       default:
         throw new IllegalStateException("Illegal state at this point");
@@ -108,20 +104,20 @@ public synchronized void onTaskStateChanged(final TaskState.State newState,
   /**
    * Notifies the change in task state to master.
    * @param newState of the task.
-   * @param taskPutOnHold the logical ID of the tasks put on hold, empty otherwise.
+   * @param vertexPutOnHold the vertex put on hold.
    * @param cause only provided as non-empty upon recoverable failures.
    */
   private void notifyTaskStateToMaster(final TaskState.State newState,
-                                            final Optional<String> taskPutOnHold,
-                                            final Optional<TaskState.RecoverableFailureCause> cause) {
+                                       final Optional<String> vertexPutOnHold,
+                                       final Optional<TaskState.RecoverableFailureCause> cause) {
     final ControlMessage.TaskStateChangedMsg.Builder msgBuilder =
         ControlMessage.TaskStateChangedMsg.newBuilder()
-          .setExecutorId(executorId)
-          .setTaskId(taskId)
-          .setAttemptIdx(attemptIdx)
-          .setState(convertState(newState));
-    if (taskPutOnHold.isPresent()) {
-          msgBuilder.setTaskPutOnHoldId(taskPutOnHold.get());
+            .setExecutorId(executorId)
+            .setTaskId(taskId)
+            .setAttemptIdx(attemptIdx)
+            .setState(convertState(newState));
+    if (vertexPutOnHold.isPresent()) {
+      msgBuilder.setVertexPutOnHoldId(vertexPutOnHold.get());
     }
     if (cause.isPresent()) {
       msgBuilder.setFailureCause(convertFailureCause(cause.get()));
@@ -139,33 +135,33 @@ private void notifyTaskStateToMaster(final TaskState.State newState,
 
   private ControlMessage.TaskStateFromExecutor convertState(final TaskState.State state) {
     switch (state) {
-    case READY:
-      return ControlMessage.TaskStateFromExecutor.READY;
-    case EXECUTING:
-      return ControlMessage.TaskStateFromExecutor.EXECUTING;
-    case COMPLETE:
-      return ControlMessage.TaskStateFromExecutor.COMPLETE;
-    case FAILED_RECOVERABLE:
-      return ControlMessage.TaskStateFromExecutor.FAILED_RECOVERABLE;
-    case FAILED_UNRECOVERABLE:
-      return ControlMessage.TaskStateFromExecutor.FAILED_UNRECOVERABLE;
-    case ON_HOLD:
-      return ControlMessage.TaskStateFromExecutor.ON_HOLD;
-    default:
-      throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + state));
+      case READY:
+        return ControlMessage.TaskStateFromExecutor.READY;
+      case EXECUTING:
+        return ControlMessage.TaskStateFromExecutor.EXECUTING;
+      case COMPLETE:
+        return ControlMessage.TaskStateFromExecutor.COMPLETE;
+      case FAILED_RECOVERABLE:
+        return ControlMessage.TaskStateFromExecutor.FAILED_RECOVERABLE;
+      case FAILED_UNRECOVERABLE:
+        return ControlMessage.TaskStateFromExecutor.FAILED_UNRECOVERABLE;
+      case ON_HOLD:
+        return ControlMessage.TaskStateFromExecutor.ON_HOLD;
+      default:
+        throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + state));
     }
   }
 
   private ControlMessage.RecoverableFailureCause convertFailureCause(
-    final TaskState.RecoverableFailureCause cause) {
+      final TaskState.RecoverableFailureCause cause) {
     switch (cause) {
-    case INPUT_READ_FAILURE:
-      return ControlMessage.RecoverableFailureCause.InputReadFailure;
-    case OUTPUT_WRITE_FAILURE:
-      return ControlMessage.RecoverableFailureCause.OutputWriteFailure;
-    default:
-      throw new UnknownFailureCauseException(
-          new Throwable("The failure cause for the recoverable failure is unknown"));
+      case INPUT_READ_FAILURE:
+        return ControlMessage.RecoverableFailureCause.InputReadFailure;
+      case OUTPUT_WRITE_FAILURE:
+        return ControlMessage.RecoverableFailureCause.OutputWriteFailure;
+      default:
+        throw new UnknownFailureCauseException(
+            new Throwable("The failure cause for the recoverable failure is unknown"));
     }
   }
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index d7b13f29..93855455 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -159,6 +159,7 @@ public Block createBlock(final String blockId,
             numSerializedBytes += partition.getNumSerializedBytes();
             numEncodedBytes += partition.getNumEncodedBytes();
           }
+
           return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator, numSerializedBytes,
               numEncodedBytes));
         } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
@@ -207,13 +208,16 @@ public Block createBlock(final String blockId,
                       .build());
           return responseFromMasterFuture;
         });
-    blockLocationFuture.whenComplete((message, throwable) -> pendingBlockLocationRequest.remove(blockId));
+    blockLocationFuture.whenComplete((message, throwable) -> {
+      pendingBlockLocationRequest.remove(blockId);
+    });
 
     // Using thenCompose so that fetching block data starts after getting response from master.
     return blockLocationFuture.thenCompose(responseFromMaster -> {
       if (responseFromMaster.getType() != ControlMessage.MessageType.BlockLocationInfo) {
         throw new RuntimeException("Response message type mismatch!");
       }
+
       final ControlMessage.BlockLocationInfoMsg blockLocationInfoMsg =
           responseFromMaster.getBlockLocationInfoMsg();
       if (!blockLocationInfoMsg.hasOwnerExecutorId()) {
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
index 13531c2f..7a4068eb 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
@@ -18,7 +18,6 @@
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.Task;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
 import org.apache.reef.tang.annotations.Parameter;
 
@@ -42,18 +41,18 @@ public DataTransferFactory(@Parameter(JobConf.HashRangeMultiplier.class) final i
   /**
    * Creates an {@link OutputWriter} between two stages.
    *
-   * @param srcTask     the {@link Task} that outputs the data to be written.
+   * @param srcIRVertex     the {@link IRVertex} that outputs the data to be written.
    * @param srcTaskIdx  the index of the source task.
    * @param dstIRVertex the {@link IRVertex} that will take the output data as its input.
    * @param runtimeEdge that connects the srcTask to the tasks belonging to dstIRVertex.
    * @return the {@link OutputWriter} created.
    */
-  public OutputWriter createWriter(final Task srcTask,
+  public OutputWriter createWriter(final IRVertex srcIRVertex,
                                    final int srcTaskIdx,
                                    final IRVertex dstIRVertex,
                                    final RuntimeEdge<?> runtimeEdge) {
     return new OutputWriter(hashRangeMultiplier, srcTaskIdx,
-        srcTask.getIrVertexId(), dstIRVertex, runtimeEdge, blockManagerWorker);
+        srcIRVertex.getId(), dstIRVertex, runtimeEdge, blockManagerWorker);
   }
 
   /**
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/IRVertexDataHandler.java
similarity index 72%
rename from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
rename to runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/IRVertexDataHandler.java
index 56f5e5b7..84b7a8e6 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/IRVertexDataHandler.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.executor.datatransfer;
 
-import edu.snu.nemo.runtime.common.plan.physical.Task;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -26,9 +26,9 @@
  * As Task input is processed element-wise, Task output element percolates down
  * through the DAG of children TaskDataHandlers.
  */
-public final class TaskDataHandler {
-  private final Task task;
-  private List<TaskDataHandler> children;
+public final class IRVertexDataHandler {
+  private final IRVertex irVertex;
+  private List<IRVertexDataHandler> children;
   private final List<OutputCollectorImpl> inputFromThisStage;
   private final List<InputReader> sideInputFromOtherStages;
   private final List<OutputCollectorImpl> sideInputFromThisStage;
@@ -36,12 +36,12 @@
   private final List<OutputWriter> outputWriters;
 
   /**
-   * TaskDataHandler Constructor.
+   * IRVertexDataHandler Constructor.
    *
-   * @param task Task of this TaskDataHandler.
+   * @param irVertex IRVertex of this IRVertexDataHandler.
    */
-  public TaskDataHandler(final Task task) {
-    this.task = task;
+  public IRVertexDataHandler(final IRVertex irVertex) {
+    this.irVertex = irVertex;
     this.children = new ArrayList<>();
     this.inputFromThisStage = new ArrayList<>();
     this.sideInputFromOtherStages = new ArrayList<>();
@@ -51,12 +51,12 @@ public TaskDataHandler(final Task task) {
   }
 
   /**
-   * Get the task that owns this TaskDataHandler.
+   * Get the irVertex that owns this IRVertexDataHandler.
    *
-   * @return task of this TaskDataHandler.
+   * @return irVertex of this IRVertexDataHandler.
    */
-  public Task getTask() {
-    return task;
+  public IRVertex getIRVertex() {
+    return irVertex;
   }
 
   /**
@@ -64,7 +64,7 @@ public Task getTask() {
    *
    * @return DAG of children tasks' TaskDataHandlers.
    */
-  public List<TaskDataHandler> getChildren() {
+  public List<IRVertexDataHandler> getChildren() {
     return children;
   }
 
@@ -89,18 +89,18 @@ public Task getTask() {
   }
 
   /**
-   * Get OutputCollector of this task.
+   * Get OutputCollector of this irVertex.
    *
-   * @return OutputCollector of this task.
+   * @return OutputCollector of this irVertex.
    */
   public OutputCollectorImpl getOutputCollector() {
     return outputCollector;
   }
 
   /**
-   * Get OutputWriters of this task.
+   * Get OutputWriters of this irVertex.
    *
-   * @return OutputWriters of this task.
+   * @return OutputWriters of this irVertex.
    */
   public List<OutputWriter> getOutputWriters() {
     return outputWriters;
@@ -111,14 +111,14 @@ public OutputCollectorImpl getOutputCollector() {
    *
    * @param childrenDataHandler list of children TaskDataHandlers.
    */
-  public void setChildrenDataHandler(final List<TaskDataHandler> childrenDataHandler) {
+  public void setChildrenDataHandler(final List<IRVertexDataHandler> childrenDataHandler) {
     children = childrenDataHandler;
   }
 
   /**
-   * Add OutputCollector of a parent task that will provide intra-stage input.
+   * Add OutputCollector of a parent irVertex that will provide intra-stage input.
    *
-   * @param input OutputCollector of a parent task.
+   * @param input OutputCollector of a parent irVertex.
    */
   public void addInputFromThisStages(final OutputCollectorImpl input) {
     inputFromThisStage.add(input);
@@ -134,27 +134,27 @@ public void addSideInputFromOtherStages(final InputReader sideInputReader) {
   }
 
   /**
-   * Add OutputCollector of a parent task that will provide intra-stage side input.
+   * Add OutputCollector of a parent irVertex that will provide intra-stage side input.
    *
-   * @param ocAsSideInput OutputCollector of a parent task with side input.
+   * @param ocAsSideInput OutputCollector of a parent irVertex with side input.
    */
   public void addSideInputFromThisStage(final OutputCollectorImpl ocAsSideInput) {
     sideInputFromThisStage.add(ocAsSideInput);
   }
 
   /**
-   * Set OutputCollector of this task.
+   * Set OutputCollector of this irVertex.
    *
-   * @param oc OutputCollector of this task.
+   * @param oc OutputCollector of this irVertex.
    */
   public void setOutputCollector(final OutputCollectorImpl oc) {
     outputCollector = oc;
   }
 
   /**
-   * Add OutputWriter of this task.
+   * Add OutputWriter of this irVertex.
    *
-   * @param outputWriter OutputWriter of this task.
+   * @param outputWriter OutputWriter of this irVertex.
    */
   public void addOutputWriter(final OutputWriter outputWriter) {
     outputWriters.add(outputWriter);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
index 80ab1ea9..49401aa3 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
@@ -20,6 +20,7 @@
 import edu.snu.nemo.common.exception.UnknownExecutionStateException;
 import edu.snu.nemo.common.StateMachine;
 import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.metric.MetricDataBuilder;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
@@ -162,9 +163,9 @@ private void initializePartitionStates(final BlockManagerMaster blockManagerMast
 
       // Initialize states for blocks of stage internal edges
       taskIdsForStage.forEach(taskId -> {
-        final DAG<Task, RuntimeEdge<Task>> taskInternalDag = physicalStage.getTaskDag();
+        final DAG<IRVertex, RuntimeEdge<IRVertex>> taskInternalDag = physicalStage.getIRDAG();
         taskInternalDag.getVertices().forEach(task -> {
-          final List<RuntimeEdge<Task>> internalOutgoingEdges = taskInternalDag.getOutgoingEdgesOf(task);
+          final List<RuntimeEdge<IRVertex>> internalOutgoingEdges = taskInternalDag.getOutgoingEdgesOf(task);
           internalOutgoingEdges.forEach(taskRuntimeEdge -> {
             final int srcTaskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
             final String blockId = RuntimeIdGenerator.generateBlockId(taskRuntimeEdge.getId(), srcTaskIdx);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 8d224244..51c505d8 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -119,7 +119,7 @@ public RuntimeMaster(final Scheduler scheduler,
   public Pair<JobStateManager, ScheduledExecutorService> execute(final PhysicalPlan plan,
                                                                  final int maxScheduleAttempt) {
     final Callable<Pair<JobStateManager, ScheduledExecutorService>> jobExecutionCallable = () -> {
-      this.irVertices.addAll(plan.getTaskIRVertexMap().values());
+      this.irVertices.addAll(plan.getIdToIRVertex().values());
       try {
         final JobStateManager jobStateManager =
             new JobStateManager(plan, blockManagerMaster, metricMessageHandler, maxScheduleAttempt);
@@ -268,7 +268,7 @@ private void handleControlMessage(final ControlMessage.Message message) {
             taskStateChangedMsg.getTaskId(),
             taskStateChangedMsg.getAttemptIdx(),
             convertTaskState(taskStateChangedMsg.getState()),
-            taskStateChangedMsg.getTaskPutOnHoldId(),
+            taskStateChangedMsg.getVertexPutOnHoldId(),
             convertFailureCause(taskStateChangedMsg.getFailureCause()));
         break;
       case ExecutorFailed:
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index caf16061..8536746e 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -20,7 +20,7 @@
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.reef.driver.context.ActiveContext;
 
@@ -95,17 +95,17 @@ public ExecutorRepresenter(final String executorId,
 
   /**
    * Marks the Task as running, and sends scheduling message to the executor.
-   * @param scheduledTask
+   * @param executableTask
    */
-  public void onTaskScheduled(final ScheduledTask scheduledTask) {
-    runningTasks.add(scheduledTask.getTaskId());
-    runningTaskToAttempt.put(scheduledTask.getTaskId(), scheduledTask.getAttemptIdx());
-    failedTasks.remove(scheduledTask.getTaskId());
+  public void onTaskScheduled(final ExecutableTask executableTask) {
+    runningTasks.add(executableTask.getTaskId());
+    runningTaskToAttempt.put(executableTask.getTaskId(), executableTask.getAttemptIdx());
+    failedTasks.remove(executableTask.getTaskId());
 
     serializationExecutorService.submit(new Runnable() {
       @Override
       public void run() {
-        final byte[] serialized = SerializationUtils.serialize(scheduledTask);
+        final byte[] serialized = SerializationUtils.serialize(executableTask);
         sendControlMessage(
             ControlMessage.Message.newBuilder()
                 .setId(RuntimeIdGenerator.generateMessageId())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index a74ff10c..f32a4416 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -19,6 +19,7 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
@@ -136,14 +137,14 @@ public void updateJob(final String jobId, final PhysicalPlan newPhysicalPlan, fi
    * @param taskId whose state has changed
    * @param taskAttemptIndex of the task whose state has changed
    * @param newState the state to change to
-   * @param taskPutOnHold the ID of task that are put on hold. It is null otherwise.
+   * @param vertexPutOnHold the ID of vertex that is put on hold. It is null otherwise.
    */
   @Override
   public void onTaskStateChanged(final String executorId,
                                  final String taskId,
                                  final int taskAttemptIndex,
                                  final TaskState.State newState,
-                                 @Nullable final String taskPutOnHold,
+                                 @Nullable final String vertexPutOnHold,
                                  final TaskState.RecoverableFailureCause failureCause) {
     final int currentTaskAttemptIndex = jobStateManager.getCurrentAttemptIndexForTask(taskId);
     if (taskAttemptIndex == currentTaskAttemptIndex) {
@@ -158,7 +159,7 @@ public void onTaskStateChanged(final String executorId,
           break;
         case ON_HOLD:
           jobStateManager.onTaskStateChanged(taskId, newState);
-          onTaskExecutionOnHold(executorId, taskId, taskPutOnHold);
+          onTaskExecutionOnHold(executorId, taskId, vertexPutOnHold);
           break;
         case FAILED_UNRECOVERABLE:
           throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The job failed on Task #")
@@ -392,7 +393,7 @@ private void scheduleStage(final PhysicalStage stageToSchedule) {
     LOG.info("Scheduling Stage {}", stageToSchedule.getId());
 
     // each readable and source task will be bounded in executor.
-    final List<Map<String, Readable>> logicalTaskIdToReadables = stageToSchedule.getLogicalTaskIdToReadables();
+    final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
 
     taskIdsToSchedule.forEach(taskId -> {
       blockManagerMaster.onProducerTaskScheduled(taskId);
@@ -400,23 +401,27 @@ private void scheduleStage(final PhysicalStage stageToSchedule) {
       final int attemptIdx = jobStateManager.getCurrentAttemptIndexForTask(taskId);
 
       LOG.debug("Enqueueing {}", taskId);
-      pendingTaskCollection.add(new ScheduledTask(physicalPlan.getId(),
-          stageToSchedule.getSerializedTaskDag(), taskId, stageIncomingEdges, stageOutgoingEdges, attemptIdx,
-          stageToSchedule.getContainerType(), logicalTaskIdToReadables.get(taskIdx)));
+      pendingTaskCollection.add(new ExecutableTask(
+          physicalPlan.getId(),
+          taskId,
+          attemptIdx,
+          stageToSchedule.getContainerType(),
+          stageToSchedule.getSerializedIRDAG(),
+          stageIncomingEdges,
+          stageOutgoingEdges,
+          vertexIdToReadables.get(taskIdx)));
     });
     schedulerRunner.onATaskAvailable();
   }
 
   /**
-   * Gets the DAG of a task from it's ID.
-   *
-   * @param taskId the ID of the task to get.
-   * @return the DAG of the task.
+   * @param taskId id of the task
+   * @return the IR dag
    */
-  private DAG<Task, RuntimeEdge<Task>> getTaskDagById(final String taskId) {
+  private DAG<IRVertex, RuntimeEdge<IRVertex>> getVertexDagById(final String taskId) {
     for (final PhysicalStage physicalStage : physicalPlan.getStageDAG().getVertices()) {
       if (physicalStage.getId().equals(RuntimeIdGenerator.getStageIdFromTaskId(taskId))) {
-        return physicalStage.getTaskDag();
+        return physicalStage.getIRDAG();
       }
     }
     throw new RuntimeException(new Throwable("This taskId does not exist in the plan"));
@@ -471,13 +476,13 @@ private void onTaskExecutionComplete(final String executorId,
 
   /**
    * Action for after task execution is put on hold.
-   * @param executorId     the ID of the executor.
-   * @param taskId    the ID of the task.
-   * @param taskPutOnHold  the ID of task that is put on hold.
+   * @param executorId       the ID of the executor.
+   * @param taskId           the ID of the task.
+   * @param vertexPutOnHold  the ID of vertex that is put on hold.
    */
   private void onTaskExecutionOnHold(final String executorId,
                                      final String taskId,
-                                     final String taskPutOnHold) {
+                                     final String vertexPutOnHold) {
     LOG.info("{} put on hold in {}", new Object[]{taskId, executorId});
     executorRegistry.updateExecutor(executorId, (executor, state) -> {
       executor.onTaskExecutionComplete(taskId);
@@ -491,15 +496,14 @@ private void onTaskExecutionOnHold(final String executorId,
     if (stageComplete) {
       // get optimization vertex from the task.
       final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
-          getTaskDagById(taskId).getVertices().stream() // get tasks list
-              .filter(task -> task.getId().equals(taskPutOnHold)) // find it
-              .map(physicalPlan::getIRVertexOf) // get the corresponding IRVertex, the MetricCollectionBarrierVertex
+          getVertexDagById(taskId).getVertices().stream() // get vertex list
+              .filter(irVertex -> irVertex.getId().equals(vertexPutOnHold)) // find it
               .filter(irVertex -> irVertex instanceof MetricCollectionBarrierVertex)
               .distinct()
               .map(irVertex -> (MetricCollectionBarrierVertex) irVertex) // convert types
               .findFirst().orElseThrow(() -> new RuntimeException(ON_HOLD.name() // get it
               + " called with failed task ids by some other task than "
-              + MetricCollectionBarrierTask.class.getSimpleName()));
+              + MetricCollectionBarrierVertex.class.getSimpleName()));
       // and we will use this vertex to perform metric collection and dynamic optimization.
 
       pubSubEventHandlerWrapper.getPubSubEventHandler().onNext(
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
index 1c38e359..ab5081cf 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
@@ -46,10 +46,10 @@ private CompositeSchedulingPolicy(final SourceLocationAwareSchedulingPolicy sour
 
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ScheduledTask scheduledTask) {
+                                                             final ExecutableTask executableTask) {
     Set<ExecutorRepresenter> candidates = executorRepresenterSet;
     for (final SchedulingPolicy schedulingPolicy : schedulingPolicies) {
-      candidates = schedulingPolicy.filterExecutorRepresenters(candidates, scheduledTask);
+      candidates = schedulingPolicy.filterExecutorRepresenters(candidates, executableTask);
     }
     return candidates;
   }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
index d64fffbf..f23a1f09 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
@@ -37,20 +37,20 @@ public ContainerTypeAwareSchedulingPolicy() {
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the container type.
    *                               If the container type of target Task is NONE, it will return the original set.
-   * @param scheduledTask {@link ScheduledTask} to be scheduled.
+   * @param executableTask {@link ExecutableTask} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ScheduledTask scheduledTask) {
+                                                             final ExecutableTask executableTask) {
 
-    if (scheduledTask.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
+    if (executableTask.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
       return executorRepresenterSet;
     }
 
     final Set<ExecutorRepresenter> candidateExecutors =
         executorRepresenterSet.stream()
-            .filter(executor -> executor.getContainerType().equals(scheduledTask.getContainerType()))
+            .filter(executor -> executor.getContainerType().equals(executableTask.getContainerType()))
             .collect(Collectors.toSet());
 
     return candidateExecutors;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
index 2f598d28..e72f486c 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
@@ -35,12 +35,12 @@ public FreeSlotSchedulingPolicy() {
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the free slot of executors.
    *                               Executors that do not have any free slots will be filtered by this policy.
-   * @param scheduledTask {@link ScheduledTask} to be scheduled.
+   * @param executableTask {@link ExecutableTask} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ScheduledTask scheduledTask) {
+                                                             final ExecutableTask executableTask) {
     final Set<ExecutorRepresenter> candidateExecutors =
         executorRepresenterSet.stream()
             .filter(executor -> executor.getRunningTasks().size() < executor.getExecutorCapacity())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
index d6696e9c..361307ff 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 
 import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
@@ -38,9 +38,9 @@
 
   /**
    * Adds a Task to this collection.
-   * @param scheduledTask to add.
+   * @param executableTask to add.
    */
-  void add(final ScheduledTask scheduledTask);
+  void add(final ExecutableTask executableTask);
 
   /**
    * Removes the specified Task to be scheduled.
@@ -49,14 +49,14 @@
    * @throws NoSuchElementException if the specified Task is not in the queue,
    *                                or removing this Task breaks scheduling order
    */
-  ScheduledTask remove(final String taskId) throws NoSuchElementException;
+  ExecutableTask remove(final String taskId) throws NoSuchElementException;
 
   /**
    * Peeks stage that can be scheduled according to job dependency priority.
    * Changes to the queue must not reflected to the returned collection to avoid concurrent modification.
    * @return stage that can be scheduled, or {@link Optional#empty()} if the queue is empty
    */
-  Optional<Collection<ScheduledTask>> peekSchedulableStage();
+  Optional<Collection<ExecutableTask>> peekSchedulableStage();
 
   /**
    * Registers a job to this queue in case the queue needs to understand the topology of the job DAG.
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
index 61077fa5..a232a799 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 
@@ -47,12 +47,12 @@ public RoundRobinSchedulingPolicy() {
 
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by round robin behaviour.
-   * @param scheduledTask {@link ScheduledTask} to be scheduled.
+   * @param executableTask {@link ExecutableTask} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ScheduledTask scheduledTask) {
+                                                             final ExecutableTask executableTask) {
     final OptionalInt minOccupancy =
         executorRepresenterSet.stream()
         .map(executor -> executor.getRunningTasks().size())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index f9bb73ac..d34eb568 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -30,9 +30,6 @@
  * RMT and ST meet only at two points: {@link ExecutorRegistry}, and {@link PendingTaskCollection},
  * which are synchronized(ThreadSafe).
  * Other scheduler-related classes that are accessed by only one of the two threads are not synchronized(NotThreadSafe).
- *
- * Receives jobs to execute and schedules
- * {@link edu.snu.nemo.runtime.common.plan.physical.ScheduledTask} to executors.
  */
 @DriverSide
 @DefaultImplementation(BatchSingleJobScheduler.class)
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index fa98e399..f411c1da 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.common.state.JobState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
@@ -112,7 +112,7 @@ void terminate() {
   }
 
   void doScheduleStage() {
-    final Collection<ScheduledTask> stageToSchedule = pendingTaskCollection.peekSchedulableStage().orElse(null);
+    final Collection<ExecutableTask> stageToSchedule = pendingTaskCollection.peekSchedulableStage().orElse(null);
     if (stageToSchedule == null) {
       // Task queue is empty
       LOG.debug("PendingTaskCollection is empty. Awaiting for more Tasks...");
@@ -120,7 +120,7 @@ void doScheduleStage() {
     }
 
     final AtomicInteger numScheduledTasks = new AtomicInteger(0); // to be incremented in lambda
-    for (final ScheduledTask schedulableTask : stageToSchedule) {
+    for (final ExecutableTask schedulableTask : stageToSchedule) {
       final JobStateManager jobStateManager = jobStateManagers.get(schedulableTask.getJobId());
       LOG.debug("Trying to schedule {}...", schedulableTask.getTaskId());
 
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index ab9e2513..1e71882b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
@@ -32,5 +32,5 @@
 @DefaultImplementation(CompositeSchedulingPolicy.class)
 public interface SchedulingPolicy {
   Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                      final ScheduledTask scheduledTask);
+                                                      final ExecutableTask executableTask);
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
index a5f57361..0375e01e 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
@@ -20,7 +20,7 @@
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
 
@@ -41,7 +41,7 @@
   /**
    * Pending Tasks awaiting to be scheduled for each stage.
    */
-  private final ConcurrentMap<String, Map<String, ScheduledTask>> stageIdToPendingTasks;
+  private final ConcurrentMap<String, Map<String, ExecutableTask>> stageIdToPendingTasks;
 
   /**
    * Stages with Tasks that have not yet been scheduled.
@@ -55,17 +55,17 @@ public SingleJobTaskCollection() {
   }
 
   @Override
-  public synchronized void add(final ScheduledTask scheduledTask) {
-    final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(scheduledTask.getTaskId());
+  public synchronized void add(final ExecutableTask executableTask) {
+    final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(executableTask.getTaskId());
 
     stageIdToPendingTasks.compute(stageId, (s, taskIdToTask) -> {
       if (taskIdToTask == null) {
-        final Map<String, ScheduledTask> taskIdToTaskMap = new HashMap<>();
-        taskIdToTaskMap.put(scheduledTask.getTaskId(), scheduledTask);
-        updateSchedulableStages(stageId, scheduledTask.getContainerType());
+        final Map<String, ExecutableTask> taskIdToTaskMap = new HashMap<>();
+        taskIdToTaskMap.put(executableTask.getTaskId(), executableTask);
+        updateSchedulableStages(stageId, executableTask.getContainerType());
         return taskIdToTaskMap;
       } else {
-        taskIdToTask.put(scheduledTask.getTaskId(), scheduledTask);
+        taskIdToTask.put(executableTask.getTaskId(), executableTask);
         return taskIdToTask;
       }
     });
@@ -81,18 +81,18 @@ public synchronized void add(final ScheduledTask scheduledTask) {
    *                                (i.e. does not belong to the collection from {@link #peekSchedulableStage()}.
    */
   @Override
-  public synchronized ScheduledTask remove(final String taskId) throws NoSuchElementException {
+  public synchronized ExecutableTask remove(final String taskId) throws NoSuchElementException {
     final String stageId = schedulableStages.peekFirst();
     if (stageId == null) {
       throw new NoSuchElementException("No schedulable stage in Task queue");
     }
 
-    final Map<String, ScheduledTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
+    final Map<String, ExecutableTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
 
     if (pendingTasksForStage == null) {
       throw new RuntimeException(String.format("Stage %s not found in Task queue", stageId));
     }
-    final ScheduledTask taskToSchedule = pendingTasksForStage.remove(taskId);
+    final ExecutableTask taskToSchedule = pendingTasksForStage.remove(taskId);
     if (taskToSchedule == null) {
       throw new NoSuchElementException(String.format("Task %s not found in Task queue", taskId));
     }
@@ -115,13 +115,13 @@ public synchronized ScheduledTask remove(final String taskId) throws NoSuchEleme
    *         or {@link Optional#empty} if the queue is empty
    */
   @Override
-  public synchronized Optional<Collection<ScheduledTask>> peekSchedulableStage() {
+  public synchronized Optional<Collection<ExecutableTask>> peekSchedulableStage() {
     final String stageId = schedulableStages.peekFirst();
     if (stageId == null) {
       return Optional.empty();
     }
 
-    final Map<String, ScheduledTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
+    final Map<String, ExecutableTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
     if (pendingTasksForStage == null) {
       throw new RuntimeException(String.format("Stage %s not found in stageIdToPendingTasks map", stageId));
     }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
index 5432f0b8..f7056bff 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
@@ -59,15 +59,15 @@ public SourceLocationAwareSchedulingPolicy() {
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by source location.
    *                               If there is no source locations, will return original set.
-   * @param scheduledTask {@link ScheduledTask} to be scheduled.
+   * @param executableTask {@link ExecutableTask} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ScheduledTask scheduledTask) {
+                                                             final ExecutableTask executableTask) {
     final Set<String> sourceLocations;
     try {
-      sourceLocations = getSourceLocations(scheduledTask.getLogicalTaskIdToReadable().values());
+      sourceLocations = getSourceLocations(executableTask.getIrVertexIdToReadable().values());
     } catch (final UnsupportedOperationException e) {
       return executorRepresenterSet;
     } catch (final Exception e) {
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
index 07d36a23..31504750 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
@@ -16,10 +16,8 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.scheduler.ContainerTypeAwareSchedulingPolicy;
-import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -34,7 +32,7 @@
  * Tests {@link ContainerTypeAwareSchedulingPolicy}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ScheduledTask.class})
+@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class})
 public final class ContainerTypeAwareSchedulingPolicyTest {
 
   private static ExecutorRepresenter mockExecutorRepresenter(final String containerType) {
@@ -50,24 +48,24 @@ public void testContainerTypeAware() {
     final ExecutorRepresenter a1 = mockExecutorRepresenter(ExecutorPlacementProperty.RESERVED);
     final ExecutorRepresenter a2 = mockExecutorRepresenter(ExecutorPlacementProperty.NONE);
 
-    final ScheduledTask scheduledTask1 = mock(ScheduledTask.class);
-    when(scheduledTask1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
+    final ExecutableTask executableTask1 = mock(ExecutableTask.class);
+    when(executableTask1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
 
     final Set<ExecutorRepresenter> executorRepresenterList1 = new HashSet<>(Arrays.asList(a0, a1, a2));
 
     final Set<ExecutorRepresenter> candidateExecutors1 =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, scheduledTask1);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, executableTask1);
 
     final Set<ExecutorRepresenter> expectedExecutors1 = new HashSet<>(Arrays.asList(a1));
     assertEquals(expectedExecutors1, candidateExecutors1);
 
-    final ScheduledTask scheduledTask2 = mock(ScheduledTask.class);
-    when(scheduledTask2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
+    final ExecutableTask executableTask2 = mock(ExecutableTask.class);
+    when(executableTask2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
 
     final Set<ExecutorRepresenter> executorRepresenterList2 = new HashSet<>(Arrays.asList(a0, a1, a2));
 
     final Set<ExecutorRepresenter> candidateExecutors2 =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, scheduledTask2);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, executableTask2);
 
     final Set<ExecutorRepresenter> expectedExecutors2 = new HashSet<>(Arrays.asList(a0, a1, a2));
     assertEquals(expectedExecutors2, candidateExecutors2);
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java
index cb100f00..0071b9f0 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java
@@ -15,10 +15,8 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.scheduler.FreeSlotSchedulingPolicy;
-import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -34,7 +32,7 @@
  * Tests {@link FreeSlotSchedulingPolicy}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ScheduledTask.class})
+@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class})
 public final class FreeSlotSchedulingPolicyTest {
 
   private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks,
@@ -53,12 +51,12 @@ public void testFreeSlot() {
     final ExecutorRepresenter a0 = mockExecutorRepresenter(1, 1);
     final ExecutorRepresenter a1 = mockExecutorRepresenter(2, 3);
 
-    final ScheduledTask scheduledTask = mock(ScheduledTask.class);
+    final ExecutableTask executableTask = mock(ExecutableTask.class);
 
     final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1));
 
     final Set<ExecutorRepresenter> candidateExecutors =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, scheduledTask);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, executableTask);
 
     final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a1));
     assertEquals(expectedExecutors, candidateExecutors);
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
index 10cced06..5edd885e 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -32,7 +32,7 @@
  * Tests {@link RoundRobinSchedulingPolicy}
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ScheduledTask.class})
+@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class})
 public final class RoundRobinSchedulingPolicyTest {
 
   private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks) {
@@ -50,12 +50,12 @@ public void testRoundRobin() {
     final ExecutorRepresenter a1 = mockExecutorRepresenter(2);
     final ExecutorRepresenter a2 = mockExecutorRepresenter(2);
 
-    final ScheduledTask scheduledTask = mock(ScheduledTask.class);
+    final ExecutableTask executableTask = mock(ExecutableTask.class);
 
     final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1, a2));
 
     final Set<ExecutorRepresenter> candidateExecutors =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, scheduledTask);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, executableTask);
 
     final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a0));
     assertEquals(expectedExecutors, candidateExecutors);
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java
index 5aecc3d2..0ae5998b 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java
@@ -83,7 +83,7 @@ public void testPushPriority() throws Exception {
     executorService.submit(() -> {
       try {
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-        final ScheduledTask dequeuedTask = dequeue();
+        final ExecutableTask dequeuedTask = dequeue();
         assertEquals(RuntimeIdGenerator.getStageIdFromTaskId(dequeuedTask.getTaskId()),
             dagOf2Stages.get(1).getId());
 
@@ -135,7 +135,7 @@ public void testPullPriority() throws Exception {
     executorService.submit(() -> {
       try {
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-        final ScheduledTask dequeuedTask = dequeue();
+        final ExecutableTask dequeuedTask = dequeue();
         assertEquals(RuntimeIdGenerator.getStageIdFromTaskId(dequeuedTask.getTaskId()),
             dagOf2Stages.get(0).getId());
 
@@ -211,9 +211,15 @@ public void testWithDifferentContainerType() throws Exception {
    */
   private void scheduleStage(final PhysicalStage stage) {
     stage.getTaskIds().forEach(taskId ->
-        pendingTaskPriorityQueue.add(new ScheduledTask(
-            "TestPlan", stage.getSerializedTaskDag(), taskId, Collections.emptyList(),
-            Collections.emptyList(), 0, stage.getContainerType(), Collections.emptyMap())));
+        pendingTaskPriorityQueue.add(new ExecutableTask(
+            "TestPlan",
+            taskId,
+            0,
+            stage.getContainerType(),
+            stage.getSerializedIRDAG(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyMap())));
   }
 
   /**
@@ -221,17 +227,17 @@ private void scheduleStage(final PhysicalStage stage) {
    * @return the stage name of the dequeued task.
    */
   private String dequeueAndGetStageId() {
-    final ScheduledTask scheduledTask = dequeue();
-    return RuntimeIdGenerator.getStageIdFromTaskId(scheduledTask.getTaskId());
+    final ExecutableTask executableTask = dequeue();
+    return RuntimeIdGenerator.getStageIdFromTaskId(executableTask.getTaskId());
   }
 
   /**
    * Dequeues a scheduled task from the task priority queue.
    * @return the Task dequeued
    */
-  private ScheduledTask dequeue() {
-    final Collection<ScheduledTask> scheduledTasks
+  private ExecutableTask dequeue() {
+    final Collection<ExecutableTask> executableTasks
         = pendingTaskPriorityQueue.peekSchedulableStage().get();
-    return pendingTaskPriorityQueue.remove(scheduledTasks.iterator().next().getTaskId());
+    return pendingTaskPriorityQueue.remove(executableTasks.iterator().next().getTaskId());
   }
 }
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
index 87aa1237..65f9d984 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
@@ -33,7 +33,7 @@
  * Test cases for {@link SourceLocationAwareSchedulingPolicy}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ScheduledTask.class, Readable.class})
+@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class, Readable.class})
 public final class SourceLocationAwareSchedulingPolicyTest {
   private static final String SITE_0 = "SEOUL";
   private static final String SITE_1 = "JINJU";
@@ -46,7 +46,7 @@ private static ExecutorRepresenter mockExecutorRepresenter(final String executor
   }
 
   /**
-   * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule a {@link ScheduledTask} when
+   * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule an {@link ExecutableTask} when
    * there are no executors in appropriate location(s).
    */
   @Test
@@ -54,7 +54,7 @@ public void testSourceLocationAwareSchedulingNotAvailable() {
     final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
 
     // Prepare test scenario
-    final ScheduledTask task = CreateScheduledTask.withReadablesWithSourceLocations(
+    final ExecutableTask task = CreateExecutableTask.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_0)));
     final ExecutorRepresenter e0 = mockExecutorRepresenter(SITE_1);
     final ExecutorRepresenter e1 = mockExecutorRepresenter(SITE_1);
@@ -70,19 +70,19 @@ public void testSourceLocationAwareSchedulingNotAvailable() {
   public void testSourceLocationAwareSchedulingWithMultiSource() {
     final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
     // Prepare test scenario
-    final ScheduledTask task0 = CreateScheduledTask.withReadablesWithSourceLocations(
+    final ExecutableTask task0 = CreateExecutableTask.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_1)));
-    final ScheduledTask task1 = CreateScheduledTask.withReadablesWithSourceLocations(
+    final ExecutableTask task1 = CreateExecutableTask.withReadablesWithSourceLocations(
         Collections.singletonList(Arrays.asList(SITE_0, SITE_1, SITE_2)));
-    final ScheduledTask task2 = CreateScheduledTask.withReadablesWithSourceLocations(
+    final ExecutableTask task2 = CreateExecutableTask.withReadablesWithSourceLocations(
         Arrays.asList(Collections.singletonList(SITE_0), Collections.singletonList(SITE_1),
             Arrays.asList(SITE_1, SITE_2)));
-    final ScheduledTask task3 = CreateScheduledTask.withReadablesWithSourceLocations(
+    final ExecutableTask task3 = CreateExecutableTask.withReadablesWithSourceLocations(
         Arrays.asList(Collections.singletonList(SITE_1), Collections.singletonList(SITE_0),
             Arrays.asList(SITE_0, SITE_2)));
 
     final ExecutorRepresenter e = mockExecutorRepresenter(SITE_1);
-    for (final ScheduledTask task : new HashSet<>(Arrays.asList(task0, task1, task2, task3))) {
+    for (final ExecutableTask task : new HashSet<>(Arrays.asList(task0, task1, task2, task3))) {
       assertEquals(new HashSet<>(Collections.singletonList(e)), schedulingPolicy.filterExecutorRepresenters(
           new HashSet<>(Collections.singletonList(e)), task));
     }
@@ -90,23 +90,23 @@ public void testSourceLocationAwareSchedulingWithMultiSource() {
 
 
   /**
-   * Utility for creating {@link ScheduledTask}.
+   * Utility for creating {@link ExecutableTask}.
    */
-  private static final class CreateScheduledTask {
+  private static final class CreateExecutableTask {
     private static final AtomicInteger taskIndex = new AtomicInteger(0);
     private static final AtomicInteger intraTaskIndex = new AtomicInteger(0);
 
-    private static ScheduledTask doCreate(final Collection<Readable> readables) {
-      final ScheduledTask mockInstance = mock(ScheduledTask.class);
+    private static ExecutableTask doCreate(final Collection<Readable> readables) {
+      final ExecutableTask mockInstance = mock(ExecutableTask.class);
       final Map<String, Readable> readableMap = new HashMap<>();
       readables.forEach(readable -> readableMap.put(String.format("TASK-%d", intraTaskIndex.getAndIncrement()),
           readable));
       when(mockInstance.getTaskId()).thenReturn(String.format("T-%d", taskIndex.getAndIncrement()));
-      when(mockInstance.getLogicalTaskIdToReadable()).thenReturn(readableMap);
+      when(mockInstance.getIrVertexIdToReadable()).thenReturn(readableMap);
       return mockInstance;
     }
 
-    static ScheduledTask withReadablesWithSourceLocations(final Collection<List<String>> sourceLocation) {
+    static ExecutableTask withReadablesWithSourceLocations(final Collection<List<String>> sourceLocation) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (final List<String> locations : sourceLocation) {
@@ -120,7 +120,7 @@ static ScheduledTask withReadablesWithSourceLocations(final Collection<List<Stri
       }
     }
 
-    static ScheduledTask withReadablesWithoutSourceLocations(final int numReadables) {
+    static ExecutableTask withReadablesWithoutSourceLocations(final int numReadables) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (int i = 0; i < numReadables; i++) {
@@ -134,7 +134,7 @@ static ScheduledTask withReadablesWithoutSourceLocations(final int numReadables)
       }
     }
 
-    static ScheduledTask withReadablesWhichThrowException(final int numReadables) {
+    static ExecutableTask withReadablesWhichThrowException(final int numReadables) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (int i = 0; i < numReadables; i++) {
@@ -148,7 +148,7 @@ static ScheduledTask withReadablesWhichThrowException(final int numReadables) {
       }
     }
 
-    static ScheduledTask withoutReadables() {
+    static ExecutableTask withoutReadables() {
       return doCreate(Collections.emptyList());
     }
   }
diff --git a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
index d856dd05..631f8373 100644
--- a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
+++ b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
@@ -101,7 +101,7 @@ private static PhysicalPlan convertIRToPhysical(final DAG<IRVertex, IREdge> irDA
                                                   final Policy policy) throws Exception {
     final DAG<IRVertex, IREdge> optimized = CompiletimeOptimizer.optimize(irDAG, policy, EMPTY_DAG_DIRECTORY);
     final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = optimized.convert(PLAN_GENERATOR);
-    return new PhysicalPlan("Plan", physicalDAG, PLAN_GENERATOR.getTaskIRVertexMap());
+    return new PhysicalPlan("Plan", physicalDAG, PLAN_GENERATOR.getIdToIRVertex());
   }
 
   /**
diff --git a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
index 51b753e1..f2579262 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
@@ -82,7 +82,7 @@ public void testState() throws Exception {
     injector.bindVolatileInstance(MessageEnvironment.class, messageEnvironment);
     final BlockManagerMaster pmm = injector.getInstance(BlockManagerMaster.class);
     final JobStateManager jobStateManager = new JobStateManager(
-        new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap()),
+        new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()),
         pmm, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
 
     final DriverEndpoint driverEndpoint = new DriverEndpoint(jobStateManager, clientEndpoint);
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
index affcbed2..a6df1d45 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
@@ -20,10 +20,12 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.physical.*;
@@ -63,7 +65,7 @@
   private static final String CONTAINER_TYPE = "CONTAINER_TYPE";
   private static final int SOURCE_PARALLELISM = 5;
   private List elements;
-  private Map<String, List<Object>> taskIdToOutputData;
+  private Map<String, List<Object>> vertexIdToOutputData;
   private DataTransferFactory dataTransferFactory;
   private TaskStateManager taskStateManager;
   private MetricMessageSender metricMessageSender;
@@ -76,7 +78,7 @@ public void setUp() throws Exception {
     taskStateManager = mock(TaskStateManager.class);
 
     // Mock a DataTransferFactory.
-    taskIdToOutputData = new HashMap<>();
+    vertexIdToOutputData = new HashMap<>();
     dataTransferFactory = mock(DataTransferFactory.class);
     when(dataTransferFactory.createReader(anyInt(), any(), any())).then(new InterStageReaderAnswer());
     when(dataTransferFactory.createWriter(any(), anyInt(), any(), any())).then(new WriterAnswer());
@@ -88,17 +90,13 @@ public void setUp() throws Exception {
   }
 
   /**
-   * Test the {@link BoundedSourceTask} processing in {@link TaskExecutor}.
+   * Test the {@link edu.snu.nemo.common.ir.vertex.SourceVertex} processing in {@link TaskExecutor}.
    */
-  @Test(timeout=2000)
-  public void testSourceTask() throws Exception {
-    final IRVertex sourceIRVertex = new SimpleIRVertex();
-    final String sourceIrVertexId = sourceIRVertex.getId();
-
-    final String sourceTaskId = RuntimeIdGenerator.generateLogicalTaskId("Source_IR_Vertex");
+  @Test(timeout=5000)
+  public void testSourceVertex() throws Exception {
+    final IRVertex sourceIRVertex = new EmptyComponents.EmptySourceVertex("empty");
     final String stageId = RuntimeIdGenerator.generateStageId(0);
 
-    final BoundedSourceTask<Integer> boundedSourceTask = new BoundedSourceTask<>(sourceTaskId, sourceIrVertexId);
     final Readable readable = new Readable() {
       @Override
       public Iterable read() throws Exception {
@@ -109,30 +107,37 @@ public Iterable read() throws Exception {
         throw new UnsupportedOperationException();
       }
     };
-    final Map<String, Readable> logicalIdToReadable = new HashMap<>();
-    logicalIdToReadable.put(sourceTaskId, readable);
+    final Map<String, Readable> vertexIdToReadable = new HashMap<>();
+    vertexIdToReadable.put(sourceIRVertex.getId(), readable);
 
-    final DAG<Task, RuntimeEdge<Task>> taskDag =
-        new DAGBuilder<Task, RuntimeEdge<Task>>().addVertex(boundedSourceTask).build();
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
+        new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>().addVertex(sourceIRVertex).buildWithoutSourceSinkCheck();
     final PhysicalStageEdge stageOutEdge = mock(PhysicalStageEdge.class);
     when(stageOutEdge.getSrcVertex()).thenReturn(sourceIRVertex);
     final String taskId = RuntimeIdGenerator.generateTaskId(0, stageId);
-    final ScheduledTask scheduledTask =
-        new ScheduledTask("testSourceTask", new byte[0], taskId, Collections.emptyList(),
-            Collections.singletonList(stageOutEdge), 0, CONTAINER_TYPE, logicalIdToReadable);
+    final ExecutableTask executableTask =
+        new ExecutableTask(
+            "testSourceVertex",
+            taskId,
+            0,
+            CONTAINER_TYPE,
+            new byte[0],
+            Collections.emptyList(),
+            Collections.singletonList(stageOutEdge),
+            vertexIdToReadable);
 
     // Execute the task.
     final TaskExecutor taskExecutor = new TaskExecutor(
-        scheduledTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+        executableTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
     taskExecutor.execute();
 
     // Check the output.
-    assertEquals(100, taskIdToOutputData.get(sourceTaskId).size());
-    assertEquals(elements.get(0), taskIdToOutputData.get(sourceTaskId).get(0));
+    assertEquals(100, vertexIdToOutputData.get(sourceIRVertex.getId()).size());
+    assertEquals(elements.get(0), vertexIdToOutputData.get(sourceIRVertex.getId()).get(0));
   }
 
   /**
-   * Test the {@link OperatorTask} processing in {@link TaskExecutor}.
+   * Test the {@link edu.snu.nemo.common.ir.vertex.OperatorVertex} processing in {@link TaskExecutor}.
    *
    * The DAG of the task to test will looks like:
    * operator task 1 -> operator task 2
@@ -142,47 +147,46 @@ public Iterable read() throws Exception {
    * Because of this, the operator task 1 will process multiple partitions and emit data in multiple times also.
    * On the other hand, operator task 2 will receive the output data once and produce a single output.
    */
-  @Test//(timeout=2000)
-  public void testOperatorTask() throws Exception {
-    final IRVertex operatorIRVertex1 = new SimpleIRVertex();
-    final IRVertex operatorIRVertex2 = new SimpleIRVertex();
-    final String operatorIRVertexId1 = operatorIRVertex1.getId();
-    final String operatorIRVertexId2 = operatorIRVertex2.getId();
+  @Test(timeout=5000)
+  public void testOperatorVertex() throws Exception {
+    final IRVertex operatorIRVertex1 = new OperatorVertex(new SimpleTransform());
+    final IRVertex operatorIRVertex2 = new OperatorVertex(new SimpleTransform());
     final String runtimeIREdgeId = "Runtime edge between operator tasks";
 
-    final String operatorTaskId1 = RuntimeIdGenerator.generateLogicalTaskId("Operator_vertex_1");
-    final String operatorTaskId2 = RuntimeIdGenerator.generateLogicalTaskId("Operator_vertex_2");
     final String stageId = RuntimeIdGenerator.generateStageId(1);
 
-    final OperatorTask operatorTask1 =
-        new OperatorTask(operatorTaskId1, operatorIRVertexId1, new SimpleTransform());
-    final OperatorTask operatorTask2 =
-        new OperatorTask(operatorTaskId2, operatorIRVertexId2, new SimpleTransform());
     final Coder coder = Coder.DUMMY_CODER;
     ExecutionPropertyMap edgeProperties = new ExecutionPropertyMap(runtimeIREdgeId);
     edgeProperties.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
-    final DAG<Task, RuntimeEdge<Task>> taskDag = new DAGBuilder<Task, RuntimeEdge<Task>>()
-        .addVertex(operatorTask1)
-        .addVertex(operatorTask2)
-        .connectVertices(new RuntimeEdge<Task>(
-            runtimeIREdgeId, edgeProperties, operatorTask1, operatorTask2, coder))
-        .build();
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
+        .addVertex(operatorIRVertex1)
+        .addVertex(operatorIRVertex2)
+        .connectVertices(new RuntimeEdge<IRVertex>(
+            runtimeIREdgeId, edgeProperties, operatorIRVertex1, operatorIRVertex2, coder))
+        .buildWithoutSourceSinkCheck();
     final String taskId = RuntimeIdGenerator.generateTaskId(0, stageId);
     final PhysicalStageEdge stageInEdge = mock(PhysicalStageEdge.class);
     when(stageInEdge.getDstVertex()).thenReturn(operatorIRVertex1);
     final PhysicalStageEdge stageOutEdge = mock(PhysicalStageEdge.class);
     when(stageOutEdge.getSrcVertex()).thenReturn(operatorIRVertex2);
-    final ScheduledTask scheduledTask =
-        new ScheduledTask("testSourceTask", new byte[0], taskId, Collections.singletonList(stageInEdge),
-            Collections.singletonList(stageOutEdge), 0, CONTAINER_TYPE, Collections.emptyMap());
+    final ExecutableTask executableTask =
+        new ExecutableTask(
+            "testSourceVertex",
+            taskId,
+            0,
+            CONTAINER_TYPE,
+            new byte[0],
+            Collections.singletonList(stageInEdge),
+            Collections.singletonList(stageOutEdge),
+            Collections.emptyMap());
 
     // Execute the task.
     final TaskExecutor taskExecutor = new TaskExecutor(
-        scheduledTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+        executableTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
     taskExecutor.execute();
 
     // Check the output.
-    assertEquals(100, taskIdToOutputData.get(operatorTaskId2).size());
+    assertEquals(100, vertexIdToOutputData.get(operatorIRVertex2.getId()).size());
   }
 
   /**
@@ -234,15 +238,15 @@ public InputReader answer(final InvocationOnMock invocationOnMock) throws Throwa
     @Override
     public OutputWriter answer(final InvocationOnMock invocationOnMock) throws Throwable {
       final Object[] args = invocationOnMock.getArguments();
-      final Task dstTask = (Task) args[0];
+      final IRVertex vertex = (IRVertex) args[0];
       final OutputWriter outputWriter = mock(OutputWriter.class);
       doAnswer(new Answer() {
         @Override
         public Object answer(final InvocationOnMock invocationOnMock) throws Throwable {
           final Object[] args = invocationOnMock.getArguments();
           final Object dataToWrite = args[0];
-          taskIdToOutputData.computeIfAbsent(dstTask.getId(), emptyTaskId -> new ArrayList<>());
-          taskIdToOutputData.get(dstTask.getId()).add(dataToWrite);
+          vertexIdToOutputData.computeIfAbsent(vertex.getId(), emptyTaskId -> new ArrayList<>());
+          vertexIdToOutputData.get(vertex.getId()).add(dataToWrite);
           return null;
         }
       }).when(outputWriter).write(any());
@@ -250,16 +254,6 @@ public Object answer(final InvocationOnMock invocationOnMock) throws Throwable {
     }
   }
 
-  /**
-   * Simple {@link IRVertex} for testing.
-   */
-  private class SimpleIRVertex extends IRVertex {
-    @Override
-    public IRVertex getClone() {
-      return null; // Not used.
-    }
-  }
-
   /**
    * Simple {@link Transform} for testing.
    * @param <T> input/output type.
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
index cb9ee788..cfdeb2fe 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
@@ -29,7 +29,6 @@
 import edu.snu.nemo.runtime.executor.data.*;
 import edu.snu.nemo.runtime.executor.data.block.Block;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
-import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
 import edu.snu.nemo.runtime.executor.data.streamchainer.CompressionStreamChainer;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 import edu.snu.nemo.runtime.executor.data.stores.*;
@@ -81,8 +80,8 @@
   private BlockManagerMaster blockManagerMaster;
   private LocalMessageDispatcher messageDispatcher;
   // Variables for shuffle test
-  private static final int NUM_WRITE_TASKS = 3;
-  private static final int NUM_READ_TASKS = 3;
+  private static final int NUM_WRITE_VERTICES = 3;
+  private static final int NUM_READ_VERTICES = 3;
   private static final int DATA_SIZE = 1000;
   private List<String> blockIdList;
   private List<List<NonSerializedPartition<Integer>>> partitionsPerBlock;
@@ -115,20 +114,18 @@ public void setUp() throws Exception {
     when(serializerManager.getSerializer(any())).thenReturn(SERIALIZER);
 
     // Following part is for for the shuffle test.
-    final List<String> writeTaskIdList = new ArrayList<>(NUM_WRITE_TASKS);
-    final List<String> readTaskIdList = new ArrayList<>(NUM_READ_TASKS);
-    blockIdList = new ArrayList<>(NUM_WRITE_TASKS);
-    partitionsPerBlock = new ArrayList<>(NUM_WRITE_TASKS);
+    final List<String> writeVertexIdList = new ArrayList<>(NUM_WRITE_VERTICES);
+    final List<String> readTaskIdList = new ArrayList<>(NUM_READ_VERTICES);
+    blockIdList = new ArrayList<>(NUM_WRITE_VERTICES);
+    partitionsPerBlock = new ArrayList<>(NUM_WRITE_VERTICES);
 
     // Generates the ids of the tasks to be used.
-    IntStream.range(0, NUM_WRITE_TASKS).forEach(
-        number -> writeTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("Write_IR_vertex")));
-    IntStream.range(0, NUM_READ_TASKS).forEach(
-        number -> readTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("Read_IR_vertex")));
+    IntStream.range(0, NUM_WRITE_VERTICES).forEach(number -> writeVertexIdList.add("Write_IR_vertex"));
+    IntStream.range(0, NUM_READ_VERTICES).forEach(number -> readTaskIdList.add("Read_IR_vertex"));
 
     // Generates the ids and the data of the blocks to be used.
     final String shuffleEdge = RuntimeIdGenerator.generateRuntimeEdgeId("shuffle_edge");
-    IntStream.range(0, NUM_WRITE_TASKS).forEach(writeTaskIdx -> {
+    IntStream.range(0, NUM_WRITE_VERTICES).forEach(writeTaskIdx -> {
       // Create a block for each writer task.
       final String blockId = RuntimeIdGenerator.generateBlockId(shuffleEdge, writeTaskIdx);
       blockIdList.add(blockId);
@@ -137,27 +134,26 @@ public void setUp() throws Exception {
           blockId, BlockState.State.SCHEDULED, null);
 
       // Create blocks for this block.
-      final List<NonSerializedPartition<Integer>> partitionsForBlock = new ArrayList<>(NUM_READ_TASKS);
+      final List<NonSerializedPartition<Integer>> partitionsForBlock = new ArrayList<>(NUM_READ_VERTICES);
       partitionsPerBlock.add(partitionsForBlock);
-      IntStream.range(0, NUM_READ_TASKS).forEach(readTaskIdx -> {
-        final int partitionsCount = writeTaskIdx * NUM_READ_TASKS + readTaskIdx;
+      IntStream.range(0, NUM_READ_VERTICES).forEach(readTaskIdx -> {
+        final int partitionsCount = writeTaskIdx * NUM_READ_VERTICES + readTaskIdx;
         partitionsForBlock.add(new NonSerializedPartition(
             readTaskIdx, getRangedNumList(partitionsCount * DATA_SIZE, (partitionsCount + 1) * DATA_SIZE), -1, -1));
       });
     });
 
     // Following part is for the concurrent read test.
-    final String writeTaskId = RuntimeIdGenerator.generateLogicalTaskId("conc_write_IR_vertex");
+    final String writeTaskId = "conc_write_IR_vertex";
     final List<String> concReadTaskIdList = new ArrayList<>(NUM_CONC_READ_TASKS);
     final String concEdge = RuntimeIdGenerator.generateRuntimeEdgeId("conc_read_edge");
 
     // Generates the ids and the data to be used.
-    concBlockId = RuntimeIdGenerator.generateBlockId(concEdge, NUM_WRITE_TASKS + NUM_READ_TASKS + 1);
+    concBlockId = RuntimeIdGenerator.generateBlockId(concEdge, NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1);
     blockManagerMaster.initializeState(concBlockId, "unused");
     blockManagerMaster.onBlockStateChanged(
         concBlockId, BlockState.State.SCHEDULED, null);
-    IntStream.range(0, NUM_CONC_READ_TASKS).forEach(
-        number -> concReadTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("conc_read_IR_vertex")));
+    IntStream.range(0, NUM_CONC_READ_TASKS).forEach(number -> concReadTaskIdList.add("conc_read_IR_vertex"));
     concBlockPartition = new NonSerializedPartition(0, getRangedNumList(0, CONC_READ_DATA_SIZE), -1, -1);
 
     // Following part is for the shuffle in hash range test
@@ -170,16 +166,14 @@ public void setUp() throws Exception {
     expectedDataInRange = new ArrayList<>(NUM_READ_HASH_TASKS);
 
     // Generates the ids of the tasks to be used.
-    IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(
-        number -> writeHashTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("hash_write_IR_vertex")));
-    IntStream.range(0, NUM_READ_HASH_TASKS).forEach(
-        number -> readHashTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("hash_read_IR_vertex")));
+    IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(number -> writeHashTaskIdList.add("hash_write_IR_vertex"));
+    IntStream.range(0, NUM_READ_HASH_TASKS).forEach(number -> readHashTaskIdList.add("hash_read_IR_vertex"));
     final String hashEdge = RuntimeIdGenerator.generateRuntimeEdgeId("hash_edge");
 
     // Generates the ids and the data of the blocks to be used.
     IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(writeTaskIdx -> {
       final String blockId = RuntimeIdGenerator.generateBlockId(
-          hashEdge, NUM_WRITE_TASKS + NUM_READ_TASKS + 1 + writeTaskIdx);
+          hashEdge, NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1 + writeTaskIdx);
       hashedBlockIdList.add(blockId);
       blockManagerMaster.initializeState(blockId, "Unused");
       blockManagerMaster.onBlockStateChanged(
@@ -307,14 +301,14 @@ private GlusterFileStore createGlusterFileStore(final String executorId)
    */
   private void shuffle(final BlockStore writerSideStore,
                        final BlockStore readerSideStore) {
-    final ExecutorService writeExecutor = Executors.newFixedThreadPool(NUM_WRITE_TASKS);
-    final ExecutorService readExecutor = Executors.newFixedThreadPool(NUM_READ_TASKS);
-    final List<Future<Boolean>> writeFutureList = new ArrayList<>(NUM_WRITE_TASKS);
-    final List<Future<Boolean>> readFutureList = new ArrayList<>(NUM_READ_TASKS);
+    final ExecutorService writeExecutor = Executors.newFixedThreadPool(NUM_WRITE_VERTICES);
+    final ExecutorService readExecutor = Executors.newFixedThreadPool(NUM_READ_VERTICES);
+    final List<Future<Boolean>> writeFutureList = new ArrayList<>(NUM_WRITE_VERTICES);
+    final List<Future<Boolean>> readFutureList = new ArrayList<>(NUM_READ_VERTICES);
     final long startNano = System.nanoTime();
 
     // Write concurrently
-    IntStream.range(0, NUM_WRITE_TASKS).forEach(writeTaskIdx ->
+    IntStream.range(0, NUM_WRITE_VERTICES).forEach(writeTaskIdx ->
         writeFutureList.add(writeExecutor.submit(new Callable<Boolean>() {
           @Override
           public Boolean call() {
@@ -338,7 +332,7 @@ public Boolean call() {
         })));
 
     // Wait each writer to success
-    IntStream.range(0, NUM_WRITE_TASKS).forEach(writer -> {
+    IntStream.range(0, NUM_WRITE_VERTICES).forEach(writer -> {
       try {
         assertTrue(writeFutureList.get(writer).get());
       } catch (final Exception e) {
@@ -348,12 +342,12 @@ public Boolean call() {
     final long writeEndNano = System.nanoTime();
 
     // Read concurrently and check whether the result is equal to the input
-    IntStream.range(0, NUM_READ_TASKS).forEach(readTaskIdx ->
+    IntStream.range(0, NUM_READ_VERTICES).forEach(readTaskIdx ->
         readFutureList.add(readExecutor.submit(new Callable<Boolean>() {
           @Override
           public Boolean call() {
             try {
-              for (int writeTaskIdx = 0; writeTaskIdx < NUM_WRITE_TASKS; writeTaskIdx++) {
+              for (int writeTaskIdx = 0; writeTaskIdx < NUM_WRITE_VERTICES; writeTaskIdx++) {
                 readResultCheck(blockIdList.get(writeTaskIdx), HashRange.of(readTaskIdx, readTaskIdx + 1),
                     readerSideStore, partitionsPerBlock.get(writeTaskIdx).get(readTaskIdx).getData());
               }
@@ -366,7 +360,7 @@ public Boolean call() {
         })));
 
     // Wait each reader to success
-    IntStream.range(0, NUM_READ_TASKS).forEach(reader -> {
+    IntStream.range(0, NUM_READ_VERTICES).forEach(reader -> {
       try {
         assertTrue(readFutureList.get(reader).get());
       } catch (final Exception e) {
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index 998faf5f..61ef9b0b 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@ -39,7 +39,6 @@
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
-import edu.snu.nemo.runtime.common.plan.physical.Task;
 import edu.snu.nemo.runtime.executor.Executor;
 import edu.snu.nemo.runtime.executor.MetricManagerWorker;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
@@ -74,7 +73,6 @@
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.awt.*;
 import java.io.File;
 import java.io.IOException;
 import java.util.*;
@@ -544,7 +542,7 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
   }
 
   private PhysicalStage setupStages(final String stageId) {
-    final DAG<Task, RuntimeEdge<Task>> emptyDag = new DAGBuilder<Task, RuntimeEdge<Task>>().build();
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> emptyDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>().build();
 
     return new PhysicalStage(stageId, emptyDag, PARALLELISM_TEN, 0, "Not_used", Collections.emptyList());
   }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java
index bdc62e53..5af24765 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java
@@ -125,7 +125,7 @@ public void testPhysicalPlanStateChanges() throws Exception {
         new TestPolicy(), "");
     final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
     final JobStateManager jobStateManager = new JobStateManager(
-        new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap()),
+        new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()),
         blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
 
     assertEquals(jobStateManager.getJobId(), "TestPlan");
@@ -164,7 +164,7 @@ public void testWaitUntilFinish() {
     final DAG<IRVertex, IREdge> irDAG = irDAGBuilder.build();
     final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
     final JobStateManager jobStateManager = new JobStateManager(
-        new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap()),
+        new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()),
         blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
 
     assertFalse(jobStateManager.checkJobTermination());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services