You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/06/05 09:39:05 UTC

[GitHub] jeongyooneo closed pull request #29: [NEMO-95] Rename ExecutableTask to Task

jeongyooneo closed pull request #29: [NEMO-95] Rename ExecutableTask to Task
URL: https://github.com/apache/incubator-nemo/pull/29
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/ExecutableTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
similarity index 72%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/ExecutableTask.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
index 1da8f978..5cd8b165 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/ExecutableTask.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
@@ -23,9 +23,9 @@
 import java.util.Map;
 
 /**
- * An ExecutableTask is a self-contained executable that can be executed in specific types of containers.
+ * A Task is a self-contained executable that can be executed on a machine.
  */
-public final class ExecutableTask implements Serializable {
+public final class Task implements Serializable {
   private final String jobId;
   private final String taskId;
   private final int taskIdx;
@@ -33,35 +33,35 @@
   private final List<StageEdge> taskOutgoingEdges;
   private final int attemptIdx;
   private final String containerType;
-  private final byte[] serializedTaskDag;
+  private final byte[] serializedIRDag;
   private final Map<String, Readable> irVertexIdToReadable;
 
   /**
    * Constructor.
    *
    * @param jobId                the id of the job.
-   * @param taskId               the ID of the scheduled task.
+   * @param taskId               the ID of the task.
    * @param attemptIdx           the attempt index.
    * @param containerType        the type of container to execute the task on.
    * @param serializedIRDag      the serialized DAG of the task.
    * @param taskIncomingEdges    the incoming edges of the task.
    * @param taskOutgoingEdges    the outgoing edges of the task.
-   * @param irVertexIdToReadable the map between logical task ID and readable.
+   * @param irVertexIdToReadable the map between IRVertex id to readable.
    */
-  public ExecutableTask(final String jobId,
-                        final String taskId,
-                        final int attemptIdx,
-                        final String containerType,
-                        final byte[] serializedIRDag,
-                        final List<StageEdge> taskIncomingEdges,
-                        final List<StageEdge> taskOutgoingEdges,
-                        final Map<String, Readable> irVertexIdToReadable) {
+  public Task(final String jobId,
+              final String taskId,
+              final int attemptIdx,
+              final String containerType,
+              final byte[] serializedIRDag,
+              final List<StageEdge> taskIncomingEdges,
+              final List<StageEdge> taskOutgoingEdges,
+              final Map<String, Readable> irVertexIdToReadable) {
     this.jobId = jobId;
     this.taskId = taskId;
     this.taskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
     this.attemptIdx = attemptIdx;
     this.containerType = containerType;
-    this.serializedTaskDag = serializedIRDag;
+    this.serializedIRDag = serializedIRDag;
     this.taskIncomingEdges = taskIncomingEdges;
     this.taskOutgoingEdges = taskOutgoingEdges;
     this.irVertexIdToReadable = irVertexIdToReadable;
@@ -75,21 +75,21 @@ public String getJobId() {
   }
 
   /**
-   * @return the serialized DAG of the task.
+   * @return the serialized IR DAG of the task.
    */
   public byte[] getSerializedIRDag() {
-    return serializedTaskDag;
+    return serializedIRDag;
   }
 
   /**
-   * @return the ID of the scheduled task.
+   * @return the ID of the task.
    */
   public String getTaskId() {
     return taskId;
   }
 
   /**
-   * @return the idx of the scheduled task.
+   * @return the idx of the task.
    */
   public int getTaskIdx() {
     return taskIdx;
@@ -124,7 +124,7 @@ public String getContainerType() {
   }
 
   /**
-   * @return the map between logical task ID and readable.
+   * @return the map between IRVertex id and readable.
    */
   public Map<String, Readable> getIrVertexIdToReadable() {
     return irVertexIdToReadable;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 88729909..287ca010 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -28,7 +28,7 @@
 import edu.snu.nemo.runtime.common.message.MessageListener;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.executor.data.SerializerManager;
 import edu.snu.nemo.runtime.executor.datatransfer.DataTransferFactory;
 import org.apache.commons.lang3.SerializationUtils;
@@ -87,26 +87,26 @@ public String getExecutorId() {
     return executorId;
   }
 
-  private synchronized void onTaskReceived(final ExecutableTask executableTask) {
+  private synchronized void onTaskReceived(final Task task) {
     LOG.debug("Executor [{}] received Task [{}] to execute.",
-        new Object[]{executorId, executableTask.getTaskId()});
-    executorService.execute(() -> launchTask(executableTask));
+        new Object[]{executorId, task.getTaskId()});
+    executorService.execute(() -> launchTask(task));
   }
 
   /**
    * Launches the Task, and keeps track of the execution state with taskStateManager.
-   * @param executableTask to launch.
+   * @param task to launch.
    */
-  private void launchTask(final ExecutableTask executableTask) {
+  private void launchTask(final Task task) {
     try {
       final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag =
-          SerializationUtils.deserialize(executableTask.getSerializedIRDag());
+          SerializationUtils.deserialize(task.getSerializedIRDag());
       final TaskStateManager taskStateManager =
-          new TaskStateManager(executableTask, executorId, persistentConnectionToMasterMap, metricMessageSender);
+          new TaskStateManager(task, executorId, persistentConnectionToMasterMap, metricMessageSender);
 
-      executableTask.getTaskIncomingEdges()
+      task.getTaskIncomingEdges()
           .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
-      executableTask.getTaskOutgoingEdges()
+      task.getTaskOutgoingEdges()
           .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
       irDag.getVertices().forEach(v -> {
         irDag.getOutgoingEdgesOf(v)
@@ -114,7 +114,7 @@ private void launchTask(final ExecutableTask executableTask) {
       });
 
       new TaskExecutor(
-          executableTask, irDag, taskStateManager, dataTransferFactory, metricMessageSender).execute();
+          task, irDag, taskStateManager, dataTransferFactory, metricMessageSender).execute();
     } catch (final Exception e) {
       persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
           ControlMessage.Message.newBuilder()
@@ -149,9 +149,9 @@ public void onMessage(final ControlMessage.Message message) {
       switch (message.getType()) {
       case ScheduleTask:
         final ControlMessage.ScheduleTaskMsg scheduleTaskMsg = message.getScheduleTaskMsg();
-        final ExecutableTask executableTask =
+        final Task task =
             SerializationUtils.deserialize(scheduleTaskMsg.getTask().toByteArray());
-        onTaskReceived(executableTask);
+        onTaskReceived(task);
         break;
       default:
         throw new IllegalMessageException(
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
index 09a43cc2..4a9b1eeb 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
@@ -23,7 +23,7 @@
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.vertex.*;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.state.TaskState;
@@ -46,7 +46,7 @@
   private static final String ITERATORID_PREFIX = "ITERATOR_";
   private static final AtomicInteger ITERATORID_GENERATOR = new AtomicInteger(0);
 
-  // From ExecutableTask
+  // From Task
   private final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag;
   private final String taskId;
   private final int taskIdx;
@@ -81,24 +81,24 @@
 
   /**
    * Constructor.
-   * @param executableTask Task with information needed during execution.
+   * @param task Task with information needed during execution.
    * @param irVertexDag A DAG of vertices.
    * @param taskStateManager State manager for this Task.
    * @param channelFactory For reading from/writing to data to other Stages.
    * @param metricMessageSender For sending metric with execution stats to Master.
    */
-  public TaskExecutor(final ExecutableTask executableTask,
+  public TaskExecutor(final Task task,
                       final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
                       final TaskStateManager taskStateManager,
                       final DataTransferFactory channelFactory,
                       final MetricMessageSender metricMessageSender) {
-    // Information from the ExecutableTask.
+    // Information from the Task.
     this.irVertexDag = irVertexDag;
-    this.taskId = executableTask.getTaskId();
-    this.taskIdx = executableTask.getTaskIdx();
-    this.stageIncomingEdges = executableTask.getTaskIncomingEdges();
-    this.stageOutgoingEdges = executableTask.getTaskOutgoingEdges();
-    this.irVertexIdToReadable = executableTask.getIrVertexIdToReadable();
+    this.taskId = task.getTaskId();
+    this.taskIdx = task.getTaskIdx();
+    this.stageIncomingEdges = task.getTaskIncomingEdges();
+    this.stageOutgoingEdges = task.getTaskOutgoingEdges();
+    this.irVertexIdToReadable = task.getIrVertexIdToReadable();
 
     // Other parameters.
     this.taskStateManager = taskStateManager;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
index 7b1bc52f..c06ab5a5 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
@@ -21,7 +21,7 @@
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 
 import java.util.*;
 
@@ -44,12 +44,12 @@
   private final MetricCollector metricCollector;
   private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
 
-  public TaskStateManager(final ExecutableTask executableTask,
+  public TaskStateManager(final Task task,
                           final String executorId,
                           final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
                           final MetricMessageSender metricMessageSender) {
-    this.taskId = executableTask.getTaskId();
-    this.attemptIdx = executableTask.getAttemptIdx();
+    this.taskId = task.getTaskId();
+    this.attemptIdx = task.getAttemptIdx();
     this.executorId = executorId;
     this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
     this.metricCollector = new MetricCollector(metricMessageSender);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index f73ecdb1..62ab5c68 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -20,7 +20,7 @@
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.reef.driver.context.ActiveContext;
 
@@ -95,17 +95,17 @@ public ExecutorRepresenter(final String executorId,
 
   /**
    * Marks the Task as running, and sends scheduling message to the executor.
-   * @param executableTask
+   * @param task
    */
-  public void onTaskScheduled(final ExecutableTask executableTask) {
-    runningTasks.add(executableTask.getTaskId());
-    runningTaskToAttempt.put(executableTask.getTaskId(), executableTask.getAttemptIdx());
-    failedTasks.remove(executableTask.getTaskId());
+  public void onTaskScheduled(final Task task) {
+    runningTasks.add(task.getTaskId());
+    runningTaskToAttempt.put(task.getTaskId(), task.getAttemptIdx());
+    failedTasks.remove(task.getTaskId());
 
     serializationExecutorService.submit(new Runnable() {
       @Override
       public void run() {
-        final byte[] serialized = SerializationUtils.serialize(executableTask);
+        final byte[] serialized = SerializationUtils.serialize(task);
         sendControlMessage(
             ControlMessage.Message.newBuilder()
                 .setId(RuntimeIdGenerator.generateMessageId())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index c51d1ec3..273f1084 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -400,7 +400,7 @@ private void scheduleStage(final Stage stageToSchedule) {
       final int attemptIdx = jobStateManager.getCurrentAttemptIndexForTask(taskId);
 
       LOG.debug("Enqueueing {}", taskId);
-      pendingTaskCollection.add(new ExecutableTask(
+      pendingTaskCollection.add(new Task(
           physicalPlan.getId(),
           taskId,
           attemptIdx,
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
index 66fa9869..e15e401b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
@@ -46,10 +46,10 @@ private CompositeSchedulingPolicy(final SourceLocationAwareSchedulingPolicy sour
 
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ExecutableTask executableTask) {
+                                                             final Task task) {
     Set<ExecutorRepresenter> candidates = executorRepresenterSet;
     for (final SchedulingPolicy schedulingPolicy : schedulingPolicies) {
-      candidates = schedulingPolicy.filterExecutorRepresenters(candidates, executableTask);
+      candidates = schedulingPolicy.filterExecutorRepresenters(candidates, task);
     }
     return candidates;
   }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
index 46b3c6b1..b8f7d745 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
@@ -37,20 +37,20 @@ public ContainerTypeAwareSchedulingPolicy() {
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the container type.
    *                               If the container type of target Task is NONE, it will return the original set.
-   * @param executableTask {@link ExecutableTask} to be scheduled.
+   * @param task {@link Task} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ExecutableTask executableTask) {
+                                                             final Task task) {
 
-    if (executableTask.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
+    if (task.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
       return executorRepresenterSet;
     }
 
     final Set<ExecutorRepresenter> candidateExecutors =
         executorRepresenterSet.stream()
-            .filter(executor -> executor.getContainerType().equals(executableTask.getContainerType()))
+            .filter(executor -> executor.getContainerType().equals(task.getContainerType()))
             .collect(Collectors.toSet());
 
     return candidateExecutors;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
index 5d19f52d..d92d9d46 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
@@ -35,12 +35,12 @@ public FreeSlotSchedulingPolicy() {
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the free slot of executors.
    *                               Executors that do not have any free slots will be filtered by this policy.
-   * @param executableTask {@link ExecutableTask} to be scheduled.
+   * @param task {@link Task} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ExecutableTask executableTask) {
+                                                             final Task task) {
     final Set<ExecutorRepresenter> candidateExecutors =
         executorRepresenterSet.stream()
             .filter(executor -> executor.getRunningTasks().size() < executor.getExecutorCapacity())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
index 1d8c8a2e..8d954e38 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 
 import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
@@ -38,9 +38,9 @@
 
   /**
    * Adds a Task to this collection.
-   * @param executableTask to add.
+   * @param task to add.
    */
-  void add(final ExecutableTask executableTask);
+  void add(final Task task);
 
   /**
    * Removes the specified Task to be scheduled.
@@ -49,14 +49,14 @@
    * @throws NoSuchElementException if the specified Task is not in the queue,
    *                                or removing this Task breaks scheduling order
    */
-  ExecutableTask remove(final String taskId) throws NoSuchElementException;
+  Task remove(final String taskId) throws NoSuchElementException;
 
   /**
    * Peeks stage that can be scheduled according to job dependency priority.
    * Changes to the queue must not reflected to the returned collection to avoid concurrent modification.
    * @return stage that can be scheduled, or {@link Optional#empty()} if the queue is empty
    */
-  Optional<Collection<ExecutableTask>> peekSchedulableStage();
+  Optional<Collection<Task>> peekSchedulableStage();
 
   /**
    * Registers a job to this queue in case the queue needs to understand the topology of the job DAG.
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
index afdc6718..5c6e65c3 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 
@@ -47,12 +47,12 @@ public RoundRobinSchedulingPolicy() {
 
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by round robin behaviour.
-   * @param executableTask {@link ExecutableTask} to be scheduled.
+   * @param task {@link Task} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ExecutableTask executableTask) {
+                                                             final Task task) {
     final OptionalInt minOccupancy =
         executorRepresenterSet.stream()
         .map(executor -> executor.getRunningTasks().size())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index aa537808..a329c4b6 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.state.JobState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
@@ -112,7 +112,7 @@ void terminate() {
   }
 
   void doScheduleStage() {
-    final Collection<ExecutableTask> stageToSchedule = pendingTaskCollection.peekSchedulableStage().orElse(null);
+    final Collection<Task> stageToSchedule = pendingTaskCollection.peekSchedulableStage().orElse(null);
     if (stageToSchedule == null) {
       // Task queue is empty
       LOG.debug("PendingTaskCollection is empty. Awaiting for more Tasks...");
@@ -120,27 +120,27 @@ void doScheduleStage() {
     }
 
     final AtomicInteger numScheduledTasks = new AtomicInteger(0); // to be incremented in lambda
-    for (final ExecutableTask schedulableTask : stageToSchedule) {
-      final JobStateManager jobStateManager = jobStateManagers.get(schedulableTask.getJobId());
-      LOG.debug("Trying to schedule {}...", schedulableTask.getTaskId());
+    for (final Task task : stageToSchedule) {
+      final JobStateManager jobStateManager = jobStateManagers.get(task.getJobId());
+      LOG.debug("Trying to schedule {}...", task.getTaskId());
 
       executorRegistry.viewExecutors(executors -> {
         final Set<ExecutorRepresenter> candidateExecutors =
-            schedulingPolicy.filterExecutorRepresenters(executors, schedulableTask);
+            schedulingPolicy.filterExecutorRepresenters(executors, task);
         final Optional<ExecutorRepresenter> firstCandidate = candidateExecutors.stream().findFirst();
 
         if (firstCandidate.isPresent()) {
           // update metadata first
-          jobStateManager.onTaskStateChanged(schedulableTask.getTaskId(), TaskState.State.EXECUTING);
-          pendingTaskCollection.remove(schedulableTask.getTaskId());
+          jobStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING);
+          pendingTaskCollection.remove(task.getTaskId());
           numScheduledTasks.incrementAndGet();
 
           // send the task
           final ExecutorRepresenter selectedExecutor = firstCandidate.get();
-          selectedExecutor.onTaskScheduled(schedulableTask);
-          LOG.debug("Successfully scheduled {}", schedulableTask.getTaskId());
+          selectedExecutor.onTaskScheduled(task);
+          LOG.debug("Successfully scheduled {}", task.getTaskId());
         } else {
-          LOG.debug("Failed to schedule {}", schedulableTask.getTaskId());
+          LOG.debug("Failed to schedule {}", task.getTaskId());
         }
       });
     }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index 7ea05fbc..0064310c 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
@@ -32,5 +32,5 @@
 @DefaultImplementation(CompositeSchedulingPolicy.class)
 public interface SchedulingPolicy {
   Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                      final ExecutableTask executableTask);
+                                                      final Task task);
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
index fae45657..b1318b16 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
@@ -20,7 +20,7 @@
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
 
@@ -41,7 +41,7 @@
   /**
    * Pending Tasks awaiting to be scheduled for each stage.
    */
-  private final ConcurrentMap<String, Map<String, ExecutableTask>> stageIdToPendingTasks;
+  private final ConcurrentMap<String, Map<String, Task>> stageIdToPendingTasks;
 
   /**
    * Stages with Tasks that have not yet been scheduled.
@@ -55,17 +55,17 @@ public SingleJobTaskCollection() {
   }
 
   @Override
-  public synchronized void add(final ExecutableTask executableTask) {
-    final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(executableTask.getTaskId());
+  public synchronized void add(final Task task) {
+    final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(task.getTaskId());
 
     stageIdToPendingTasks.compute(stageId, (s, taskIdToTask) -> {
       if (taskIdToTask == null) {
-        final Map<String, ExecutableTask> taskIdToTaskMap = new HashMap<>();
-        taskIdToTaskMap.put(executableTask.getTaskId(), executableTask);
-        updateSchedulableStages(stageId, executableTask.getContainerType());
+        final Map<String, Task> taskIdToTaskMap = new HashMap<>();
+        taskIdToTaskMap.put(task.getTaskId(), task);
+        updateSchedulableStages(stageId, task.getContainerType());
         return taskIdToTaskMap;
       } else {
-        taskIdToTask.put(executableTask.getTaskId(), executableTask);
+        taskIdToTask.put(task.getTaskId(), task);
         return taskIdToTask;
       }
     });
@@ -81,18 +81,18 @@ public synchronized void add(final ExecutableTask executableTask) {
    *                                (i.e. does not belong to the collection from {@link #peekSchedulableStage()}.
    */
   @Override
-  public synchronized ExecutableTask remove(final String taskId) throws NoSuchElementException {
+  public synchronized Task remove(final String taskId) throws NoSuchElementException {
     final String stageId = schedulableStages.peekFirst();
     if (stageId == null) {
       throw new NoSuchElementException("No schedulable stage in Task queue");
     }
 
-    final Map<String, ExecutableTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
+    final Map<String, Task> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
 
     if (pendingTasksForStage == null) {
       throw new RuntimeException(String.format("Stage %s not found in Task queue", stageId));
     }
-    final ExecutableTask taskToSchedule = pendingTasksForStage.remove(taskId);
+    final Task taskToSchedule = pendingTasksForStage.remove(taskId);
     if (taskToSchedule == null) {
       throw new NoSuchElementException(String.format("Task %s not found in Task queue", taskId));
     }
@@ -115,13 +115,13 @@ public synchronized ExecutableTask remove(final String taskId) throws NoSuchElem
    *         or {@link Optional#empty} if the queue is empty
    */
   @Override
-  public synchronized Optional<Collection<ExecutableTask>> peekSchedulableStage() {
+  public synchronized Optional<Collection<Task>> peekSchedulableStage() {
     final String stageId = schedulableStages.peekFirst();
     if (stageId == null) {
       return Optional.empty();
     }
 
-    final Map<String, ExecutableTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
+    final Map<String, Task> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
     if (pendingTasksForStage == null) {
       throw new RuntimeException(String.format("Stage %s not found in stageIdToPendingTasks map", stageId));
     }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
index bee0a365..35dbb07f 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
@@ -59,15 +59,15 @@ public SourceLocationAwareSchedulingPolicy() {
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by source location.
    *                               If there is no source locations, will return original set.
-   * @param executableTask {@link ExecutableTask} to be scheduled.
+   * @param task {@link Task} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ExecutableTask executableTask) {
+                                                             final Task task) {
     final Set<String> sourceLocations;
     try {
-      sourceLocations = getSourceLocations(executableTask.getIrVertexIdToReadable().values());
+      sourceLocations = getSourceLocations(task.getIrVertexIdToReadable().values());
     } catch (final UnsupportedOperationException e) {
       return executorRepresenterSet;
     } catch (final Exception e) {
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
index 1b7768f9..55808c9b 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -32,7 +32,7 @@
  * Tests {@link ContainerTypeAwareSchedulingPolicy}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class})
 public final class ContainerTypeAwareSchedulingPolicyTest {
 
   private static ExecutorRepresenter mockExecutorRepresenter(final String containerType) {
@@ -48,24 +48,24 @@ public void testContainerTypeAware() {
     final ExecutorRepresenter a1 = mockExecutorRepresenter(ExecutorPlacementProperty.RESERVED);
     final ExecutorRepresenter a2 = mockExecutorRepresenter(ExecutorPlacementProperty.NONE);
 
-    final ExecutableTask executableTask1 = mock(ExecutableTask.class);
-    when(executableTask1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
+    final Task task1 = mock(Task.class);
+    when(task1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
 
     final Set<ExecutorRepresenter> executorRepresenterList1 = new HashSet<>(Arrays.asList(a0, a1, a2));
 
     final Set<ExecutorRepresenter> candidateExecutors1 =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, executableTask1);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, task1);
 
     final Set<ExecutorRepresenter> expectedExecutors1 = new HashSet<>(Arrays.asList(a1));
     assertEquals(expectedExecutors1, candidateExecutors1);
 
-    final ExecutableTask executableTask2 = mock(ExecutableTask.class);
-    when(executableTask2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
+    final Task task2 = mock(Task.class);
+    when(task2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
 
     final Set<ExecutorRepresenter> executorRepresenterList2 = new HashSet<>(Arrays.asList(a0, a1, a2));
 
     final Set<ExecutorRepresenter> candidateExecutors2 =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, executableTask2);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, task2);
 
     final Set<ExecutorRepresenter> expectedExecutors2 = new HashSet<>(Arrays.asList(a0, a1, a2));
     assertEquals(expectedExecutors2, candidateExecutors2);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
index ac77c46c..c60eeb3a 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -32,7 +32,7 @@
  * Tests {@link FreeSlotSchedulingPolicy}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class})
 public final class FreeSlotSchedulingPolicyTest {
 
   private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks,
@@ -51,12 +51,12 @@ public void testFreeSlot() {
     final ExecutorRepresenter a0 = mockExecutorRepresenter(1, 1);
     final ExecutorRepresenter a1 = mockExecutorRepresenter(2, 3);
 
-    final ExecutableTask executableTask = mock(ExecutableTask.class);
+    final Task task = mock(Task.class);
 
     final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1));
 
     final Set<ExecutorRepresenter> candidateExecutors =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, executableTask);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, task);
 
     final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a1));
     assertEquals(expectedExecutors, candidateExecutors);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
index d40859bc..57aa8cbe 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -32,7 +32,7 @@
  * Tests {@link RoundRobinSchedulingPolicy}
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class})
 public final class RoundRobinSchedulingPolicyTest {
 
   private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks) {
@@ -50,12 +50,12 @@ public void testRoundRobin() {
     final ExecutorRepresenter a1 = mockExecutorRepresenter(2);
     final ExecutorRepresenter a2 = mockExecutorRepresenter(2);
 
-    final ExecutableTask executableTask = mock(ExecutableTask.class);
+    final Task task = mock(Task.class);
 
     final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1, a2));
 
     final Set<ExecutorRepresenter> candidateExecutors =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, executableTask);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, task);
 
     final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a0));
     assertEquals(expectedExecutors, candidateExecutors);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
index 921dfe8c..852ebbcd 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator;
@@ -85,7 +85,7 @@ public void testPushPriority() throws Exception {
     executorService.submit(() -> {
       try {
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-        final ExecutableTask dequeuedTask = dequeue();
+        final Task dequeuedTask = dequeue();
         assertEquals(RuntimeIdGenerator.getStageIdFromTaskId(dequeuedTask.getTaskId()),
             dagOf2Stages.get(1).getId());
 
@@ -137,7 +137,7 @@ public void testPullPriority() throws Exception {
     executorService.submit(() -> {
       try {
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-        final ExecutableTask dequeuedTask = dequeue();
+        final Task dequeuedTask = dequeue();
         assertEquals(RuntimeIdGenerator.getStageIdFromTaskId(dequeuedTask.getTaskId()),
             dagOf2Stages.get(0).getId());
 
@@ -213,7 +213,7 @@ public void testWithDifferentContainerType() throws Exception {
    */
   private void scheduleStage(final Stage stage) {
     stage.getTaskIds().forEach(taskId ->
-        pendingTaskPriorityQueue.add(new ExecutableTask(
+        pendingTaskPriorityQueue.add(new Task(
             "TestPlan",
             taskId,
             0,
@@ -229,17 +229,17 @@ private void scheduleStage(final Stage stage) {
    * @return the stage name of the dequeued task.
    */
   private String dequeueAndGetStageId() {
-    final ExecutableTask executableTask = dequeue();
-    return RuntimeIdGenerator.getStageIdFromTaskId(executableTask.getTaskId());
+    final Task task = dequeue();
+    return RuntimeIdGenerator.getStageIdFromTaskId(task.getTaskId());
   }
 
   /**
    * Dequeues a scheduled task from the task priority queue.
    * @return the Task dequeued
    */
-  private ExecutableTask dequeue() {
-    final Collection<ExecutableTask> executableTasks
+  private Task dequeue() {
+    final Collection<Task> tasks
         = pendingTaskPriorityQueue.peekSchedulableStage().get();
-    return pendingTaskPriorityQueue.remove(executableTasks.iterator().next().getTaskId());
+    return pendingTaskPriorityQueue.remove(tasks.iterator().next().getTaskId());
   }
 }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
index 4263e7f2..2f538ee9 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
@@ -33,7 +33,7 @@
  * Test cases for {@link SourceLocationAwareSchedulingPolicy}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class, Readable.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class, Readable.class})
 public final class SourceLocationAwareSchedulingPolicyTest {
   private static final String SITE_0 = "SEOUL";
   private static final String SITE_1 = "JINJU";
@@ -46,7 +46,7 @@ private static ExecutorRepresenter mockExecutorRepresenter(final String executor
   }
 
   /**
-   * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule a {@link ExecutableTask} when
+   * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule a {@link Task} when
    * there are no executors in appropriate location(s).
    */
   @Test
@@ -54,7 +54,7 @@ public void testSourceLocationAwareSchedulingNotAvailable() {
     final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
 
     // Prepare test scenario
-    final ExecutableTask task = CreateExecutableTask.withReadablesWithSourceLocations(
+    final Task task = CreateTask.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_0)));
     final ExecutorRepresenter e0 = mockExecutorRepresenter(SITE_1);
     final ExecutorRepresenter e1 = mockExecutorRepresenter(SITE_1);
@@ -70,19 +70,19 @@ public void testSourceLocationAwareSchedulingNotAvailable() {
   public void testSourceLocationAwareSchedulingWithMultiSource() {
     final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
     // Prepare test scenario
-    final ExecutableTask task0 = CreateExecutableTask.withReadablesWithSourceLocations(
+    final Task task0 = CreateTask.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_1)));
-    final ExecutableTask task1 = CreateExecutableTask.withReadablesWithSourceLocations(
+    final Task task1 = CreateTask.withReadablesWithSourceLocations(
         Collections.singletonList(Arrays.asList(SITE_0, SITE_1, SITE_2)));
-    final ExecutableTask task2 = CreateExecutableTask.withReadablesWithSourceLocations(
+    final Task task2 = CreateTask.withReadablesWithSourceLocations(
         Arrays.asList(Collections.singletonList(SITE_0), Collections.singletonList(SITE_1),
             Arrays.asList(SITE_1, SITE_2)));
-    final ExecutableTask task3 = CreateExecutableTask.withReadablesWithSourceLocations(
+    final Task task3 = CreateTask.withReadablesWithSourceLocations(
         Arrays.asList(Collections.singletonList(SITE_1), Collections.singletonList(SITE_0),
             Arrays.asList(SITE_0, SITE_2)));
 
     final ExecutorRepresenter e = mockExecutorRepresenter(SITE_1);
-    for (final ExecutableTask task : new HashSet<>(Arrays.asList(task0, task1, task2, task3))) {
+    for (final Task task : new HashSet<>(Arrays.asList(task0, task1, task2, task3))) {
       assertEquals(new HashSet<>(Collections.singletonList(e)), schedulingPolicy.filterExecutorRepresenters(
           new HashSet<>(Collections.singletonList(e)), task));
     }
@@ -90,14 +90,14 @@ public void testSourceLocationAwareSchedulingWithMultiSource() {
 
 
   /**
-   * Utility for creating {@link ExecutableTask}.
+   * Utility for creating {@link Task}.
    */
-  private static final class CreateExecutableTask {
+  private static final class CreateTask {
     private static final AtomicInteger taskIndex = new AtomicInteger(0);
     private static final AtomicInteger intraTaskIndex = new AtomicInteger(0);
 
-    private static ExecutableTask doCreate(final Collection<Readable> readables) {
-      final ExecutableTask mockInstance = mock(ExecutableTask.class);
+    private static Task doCreate(final Collection<Readable> readables) {
+      final Task mockInstance = mock(Task.class);
       final Map<String, Readable> readableMap = new HashMap<>();
       readables.forEach(readable -> readableMap.put(String.format("TASK-%d", intraTaskIndex.getAndIncrement()),
           readable));
@@ -106,7 +106,7 @@ private static ExecutableTask doCreate(final Collection<Readable> readables) {
       return mockInstance;
     }
 
-    static ExecutableTask withReadablesWithSourceLocations(final Collection<List<String>> sourceLocation) {
+    static Task withReadablesWithSourceLocations(final Collection<List<String>> sourceLocation) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (final List<String> locations : sourceLocation) {
@@ -120,7 +120,7 @@ static ExecutableTask withReadablesWithSourceLocations(final Collection<List<Str
       }
     }
 
-    static ExecutableTask withReadablesWithoutSourceLocations(final int numReadables) {
+    static Task withReadablesWithoutSourceLocations(final int numReadables) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (int i = 0; i < numReadables; i++) {
@@ -134,7 +134,7 @@ static ExecutableTask withReadablesWithoutSourceLocations(final int numReadables
       }
     }
 
-    static ExecutableTask withReadablesWhichThrowException(final int numReadables) {
+    static Task withReadablesWhichThrowException(final int numReadables) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (int i = 0; i < numReadables; i++) {
@@ -148,7 +148,7 @@ static ExecutableTask withReadablesWhichThrowException(final int numReadables) {
       }
     }
 
-    static ExecutableTask withoutReadables() {
+    static Task withoutReadables() {
       return doCreate(Collections.emptyList());
     }
   }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
index a946b4bb..ffbfef68 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
@@ -27,7 +27,7 @@
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.executor.MetricMessageSender;
@@ -116,8 +116,8 @@ public Iterable read() throws Exception {
     final StageEdge stageOutEdge = mock(StageEdge.class);
     when(stageOutEdge.getSrcVertex()).thenReturn(sourceIRVertex);
     final String taskId = RuntimeIdGenerator.generateTaskId(0, stageId);
-    final ExecutableTask executableTask =
-        new ExecutableTask(
+    final Task task =
+        new Task(
             "testSourceVertex",
             taskId,
             0,
@@ -129,7 +129,7 @@ public Iterable read() throws Exception {
 
     // Execute the task.
     final TaskExecutor taskExecutor = new TaskExecutor(
-        executableTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
     taskExecutor.execute();
 
     // Check the output.
@@ -170,8 +170,8 @@ public void testOperatorVertex() throws Exception {
     when(stageInEdge.getDstVertex()).thenReturn(operatorIRVertex1);
     final StageEdge stageOutEdge = mock(StageEdge.class);
     when(stageOutEdge.getSrcVertex()).thenReturn(operatorIRVertex2);
-    final ExecutableTask executableTask =
-        new ExecutableTask(
+    final Task task =
+        new Task(
             "testSourceVertex",
             taskId,
             0,
@@ -183,7 +183,7 @@ public void testOperatorVertex() throws Exception {
 
     // Execute the task.
     final TaskExecutor taskExecutor = new TaskExecutor(
-        executableTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
     taskExecutor.execute();
 
     // Check the output.


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services