You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by je...@apache.org on 2018/06/05 09:39:13 UTC

[incubator-nemo] branch master updated: [NEMO-95] Rename ExecutableTask to Task (#29)

This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new a387c4c  [NEMO-95] Rename ExecutableTask to Task (#29)
a387c4c is described below

commit a387c4cd0f6285d72b1adef1e5e80b9b99a104cd
Author: John Yang <jo...@gmail.com>
AuthorDate: Tue Jun 5 18:39:02 2018 +0900

    [NEMO-95] Rename ExecutableTask to Task (#29)
    
    JIRA: NEMO-95: Rename ExecutableTask to Task
    
    Major changes:
    * Rename ExecutableTask to Task
    
    Minor changes to note:
    N/A
    
    Tests for the changes:
    N/A
    
    Other comments:
    N/A
    
    resolves NEMO-95
---
 .../common/plan/{ExecutableTask.java => Task.java} | 38 +++++++++++-----------
 .../edu/snu/nemo/runtime/executor/Executor.java    | 26 +++++++--------
 .../snu/nemo/runtime/executor/TaskExecutor.java    | 20 ++++++------
 .../nemo/runtime/executor/TaskStateManager.java    |  8 ++---
 .../master/resource/ExecutorRepresenter.java       | 14 ++++----
 .../master/scheduler/BatchSingleJobScheduler.java  |  2 +-
 .../scheduler/CompositeSchedulingPolicy.java       |  6 ++--
 .../ContainerTypeAwareSchedulingPolicy.java        | 10 +++---
 .../master/scheduler/FreeSlotSchedulingPolicy.java |  6 ++--
 .../master/scheduler/PendingTaskCollection.java    | 10 +++---
 .../scheduler/RoundRobinSchedulingPolicy.java      |  6 ++--
 .../runtime/master/scheduler/SchedulerRunner.java  | 22 ++++++-------
 .../runtime/master/scheduler/SchedulingPolicy.java |  4 +--
 .../master/scheduler/SingleJobTaskCollection.java  | 26 +++++++--------
 .../SourceLocationAwareSchedulingPolicy.java       |  8 ++---
 .../ContainerTypeAwareSchedulingPolicyTest.java    | 16 ++++-----
 .../scheduler/FreeSlotSchedulingPolicyTest.java    |  8 ++---
 .../scheduler/RoundRobinSchedulingPolicyTest.java  |  8 ++---
 .../master/scheduler/SingleTaskQueueTest.java      | 18 +++++-----
 .../SourceLocationAwareSchedulingPolicyTest.java   | 34 +++++++++----------
 .../tests/runtime/executor/TaskExecutorTest.java   | 14 ++++----
 21 files changed, 152 insertions(+), 152 deletions(-)

diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/ExecutableTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
similarity index 72%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/ExecutableTask.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
index 1da8f97..5cd8b16 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/ExecutableTask.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
@@ -23,9 +23,9 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * An ExecutableTask is a self-contained executable that can be executed in specific types of containers.
+ * A Task is a self-contained executable that can be executed on a machine.
  */
-public final class ExecutableTask implements Serializable {
+public final class Task implements Serializable {
   private final String jobId;
   private final String taskId;
   private final int taskIdx;
@@ -33,35 +33,35 @@ public final class ExecutableTask implements Serializable {
   private final List<StageEdge> taskOutgoingEdges;
   private final int attemptIdx;
   private final String containerType;
-  private final byte[] serializedTaskDag;
+  private final byte[] serializedIRDag;
   private final Map<String, Readable> irVertexIdToReadable;
 
   /**
    * Constructor.
    *
    * @param jobId                the id of the job.
-   * @param taskId               the ID of the scheduled task.
+   * @param taskId               the ID of the task.
    * @param attemptIdx           the attempt index.
    * @param containerType        the type of container to execute the task on.
    * @param serializedIRDag      the serialized DAG of the task.
    * @param taskIncomingEdges    the incoming edges of the task.
    * @param taskOutgoingEdges    the outgoing edges of the task.
-   * @param irVertexIdToReadable the map between logical task ID and readable.
+   * @param irVertexIdToReadable the map between IRVertex id to readable.
    */
-  public ExecutableTask(final String jobId,
-                        final String taskId,
-                        final int attemptIdx,
-                        final String containerType,
-                        final byte[] serializedIRDag,
-                        final List<StageEdge> taskIncomingEdges,
-                        final List<StageEdge> taskOutgoingEdges,
-                        final Map<String, Readable> irVertexIdToReadable) {
+  public Task(final String jobId,
+              final String taskId,
+              final int attemptIdx,
+              final String containerType,
+              final byte[] serializedIRDag,
+              final List<StageEdge> taskIncomingEdges,
+              final List<StageEdge> taskOutgoingEdges,
+              final Map<String, Readable> irVertexIdToReadable) {
     this.jobId = jobId;
     this.taskId = taskId;
     this.taskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
     this.attemptIdx = attemptIdx;
     this.containerType = containerType;
-    this.serializedTaskDag = serializedIRDag;
+    this.serializedIRDag = serializedIRDag;
     this.taskIncomingEdges = taskIncomingEdges;
     this.taskOutgoingEdges = taskOutgoingEdges;
     this.irVertexIdToReadable = irVertexIdToReadable;
@@ -75,21 +75,21 @@ public final class ExecutableTask implements Serializable {
   }
 
   /**
-   * @return the serialized DAG of the task.
+   * @return the serialized IR DAG of the task.
    */
   public byte[] getSerializedIRDag() {
-    return serializedTaskDag;
+    return serializedIRDag;
   }
 
   /**
-   * @return the ID of the scheduled task.
+   * @return the ID of the task.
    */
   public String getTaskId() {
     return taskId;
   }
 
   /**
-   * @return the idx of the scheduled task.
+   * @return the idx of the task.
    */
   public int getTaskIdx() {
     return taskIdx;
@@ -124,7 +124,7 @@ public final class ExecutableTask implements Serializable {
   }
 
   /**
-   * @return the map between logical task ID and readable.
+   * @return the map between IRVertex id and readable.
    */
   public Map<String, Readable> getIrVertexIdToReadable() {
     return irVertexIdToReadable;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 8872990..287ca01 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -28,7 +28,7 @@ import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageListener;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.executor.data.SerializerManager;
 import edu.snu.nemo.runtime.executor.datatransfer.DataTransferFactory;
 import org.apache.commons.lang3.SerializationUtils;
@@ -87,26 +87,26 @@ public final class Executor {
     return executorId;
   }
 
-  private synchronized void onTaskReceived(final ExecutableTask executableTask) {
+  private synchronized void onTaskReceived(final Task task) {
     LOG.debug("Executor [{}] received Task [{}] to execute.",
-        new Object[]{executorId, executableTask.getTaskId()});
-    executorService.execute(() -> launchTask(executableTask));
+        new Object[]{executorId, task.getTaskId()});
+    executorService.execute(() -> launchTask(task));
   }
 
   /**
    * Launches the Task, and keeps track of the execution state with taskStateManager.
-   * @param executableTask to launch.
+   * @param task to launch.
    */
-  private void launchTask(final ExecutableTask executableTask) {
+  private void launchTask(final Task task) {
     try {
       final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag =
-          SerializationUtils.deserialize(executableTask.getSerializedIRDag());
+          SerializationUtils.deserialize(task.getSerializedIRDag());
       final TaskStateManager taskStateManager =
-          new TaskStateManager(executableTask, executorId, persistentConnectionToMasterMap, metricMessageSender);
+          new TaskStateManager(task, executorId, persistentConnectionToMasterMap, metricMessageSender);
 
-      executableTask.getTaskIncomingEdges()
+      task.getTaskIncomingEdges()
           .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
-      executableTask.getTaskOutgoingEdges()
+      task.getTaskOutgoingEdges()
           .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
       irDag.getVertices().forEach(v -> {
         irDag.getOutgoingEdgesOf(v)
@@ -114,7 +114,7 @@ public final class Executor {
       });
 
       new TaskExecutor(
-          executableTask, irDag, taskStateManager, dataTransferFactory, metricMessageSender).execute();
+          task, irDag, taskStateManager, dataTransferFactory, metricMessageSender).execute();
     } catch (final Exception e) {
       persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
           ControlMessage.Message.newBuilder()
@@ -149,9 +149,9 @@ public final class Executor {
       switch (message.getType()) {
       case ScheduleTask:
         final ControlMessage.ScheduleTaskMsg scheduleTaskMsg = message.getScheduleTaskMsg();
-        final ExecutableTask executableTask =
+        final Task task =
             SerializationUtils.deserialize(scheduleTaskMsg.getTask().toByteArray());
-        onTaskReceived(executableTask);
+        onTaskReceived(task);
         break;
       default:
         throw new IllegalMessageException(
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
index 09a43cc..4a9b1ee 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
@@ -23,7 +23,7 @@ import edu.snu.nemo.common.exception.BlockWriteException;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.vertex.*;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.state.TaskState;
@@ -46,7 +46,7 @@ public final class TaskExecutor {
   private static final String ITERATORID_PREFIX = "ITERATOR_";
   private static final AtomicInteger ITERATORID_GENERATOR = new AtomicInteger(0);
 
-  // From ExecutableTask
+  // From Task
   private final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag;
   private final String taskId;
   private final int taskIdx;
@@ -81,24 +81,24 @@ public final class TaskExecutor {
 
   /**
    * Constructor.
-   * @param executableTask Task with information needed during execution.
+   * @param task Task with information needed during execution.
    * @param irVertexDag A DAG of vertices.
    * @param taskStateManager State manager for this Task.
    * @param channelFactory For reading from/writing to data to other Stages.
    * @param metricMessageSender For sending metric with execution stats to Master.
    */
-  public TaskExecutor(final ExecutableTask executableTask,
+  public TaskExecutor(final Task task,
                       final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
                       final TaskStateManager taskStateManager,
                       final DataTransferFactory channelFactory,
                       final MetricMessageSender metricMessageSender) {
-    // Information from the ExecutableTask.
+    // Information from the Task.
     this.irVertexDag = irVertexDag;
-    this.taskId = executableTask.getTaskId();
-    this.taskIdx = executableTask.getTaskIdx();
-    this.stageIncomingEdges = executableTask.getTaskIncomingEdges();
-    this.stageOutgoingEdges = executableTask.getTaskOutgoingEdges();
-    this.irVertexIdToReadable = executableTask.getIrVertexIdToReadable();
+    this.taskId = task.getTaskId();
+    this.taskIdx = task.getTaskIdx();
+    this.stageIncomingEdges = task.getTaskIncomingEdges();
+    this.stageOutgoingEdges = task.getTaskOutgoingEdges();
+    this.irVertexIdToReadable = task.getIrVertexIdToReadable();
 
     // Other parameters.
     this.taskStateManager = taskStateManager;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
index 7b1bc52..c06ab5a 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
@@ -21,7 +21,7 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 
 import java.util.*;
 
@@ -44,12 +44,12 @@ public final class TaskStateManager {
   private final MetricCollector metricCollector;
   private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
 
-  public TaskStateManager(final ExecutableTask executableTask,
+  public TaskStateManager(final Task task,
                           final String executorId,
                           final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
                           final MetricMessageSender metricMessageSender) {
-    this.taskId = executableTask.getTaskId();
-    this.attemptIdx = executableTask.getAttemptIdx();
+    this.taskId = task.getTaskId();
+    this.attemptIdx = task.getAttemptIdx();
     this.executorId = executorId;
     this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
     this.metricCollector = new MetricCollector(metricMessageSender);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index f73ecdb..62ab5c6 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -20,7 +20,7 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.reef.driver.context.ActiveContext;
 
@@ -95,17 +95,17 @@ public final class ExecutorRepresenter {
 
   /**
    * Marks the Task as running, and sends scheduling message to the executor.
-   * @param executableTask
+   * @param task
    */
-  public void onTaskScheduled(final ExecutableTask executableTask) {
-    runningTasks.add(executableTask.getTaskId());
-    runningTaskToAttempt.put(executableTask.getTaskId(), executableTask.getAttemptIdx());
-    failedTasks.remove(executableTask.getTaskId());
+  public void onTaskScheduled(final Task task) {
+    runningTasks.add(task.getTaskId());
+    runningTaskToAttempt.put(task.getTaskId(), task.getAttemptIdx());
+    failedTasks.remove(task.getTaskId());
 
     serializationExecutorService.submit(new Runnable() {
       @Override
       public void run() {
-        final byte[] serialized = SerializationUtils.serialize(executableTask);
+        final byte[] serialized = SerializationUtils.serialize(task);
         sendControlMessage(
             ControlMessage.Message.newBuilder()
                 .setId(RuntimeIdGenerator.generateMessageId())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index c51d1ec..273f108 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -400,7 +400,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
       final int attemptIdx = jobStateManager.getCurrentAttemptIndexForTask(taskId);
 
       LOG.debug("Enqueueing {}", taskId);
-      pendingTaskCollection.add(new ExecutableTask(
+      pendingTaskCollection.add(new Task(
           physicalPlan.getId(),
           taskId,
           attemptIdx,
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
index 66fa986..e15e401 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
@@ -46,10 +46,10 @@ public final class CompositeSchedulingPolicy implements SchedulingPolicy {
 
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ExecutableTask executableTask) {
+                                                             final Task task) {
     Set<ExecutorRepresenter> candidates = executorRepresenterSet;
     for (final SchedulingPolicy schedulingPolicy : schedulingPolicies) {
-      candidates = schedulingPolicy.filterExecutorRepresenters(candidates, executableTask);
+      candidates = schedulingPolicy.filterExecutorRepresenters(candidates, task);
     }
     return candidates;
   }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
index 46b3c6b..b8f7d74 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
@@ -37,20 +37,20 @@ public final class ContainerTypeAwareSchedulingPolicy implements SchedulingPolic
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the container type.
    *                               If the container type of target Task is NONE, it will return the original set.
-   * @param executableTask {@link ExecutableTask} to be scheduled.
+   * @param task {@link Task} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ExecutableTask executableTask) {
+                                                             final Task task) {
 
-    if (executableTask.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
+    if (task.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
       return executorRepresenterSet;
     }
 
     final Set<ExecutorRepresenter> candidateExecutors =
         executorRepresenterSet.stream()
-            .filter(executor -> executor.getContainerType().equals(executableTask.getContainerType()))
+            .filter(executor -> executor.getContainerType().equals(task.getContainerType()))
             .collect(Collectors.toSet());
 
     return candidateExecutors;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
index 5d19f52..d92d9d4 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
@@ -35,12 +35,12 @@ public final class FreeSlotSchedulingPolicy implements SchedulingPolicy {
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the free slot of executors.
    *                               Executors that do not have any free slots will be filtered by this policy.
-   * @param executableTask {@link ExecutableTask} to be scheduled.
+   * @param task {@link Task} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ExecutableTask executableTask) {
+                                                             final Task task) {
     final Set<ExecutorRepresenter> candidateExecutors =
         executorRepresenterSet.stream()
             .filter(executor -> executor.getRunningTasks().size() < executor.getExecutorCapacity())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
index 1d8c8a2..8d954e3 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 
 import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
@@ -38,9 +38,9 @@ public interface PendingTaskCollection {
 
   /**
    * Adds a Task to this collection.
-   * @param executableTask to add.
+   * @param task to add.
    */
-  void add(final ExecutableTask executableTask);
+  void add(final Task task);
 
   /**
    * Removes the specified Task to be scheduled.
@@ -49,14 +49,14 @@ public interface PendingTaskCollection {
    * @throws NoSuchElementException if the specified Task is not in the queue,
    *                                or removing this Task breaks scheduling order
    */
-  ExecutableTask remove(final String taskId) throws NoSuchElementException;
+  Task remove(final String taskId) throws NoSuchElementException;
 
   /**
    * Peeks stage that can be scheduled according to job dependency priority.
    * Changes to the queue must not reflected to the returned collection to avoid concurrent modification.
    * @return stage that can be scheduled, or {@link Optional#empty()} if the queue is empty
    */
-  Optional<Collection<ExecutableTask>> peekSchedulableStage();
+  Optional<Collection<Task>> peekSchedulableStage();
 
   /**
    * Registers a job to this queue in case the queue needs to understand the topology of the job DAG.
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
index afdc671..5c6e65c 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 
@@ -47,12 +47,12 @@ public final class RoundRobinSchedulingPolicy implements SchedulingPolicy {
 
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by round robin behaviour.
-   * @param executableTask {@link ExecutableTask} to be scheduled.
+   * @param task {@link Task} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ExecutableTask executableTask) {
+                                                             final Task task) {
     final OptionalInt minOccupancy =
         executorRepresenterSet.stream()
         .map(executor -> executor.getRunningTasks().size())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index aa53780..a329c4b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.state.JobState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
@@ -112,7 +112,7 @@ public final class SchedulerRunner {
   }
 
   void doScheduleStage() {
-    final Collection<ExecutableTask> stageToSchedule = pendingTaskCollection.peekSchedulableStage().orElse(null);
+    final Collection<Task> stageToSchedule = pendingTaskCollection.peekSchedulableStage().orElse(null);
     if (stageToSchedule == null) {
       // Task queue is empty
       LOG.debug("PendingTaskCollection is empty. Awaiting for more Tasks...");
@@ -120,27 +120,27 @@ public final class SchedulerRunner {
     }
 
     final AtomicInteger numScheduledTasks = new AtomicInteger(0); // to be incremented in lambda
-    for (final ExecutableTask schedulableTask : stageToSchedule) {
-      final JobStateManager jobStateManager = jobStateManagers.get(schedulableTask.getJobId());
-      LOG.debug("Trying to schedule {}...", schedulableTask.getTaskId());
+    for (final Task task : stageToSchedule) {
+      final JobStateManager jobStateManager = jobStateManagers.get(task.getJobId());
+      LOG.debug("Trying to schedule {}...", task.getTaskId());
 
       executorRegistry.viewExecutors(executors -> {
         final Set<ExecutorRepresenter> candidateExecutors =
-            schedulingPolicy.filterExecutorRepresenters(executors, schedulableTask);
+            schedulingPolicy.filterExecutorRepresenters(executors, task);
         final Optional<ExecutorRepresenter> firstCandidate = candidateExecutors.stream().findFirst();
 
         if (firstCandidate.isPresent()) {
           // update metadata first
-          jobStateManager.onTaskStateChanged(schedulableTask.getTaskId(), TaskState.State.EXECUTING);
-          pendingTaskCollection.remove(schedulableTask.getTaskId());
+          jobStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING);
+          pendingTaskCollection.remove(task.getTaskId());
           numScheduledTasks.incrementAndGet();
 
           // send the task
           final ExecutorRepresenter selectedExecutor = firstCandidate.get();
-          selectedExecutor.onTaskScheduled(schedulableTask);
-          LOG.debug("Successfully scheduled {}", schedulableTask.getTaskId());
+          selectedExecutor.onTaskScheduled(task);
+          LOG.debug("Successfully scheduled {}", task.getTaskId());
         } else {
-          LOG.debug("Failed to schedule {}", schedulableTask.getTaskId());
+          LOG.debug("Failed to schedule {}", task.getTaskId());
         }
       });
     }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index 7ea05fb..0064310 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
@@ -32,5 +32,5 @@ import java.util.Set;
 @DefaultImplementation(CompositeSchedulingPolicy.class)
 public interface SchedulingPolicy {
   Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                      final ExecutableTask executableTask);
+                                                      final Task task);
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
index fae4565..b1318b1 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
@@ -20,7 +20,7 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
 
@@ -41,7 +41,7 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
   /**
    * Pending Tasks awaiting to be scheduled for each stage.
    */
-  private final ConcurrentMap<String, Map<String, ExecutableTask>> stageIdToPendingTasks;
+  private final ConcurrentMap<String, Map<String, Task>> stageIdToPendingTasks;
 
   /**
    * Stages with Tasks that have not yet been scheduled.
@@ -55,17 +55,17 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
   }
 
   @Override
-  public synchronized void add(final ExecutableTask executableTask) {
-    final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(executableTask.getTaskId());
+  public synchronized void add(final Task task) {
+    final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(task.getTaskId());
 
     stageIdToPendingTasks.compute(stageId, (s, taskIdToTask) -> {
       if (taskIdToTask == null) {
-        final Map<String, ExecutableTask> taskIdToTaskMap = new HashMap<>();
-        taskIdToTaskMap.put(executableTask.getTaskId(), executableTask);
-        updateSchedulableStages(stageId, executableTask.getContainerType());
+        final Map<String, Task> taskIdToTaskMap = new HashMap<>();
+        taskIdToTaskMap.put(task.getTaskId(), task);
+        updateSchedulableStages(stageId, task.getContainerType());
         return taskIdToTaskMap;
       } else {
-        taskIdToTask.put(executableTask.getTaskId(), executableTask);
+        taskIdToTask.put(task.getTaskId(), task);
         return taskIdToTask;
       }
     });
@@ -81,18 +81,18 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
    *                                (i.e. does not belong to the collection from {@link #peekSchedulableStage()}.
    */
   @Override
-  public synchronized ExecutableTask remove(final String taskId) throws NoSuchElementException {
+  public synchronized Task remove(final String taskId) throws NoSuchElementException {
     final String stageId = schedulableStages.peekFirst();
     if (stageId == null) {
       throw new NoSuchElementException("No schedulable stage in Task queue");
     }
 
-    final Map<String, ExecutableTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
+    final Map<String, Task> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
 
     if (pendingTasksForStage == null) {
       throw new RuntimeException(String.format("Stage %s not found in Task queue", stageId));
     }
-    final ExecutableTask taskToSchedule = pendingTasksForStage.remove(taskId);
+    final Task taskToSchedule = pendingTasksForStage.remove(taskId);
     if (taskToSchedule == null) {
       throw new NoSuchElementException(String.format("Task %s not found in Task queue", taskId));
     }
@@ -115,13 +115,13 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
    *         or {@link Optional#empty} if the queue is empty
    */
   @Override
-  public synchronized Optional<Collection<ExecutableTask>> peekSchedulableStage() {
+  public synchronized Optional<Collection<Task>> peekSchedulableStage() {
     final String stageId = schedulableStages.peekFirst();
     if (stageId == null) {
       return Optional.empty();
     }
 
-    final Map<String, ExecutableTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
+    final Map<String, Task> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
     if (pendingTasksForStage == null) {
       throw new RuntimeException(String.format("Stage %s not found in stageIdToPendingTasks map", stageId));
     }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
index bee0a36..35dbb07 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
@@ -59,15 +59,15 @@ public final class SourceLocationAwareSchedulingPolicy implements SchedulingPoli
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by source location.
    *                               If there is no source locations, will return original set.
-   * @param executableTask {@link ExecutableTask} to be scheduled.
+   * @param task {@link Task} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ExecutableTask executableTask) {
+                                                             final Task task) {
     final Set<String> sourceLocations;
     try {
-      sourceLocations = getSourceLocations(executableTask.getIrVertexIdToReadable().values());
+      sourceLocations = getSourceLocations(task.getIrVertexIdToReadable().values());
     } catch (final UnsupportedOperationException e) {
       return executorRepresenterSet;
     } catch (final Exception e) {
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
index 1b7768f..55808c9 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -32,7 +32,7 @@ import static org.mockito.Mockito.*;
  * Tests {@link ContainerTypeAwareSchedulingPolicy}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class})
 public final class ContainerTypeAwareSchedulingPolicyTest {
 
   private static ExecutorRepresenter mockExecutorRepresenter(final String containerType) {
@@ -48,24 +48,24 @@ public final class ContainerTypeAwareSchedulingPolicyTest {
     final ExecutorRepresenter a1 = mockExecutorRepresenter(ExecutorPlacementProperty.RESERVED);
     final ExecutorRepresenter a2 = mockExecutorRepresenter(ExecutorPlacementProperty.NONE);
 
-    final ExecutableTask executableTask1 = mock(ExecutableTask.class);
-    when(executableTask1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
+    final Task task1 = mock(Task.class);
+    when(task1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
 
     final Set<ExecutorRepresenter> executorRepresenterList1 = new HashSet<>(Arrays.asList(a0, a1, a2));
 
     final Set<ExecutorRepresenter> candidateExecutors1 =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, executableTask1);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, task1);
 
     final Set<ExecutorRepresenter> expectedExecutors1 = new HashSet<>(Arrays.asList(a1));
     assertEquals(expectedExecutors1, candidateExecutors1);
 
-    final ExecutableTask executableTask2 = mock(ExecutableTask.class);
-    when(executableTask2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
+    final Task task2 = mock(Task.class);
+    when(task2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
 
     final Set<ExecutorRepresenter> executorRepresenterList2 = new HashSet<>(Arrays.asList(a0, a1, a2));
 
     final Set<ExecutorRepresenter> candidateExecutors2 =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, executableTask2);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, task2);
 
     final Set<ExecutorRepresenter> expectedExecutors2 = new HashSet<>(Arrays.asList(a0, a1, a2));
     assertEquals(expectedExecutors2, candidateExecutors2);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
index ac77c46..c60eeb3 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -32,7 +32,7 @@ import static org.mockito.Mockito.*;
  * Tests {@link FreeSlotSchedulingPolicy}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class})
 public final class FreeSlotSchedulingPolicyTest {
 
   private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks,
@@ -51,12 +51,12 @@ public final class FreeSlotSchedulingPolicyTest {
     final ExecutorRepresenter a0 = mockExecutorRepresenter(1, 1);
     final ExecutorRepresenter a1 = mockExecutorRepresenter(2, 3);
 
-    final ExecutableTask executableTask = mock(ExecutableTask.class);
+    final Task task = mock(Task.class);
 
     final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1));
 
     final Set<ExecutorRepresenter> candidateExecutors =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, executableTask);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, task);
 
     final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a1));
     assertEquals(expectedExecutors, candidateExecutors);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
index d40859b..57aa8cb 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -32,7 +32,7 @@ import static org.mockito.Mockito.*;
  * Tests {@link RoundRobinSchedulingPolicy}
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class})
 public final class RoundRobinSchedulingPolicyTest {
 
   private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks) {
@@ -50,12 +50,12 @@ public final class RoundRobinSchedulingPolicyTest {
     final ExecutorRepresenter a1 = mockExecutorRepresenter(2);
     final ExecutorRepresenter a2 = mockExecutorRepresenter(2);
 
-    final ExecutableTask executableTask = mock(ExecutableTask.class);
+    final Task task = mock(Task.class);
 
     final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1, a2));
 
     final Set<ExecutorRepresenter> candidateExecutors =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, executableTask);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, task);
 
     final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a0));
     assertEquals(expectedExecutors, candidateExecutors);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
index 921dfe8..852ebbc 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator;
@@ -85,7 +85,7 @@ public final class SingleTaskQueueTest {
     executorService.submit(() -> {
       try {
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-        final ExecutableTask dequeuedTask = dequeue();
+        final Task dequeuedTask = dequeue();
         assertEquals(RuntimeIdGenerator.getStageIdFromTaskId(dequeuedTask.getTaskId()),
             dagOf2Stages.get(1).getId());
 
@@ -137,7 +137,7 @@ public final class SingleTaskQueueTest {
     executorService.submit(() -> {
       try {
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-        final ExecutableTask dequeuedTask = dequeue();
+        final Task dequeuedTask = dequeue();
         assertEquals(RuntimeIdGenerator.getStageIdFromTaskId(dequeuedTask.getTaskId()),
             dagOf2Stages.get(0).getId());
 
@@ -213,7 +213,7 @@ public final class SingleTaskQueueTest {
    */
   private void scheduleStage(final Stage stage) {
     stage.getTaskIds().forEach(taskId ->
-        pendingTaskPriorityQueue.add(new ExecutableTask(
+        pendingTaskPriorityQueue.add(new Task(
             "TestPlan",
             taskId,
             0,
@@ -229,17 +229,17 @@ public final class SingleTaskQueueTest {
    * @return the stage name of the dequeued task.
    */
   private String dequeueAndGetStageId() {
-    final ExecutableTask executableTask = dequeue();
-    return RuntimeIdGenerator.getStageIdFromTaskId(executableTask.getTaskId());
+    final Task task = dequeue();
+    return RuntimeIdGenerator.getStageIdFromTaskId(task.getTaskId());
   }
 
   /**
    * Dequeues a scheduled task from the task priority queue.
    * @return the Task dequeued
    */
-  private ExecutableTask dequeue() {
-    final Collection<ExecutableTask> executableTasks
+  private Task dequeue() {
+    final Collection<Task> tasks
         = pendingTaskPriorityQueue.peekSchedulableStage().get();
-    return pendingTaskPriorityQueue.remove(executableTasks.iterator().next().getTaskId());
+    return pendingTaskPriorityQueue.remove(tasks.iterator().next().getTaskId());
   }
 }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
index 4263e7f..2f538ee 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
@@ -33,7 +33,7 @@ import static org.mockito.Mockito.*;
  * Test cases for {@link SourceLocationAwareSchedulingPolicy}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class, Readable.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class, Readable.class})
 public final class SourceLocationAwareSchedulingPolicyTest {
   private static final String SITE_0 = "SEOUL";
   private static final String SITE_1 = "JINJU";
@@ -46,7 +46,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
   }
 
   /**
-   * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule a {@link ExecutableTask} when
+   * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule a {@link Task} when
    * there are no executors in appropriate location(s).
    */
   @Test
@@ -54,7 +54,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
     final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
 
     // Prepare test scenario
-    final ExecutableTask task = CreateExecutableTask.withReadablesWithSourceLocations(
+    final Task task = CreateTask.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_0)));
     final ExecutorRepresenter e0 = mockExecutorRepresenter(SITE_1);
     final ExecutorRepresenter e1 = mockExecutorRepresenter(SITE_1);
@@ -70,19 +70,19 @@ public final class SourceLocationAwareSchedulingPolicyTest {
   public void testSourceLocationAwareSchedulingWithMultiSource() {
     final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
     // Prepare test scenario
-    final ExecutableTask task0 = CreateExecutableTask.withReadablesWithSourceLocations(
+    final Task task0 = CreateTask.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_1)));
-    final ExecutableTask task1 = CreateExecutableTask.withReadablesWithSourceLocations(
+    final Task task1 = CreateTask.withReadablesWithSourceLocations(
         Collections.singletonList(Arrays.asList(SITE_0, SITE_1, SITE_2)));
-    final ExecutableTask task2 = CreateExecutableTask.withReadablesWithSourceLocations(
+    final Task task2 = CreateTask.withReadablesWithSourceLocations(
         Arrays.asList(Collections.singletonList(SITE_0), Collections.singletonList(SITE_1),
             Arrays.asList(SITE_1, SITE_2)));
-    final ExecutableTask task3 = CreateExecutableTask.withReadablesWithSourceLocations(
+    final Task task3 = CreateTask.withReadablesWithSourceLocations(
         Arrays.asList(Collections.singletonList(SITE_1), Collections.singletonList(SITE_0),
             Arrays.asList(SITE_0, SITE_2)));
 
     final ExecutorRepresenter e = mockExecutorRepresenter(SITE_1);
-    for (final ExecutableTask task : new HashSet<>(Arrays.asList(task0, task1, task2, task3))) {
+    for (final Task task : new HashSet<>(Arrays.asList(task0, task1, task2, task3))) {
       assertEquals(new HashSet<>(Collections.singletonList(e)), schedulingPolicy.filterExecutorRepresenters(
           new HashSet<>(Collections.singletonList(e)), task));
     }
@@ -90,14 +90,14 @@ public final class SourceLocationAwareSchedulingPolicyTest {
 
 
   /**
-   * Utility for creating {@link ExecutableTask}.
+   * Utility for creating {@link Task}.
    */
-  private static final class CreateExecutableTask {
+  private static final class CreateTask {
     private static final AtomicInteger taskIndex = new AtomicInteger(0);
     private static final AtomicInteger intraTaskIndex = new AtomicInteger(0);
 
-    private static ExecutableTask doCreate(final Collection<Readable> readables) {
-      final ExecutableTask mockInstance = mock(ExecutableTask.class);
+    private static Task doCreate(final Collection<Readable> readables) {
+      final Task mockInstance = mock(Task.class);
       final Map<String, Readable> readableMap = new HashMap<>();
       readables.forEach(readable -> readableMap.put(String.format("TASK-%d", intraTaskIndex.getAndIncrement()),
           readable));
@@ -106,7 +106,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
       return mockInstance;
     }
 
-    static ExecutableTask withReadablesWithSourceLocations(final Collection<List<String>> sourceLocation) {
+    static Task withReadablesWithSourceLocations(final Collection<List<String>> sourceLocation) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (final List<String> locations : sourceLocation) {
@@ -120,7 +120,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
       }
     }
 
-    static ExecutableTask withReadablesWithoutSourceLocations(final int numReadables) {
+    static Task withReadablesWithoutSourceLocations(final int numReadables) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (int i = 0; i < numReadables; i++) {
@@ -134,7 +134,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
       }
     }
 
-    static ExecutableTask withReadablesWhichThrowException(final int numReadables) {
+    static Task withReadablesWhichThrowException(final int numReadables) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (int i = 0; i < numReadables; i++) {
@@ -148,7 +148,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
       }
     }
 
-    static ExecutableTask withoutReadables() {
+    static Task withoutReadables() {
       return doCreate(Collections.emptyList());
     }
   }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
index a946b4b..ffbfef6 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
@@ -27,7 +27,7 @@ import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.executor.MetricMessageSender;
@@ -116,8 +116,8 @@ public final class TaskExecutorTest {
     final StageEdge stageOutEdge = mock(StageEdge.class);
     when(stageOutEdge.getSrcVertex()).thenReturn(sourceIRVertex);
     final String taskId = RuntimeIdGenerator.generateTaskId(0, stageId);
-    final ExecutableTask executableTask =
-        new ExecutableTask(
+    final Task task =
+        new Task(
             "testSourceVertex",
             taskId,
             0,
@@ -129,7 +129,7 @@ public final class TaskExecutorTest {
 
     // Execute the task.
     final TaskExecutor taskExecutor = new TaskExecutor(
-        executableTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
     taskExecutor.execute();
 
     // Check the output.
@@ -170,8 +170,8 @@ public final class TaskExecutorTest {
     when(stageInEdge.getDstVertex()).thenReturn(operatorIRVertex1);
     final StageEdge stageOutEdge = mock(StageEdge.class);
     when(stageOutEdge.getSrcVertex()).thenReturn(operatorIRVertex2);
-    final ExecutableTask executableTask =
-        new ExecutableTask(
+    final Task task =
+        new Task(
             "testSourceVertex",
             taskId,
             0,
@@ -183,7 +183,7 @@ public final class TaskExecutorTest {
 
     // Execute the task.
     final TaskExecutor taskExecutor = new TaskExecutor(
-        executableTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
     taskExecutor.execute();
 
     // Check the output.

-- 
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.