You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ja...@apache.org on 2018/06/26 06:02:36 UTC

[incubator-nemo] branch master updated: [NEMO-123] Replace PendingTaskCollection with pointers to ScheduleGroups (#53)

This is an automated email from the ASF dual-hosted git repository.

jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new aa3bf8b  [NEMO-123] Replace PendingTaskCollection with pointers to ScheduleGroups (#53)
aa3bf8b is described below

commit aa3bf8b943fdc55fe45f925b4ffe411aa42f5e20
Author: John Yang <jo...@gmail.com>
AuthorDate: Tue Jun 26 15:02:34 2018 +0900

    [NEMO-123] Replace PendingTaskCollection with pointers to ScheduleGroups (#53)
    
    JIRA: [NEMO-123: Replace PendingTaskCollection with pointers to ScheduleGroups](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-123)
    
    **Major changes:**
    - Replaces PendingTaskCollection with PendingTaskCollectionPointer
    
    **Minor changes to note:**
    - Implements some toString() methods
    - Changes the overall scheduler code to use PendingTaskCollectionPointer properly
    
    **Tests for the changes:**
    - PendingTaskCollectionPointerTest
    
    **Other comments:**
    - Comments out encoding/decoding passes in SailfishPass and adds a pointer to https://issues.apache.org/jira/projects/NEMO/issues/NEMO-125
    
    resolves [NEMO-123](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-123)
---
 .../pass/compiletime/composite/SailfishPass.java   |   5 +-
 .../edu/snu/nemo/runtime/common/plan/Task.java     |  17 ++
 .../executor/data/metadata/PartitionMetadata.java  |  14 ++
 .../executor/datatransfer/DataTransferTest.java    |   2 +-
 .../nemo/runtime/master/BlockManagerMaster.java    |   3 -
 .../master/scheduler/BatchSingleJobScheduler.java  | 218 +++++++++---------
 .../master/scheduler/PendingTaskCollection.java    |  85 -------
 .../scheduler/PendingTaskCollectionPointer.java    |  66 ++++++
 .../nemo/runtime/master/scheduler/Scheduler.java   |   2 +-
 .../runtime/master/scheduler/SchedulerRunner.java  | 174 +++++++--------
 .../master/scheduler/SingleJobTaskCollection.java  | 209 ------------------
 .../scheduler/BatchSingleJobSchedulerTest.java     |   8 +-
 .../master/scheduler/FaultToleranceTest.java       |  36 ++-
 .../PendingTaskCollectionPointerTest.java          |  78 +++++++
 .../master/scheduler/SchedulerTestUtil.java        |  15 +-
 .../master/scheduler/SingleTaskQueueTest.java      | 245 ---------------------
 .../compiletime/composite/SailfishPassTest.java    |  10 +-
 17 files changed, 395 insertions(+), 792 deletions(-)

diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
index d5c50e4..ab1ece1 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
@@ -32,8 +32,9 @@ public final class SailfishPass extends CompositePass {
         new SailfishRelayReshapingPass(),
         new SailfishEdgeDataFlowModelPass(),
         new SailfishEdgeDataStorePass(),
-        new SailfishEdgeDecoderPass(),
-        new SailfishEdgeEncoderPass(),
+        // TODO #125: Fix data loss bug caused by SailfishSchedulingPolicy
+        // new SailfishEdgeDecoderPass(),
+        // new SailfishEdgeEncoderPass(),
         new SailfishEdgeUsedDataHandlingPass()
     ));
   }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
index 21ea23a..ce0a4ff 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
@@ -18,6 +18,7 @@ package edu.snu.nemo.runtime.common.plan;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import org.apache.commons.lang.SerializationUtils;
 
 import java.io.Serializable;
 import java.util.List;
@@ -134,4 +135,20 @@ public final class Task implements Serializable {
   public Map<String, Readable> getIrVertexIdToReadable() {
     return irVertexIdToReadable;
   }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("jobId: ");
+    sb.append(jobId);
+    sb.append(" / taskId: ");
+    sb.append(taskId);
+    sb.append(" / attempt: ");
+    sb.append(attemptIdx);
+    sb.append(" / irDAG: ");
+    sb.append(SerializationUtils.deserialize(serializedIRDag));
+    sb.append("/ exec props: ");
+    sb.append(getExecutionProperties());
+    return sb.toString();
+  }
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
index 93b2c83..9d68e5a 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
@@ -72,4 +72,18 @@ public final class PartitionMetadata<K extends Serializable> {
   public long getElementsTotal() {
     return elementsTotal;
   }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("key: ");
+    sb.append(key);
+    sb.append("/ partitionSize: ");
+    sb.append(partitionSize);
+    sb.append("/ offset: ");
+    sb.append(offset);
+    sb.append("/ elementsTotal: ");
+    sb.append(elementsTotal);
+    return sb.toString();
+  }
 }
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 88dda9d..73f531c 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -131,7 +131,7 @@ public final class DataTransferTest {
     final PubSubEventHandlerWrapper pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class);
     final SchedulingPolicy schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
-    final PendingTaskCollection taskQueue = new SingleJobTaskCollection();
+    final PendingTaskCollectionPointer taskQueue = new PendingTaskCollectionPointer();
     final SchedulerRunner schedulerRunner = new SchedulerRunner(schedulingPolicy, taskQueue, executorRegistry);
     final Scheduler scheduler = new BatchSingleJobScheduler(
         schedulerRunner, taskQueue, master, pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index 9c24671..1da2e72 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -176,9 +176,6 @@ public final class BlockManagerMaster {
 
   /**
    * To be called when a potential producer task is scheduled.
-   * To be precise, it is called when the task is enqueued to
-   * {@link edu.snu.nemo.runtime.master.scheduler.PendingTaskCollection}.
-   *
    * @param scheduledTaskId the ID of the scheduled task.
    */
   public void onProducerTaskScheduled(final String scheduledTaskId) {
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index 3506063..4bd8d4f 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -46,7 +46,7 @@ import static edu.snu.nemo.runtime.common.state.TaskState.State.ON_HOLD;
 import static edu.snu.nemo.runtime.common.state.TaskState.State.READY;
 
 /**
- * (WARNING) Only a single dedicated thread should use the public methods of this class.
+ * (CONCURRENCY) Only a single dedicated thread should use the public methods of this class.
  * (i.e., runtimeMasterThread in RuntimeMaster)
  *
  * BatchSingleJobScheduler receives a single {@link PhysicalPlan} to execute and schedules the Tasks.
@@ -60,7 +60,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
    * Components related to scheduling the given job.
    */
   private final SchedulerRunner schedulerRunner;
-  private final PendingTaskCollection pendingTaskCollection;
+  private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
   private final ExecutorRegistry executorRegistry;
 
   /**
@@ -78,13 +78,13 @@ public final class BatchSingleJobScheduler implements Scheduler {
 
   @Inject
   public BatchSingleJobScheduler(final SchedulerRunner schedulerRunner,
-                                 final PendingTaskCollection pendingTaskCollection,
+                                 final PendingTaskCollectionPointer pendingTaskCollectionPointer,
                                  final BlockManagerMaster blockManagerMaster,
                                  final PubSubEventHandlerWrapper pubSubEventHandlerWrapper,
                                  final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler,
                                  final ExecutorRegistry executorRegistry) {
     this.schedulerRunner = schedulerRunner;
-    this.pendingTaskCollection = pendingTaskCollection;
+    this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
     this.blockManagerMaster = blockManagerMaster;
     this.pubSubEventHandlerWrapper = pubSubEventHandlerWrapper;
     updatePhysicalPlanEventHandler.setScheduler(this);
@@ -96,26 +96,28 @@ public final class BatchSingleJobScheduler implements Scheduler {
   }
 
   /**
-   * Receives a job to schedule.
-   * @param jobToSchedule the physical plan for the job.
-   * @param scheduledJobStateManager to keep track of the submitted job's states.
+   * @param physicalPlanOfJob the physical plan of the job to schedule.
+   * @param jobStateManagerOfJob the state manager that keeps track of the job's states.
    */
   @Override
-  public void scheduleJob(final PhysicalPlan jobToSchedule, final JobStateManager scheduledJobStateManager) {
-    this.physicalPlan = jobToSchedule;
-    this.jobStateManager = scheduledJobStateManager;
+  public void scheduleJob(final PhysicalPlan physicalPlanOfJob, final JobStateManager jobStateManagerOfJob) {
+    if (this.physicalPlan != null || this.jobStateManager != null) {
+      throw new IllegalStateException("scheduleJob() has been called more than once");
+    }
+
+    this.physicalPlan = physicalPlanOfJob;
+    this.jobStateManager = jobStateManagerOfJob;
 
-    schedulerRunner.scheduleJob(scheduledJobStateManager);
+    schedulerRunner.scheduleJob(jobStateManagerOfJob);
     schedulerRunner.runSchedulerThread();
-    pendingTaskCollection.onJobScheduled(physicalPlan);
 
-    LOG.info("Job to schedule: {}", jobToSchedule.getId());
+    LOG.info("Job to schedule: {}", physicalPlanOfJob.getId());
 
-    this.initialScheduleGroup = jobToSchedule.getStageDAG().getVertices().stream()
+    this.initialScheduleGroup = physicalPlanOfJob.getStageDAG().getVertices().stream()
         .mapToInt(stage -> stage.getScheduleGroupIndex())
         .min().getAsInt();
 
-    scheduleRootStages();
+    scheduleNextScheduleGroup(initialScheduleGroup);
   }
 
   @Override
@@ -211,7 +213,8 @@ public final class BatchSingleJobScheduler implements Scheduler {
       // Schedule a stage after marking the necessary tasks to failed_recoverable.
       // The stage for one of the tasks that failed is a starting point to look
       // for the next stage to be scheduled.
-      scheduleNextStage(RuntimeIdGenerator.getStageIdFromTaskId(tasksToReExecute.iterator().next()));
+      scheduleNextScheduleGroup(getSchedulingIndexOfStage(
+          RuntimeIdGenerator.getStageIdFromTaskId(tasksToReExecute.iterator().next())));
     }
   }
 
@@ -219,133 +222,115 @@ public final class BatchSingleJobScheduler implements Scheduler {
   public void terminate() {
     this.schedulerRunner.terminate();
     this.executorRegistry.terminate();
-    this.pendingTaskCollection.close();
-  }
-
-  /**
-   * Schedule stages in initial schedule group, in reverse-topological order.
-   */
-  private void scheduleRootStages() {
-    final List<Stage> rootStages =
-        physicalPlan.getStageDAG().getTopologicalSort().stream().filter(stage ->
-            stage.getScheduleGroupIndex() == initialScheduleGroup)
-            .collect(Collectors.toList());
-    Collections.reverse(rootStages);
-    rootStages.forEach(this::scheduleStage);
   }
 
   /**
-   * Schedules the next stage to execute after a stage completion.
-   * @param completedStageId the ID of the stage that just completed and triggered this scheduling.
+   * Schedules the next schedule group to execute.
+   * @param referenceIndex the index of the schedule group to use as the reference point.
    */
-  private void scheduleNextStage(final String completedStageId) {
-    final Stage completeOrFailedStage = getStageById(completedStageId);
-    final Optional<List<Stage>> nextStagesToSchedule =
-        selectNextStagesToSchedule(completeOrFailedStage.getScheduleGroupIndex());
-
-    if (nextStagesToSchedule.isPresent()) {
-      LOG.info("Scheduling: ScheduleGroup {}", nextStagesToSchedule.get().get(0).getScheduleGroupIndex());
-
-      nextStagesToSchedule.get().forEach(this::scheduleStage);
+  private void scheduleNextScheduleGroup(final int referenceIndex) {
+    final Optional<List<Stage>> nextScheduleGroupToSchedule = selectNextScheduleGroupToSchedule(referenceIndex);
+
+    if (nextScheduleGroupToSchedule.isPresent()) {
+      LOG.info("Scheduling: ScheduleGroup {}", nextScheduleGroupToSchedule.get());
+      final List<Task> tasksToSchedule = nextScheduleGroupToSchedule.get().stream()
+          .flatMap(stage -> getSchedulableTasks(stage).stream())
+          .collect(Collectors.toList());
+      pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
+      schedulerRunner.onNewPendingTaskCollectionAvailable();
     } else {
       LOG.info("Skipping this round as the next schedulable stages have already been scheduled.");
     }
   }
 
   /**
-   * Selects the list of stages to schedule, in the order they must be added to {@link PendingTaskCollection}.
+   * Selects the next stage to schedule.
+   * It takes the referenceScheduleGroupIndex as a reference point to begin looking for the stages to execute:
    *
-   * This is a recursive function that decides which schedule group to schedule upon a stage completion, or a failure.
-   * It takes the currentScheduleGroupIndex as a reference point to begin looking for the stages to execute:
    * a) returns the failed_recoverable stage(s) of the earliest schedule group, if it(they) exists.
    * b) returns an empty optional if there are no schedulable stages at the moment.
    *    - if the current schedule group is still executing
    *    - if an ancestor schedule group is still executing
    * c) returns the next set of schedulable stages (if the current schedule group has completed execution)
    *
-   * The current implementation assumes that the stages that belong to the same schedule group are
-   * either mutually independent, or connected by a "push" edge.
-   *
-   * @param currentScheduleGroupIndex
+   * @param referenceScheduleGroupIndex
    *      the index of the schedule group that is executing/has executed when this method is called.
-   * @return an optional of the (possibly empty) list of next schedulable stages, in the order they should be
-   * enqueued to {@link PendingTaskCollection}.
+   * @return an optional of the (possibly empty) next schedulable stage
    */
-  private Optional<List<Stage>> selectNextStagesToSchedule(final int currentScheduleGroupIndex) {
-    if (currentScheduleGroupIndex > initialScheduleGroup) {
+  private Optional<List<Stage>> selectNextScheduleGroupToSchedule(final int referenceScheduleGroupIndex) {
+    // Recursively check the previous schedule group.
+    if (referenceScheduleGroupIndex > initialScheduleGroup) {
       final Optional<List<Stage>> ancestorStagesFromAScheduleGroup =
-          selectNextStagesToSchedule(currentScheduleGroupIndex - 1);
+          selectNextScheduleGroupToSchedule(referenceScheduleGroupIndex - 1);
       if (ancestorStagesFromAScheduleGroup.isPresent()) {
+        // Checking the previous schedule group found stages to schedule, so return those first.
         return ancestorStagesFromAScheduleGroup;
       }
     }
 
+    // Return the schedulable stage list in reverse-topological order
+    // since the stages that belong to the same schedule group are mutually independent,
+    // or connected by a "push" edge, where scheduling the children stages first is preferred.
+    final List<Stage> reverseTopoStages = physicalPlan.getStageDAG().getTopologicalSort();
+    Collections.reverse(reverseTopoStages);
+
     // All previous schedule groups are complete, we need to check for the current schedule group.
-    final List<Stage> currentScheduleGroup =
-        physicalPlan.getStageDAG().getTopologicalSort().stream().filter(stage ->
-            stage.getScheduleGroupIndex() == currentScheduleGroupIndex)
-            .collect(Collectors.toList());
-    List<Stage> stagesToSchedule = new LinkedList<>();
-    boolean allStagesComplete = true;
-
-    // We need to reschedule failed_recoverable stages.
-    for (final Stage stageToCheck : currentScheduleGroup) {
-      final StageState.State stageState = jobStateManager.getStageState(stageToCheck.getId());
-      switch (stageState) {
-        case FAILED_RECOVERABLE:
-          stagesToSchedule.add(stageToCheck);
-          allStagesComplete = false;
-          break;
-        case READY:
-        case EXECUTING:
-          allStagesComplete = false;
-          break;
-        default:
-          break;
+    final List<Stage> currentScheduleGroup = reverseTopoStages
+        .stream()
+        .filter(stage -> stage.getScheduleGroupIndex() == referenceScheduleGroupIndex)
+        .collect(Collectors.toList());
+    final boolean allStagesOfThisGroupComplete = currentScheduleGroup
+        .stream()
+        .map(Stage::getId)
+        .map(jobStateManager::getStageState)
+        .allMatch(state -> state.equals(StageState.State.COMPLETE));
+
+    if (!allStagesOfThisGroupComplete) {
+      LOG.info("There are remaining stages in the current schedule group, {}", referenceScheduleGroupIndex);
+      final List<Stage> stagesToSchedule = currentScheduleGroup
+          .stream()
+          .filter(stage -> {
+            final StageState.State stageState = jobStateManager.getStageState(stage.getId());
+            return stageState.equals(StageState.State.FAILED_RECOVERABLE)
+                || stageState.equals(StageState.State.READY);
+          })
+          .collect(Collectors.toList());
+      return (stagesToSchedule.isEmpty())
+          ? Optional.empty()
+          : Optional.of(stagesToSchedule);
+    } else {
+      // By the time the control flow has reached here,
+      // we are ready to move onto the next ScheduleGroup
+      final List<Stage> stagesToSchedule = reverseTopoStages
+          .stream()
+          .filter(stage -> {
+            if (stage.getScheduleGroupIndex() == referenceScheduleGroupIndex + 1) {
+              final String stageId = stage.getId();
+              return jobStateManager.getStageState(stageId) != StageState.State.EXECUTING
+                  && jobStateManager.getStageState(stageId) != StageState.State.COMPLETE;
+            }
+            return false;
+          })
+          .collect(Collectors.toList());
+
+      if (stagesToSchedule.isEmpty()) {
+        LOG.debug("ScheduleGroup {}: already executing/complete!, so we skip this", referenceScheduleGroupIndex + 1);
+        return Optional.empty();
       }
-    }
-    if (!allStagesComplete) {
-      LOG.info("There are remaining stages in the current schedule group, {}", currentScheduleGroupIndex);
-      return (stagesToSchedule.isEmpty()) ? Optional.empty() : Optional.of(stagesToSchedule);
-    }
 
-    // By the time the control flow has reached here,
-    // we are ready to move onto the next ScheduleGroup
-    stagesToSchedule =
-        physicalPlan.getStageDAG().getTopologicalSort().stream().filter(stage -> {
-          if (stage.getScheduleGroupIndex() == currentScheduleGroupIndex + 1) {
-            final String stageId = stage.getId();
-            return jobStateManager.getStageState(stageId) != StageState.State.EXECUTING
-                && jobStateManager.getStageState(stageId) != StageState.State.COMPLETE;
-          }
-          return false;
-        }).collect(Collectors.toList());
-
-    if (stagesToSchedule.isEmpty()) {
-      LOG.debug("ScheduleGroup {}: already executing/complete!, so we skip this", currentScheduleGroupIndex + 1);
-      return Optional.empty();
+      return Optional.of(stagesToSchedule);
     }
-
-    // Return the schedulable stage list in reverse-topological order
-    // since the stages that belong to the same schedule group are mutually independent,
-    // or connected by a "push" edge, requiring the children stages to be scheduled first.
-    Collections.reverse(stagesToSchedule);
-    return Optional.of(stagesToSchedule);
   }
 
   /**
-   * Schedules the given stage.
-   * It adds the list of tasks for the stage where the scheduler thread continuously polls from.
    * @param stageToSchedule the stage to schedule.
    */
-  private void scheduleStage(final Stage stageToSchedule) {
+  private List<Task> getSchedulableTasks(final Stage stageToSchedule) {
     final List<StageEdge> stageIncomingEdges =
         physicalPlan.getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
     final List<StageEdge> stageOutgoingEdges =
         physicalPlan.getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
 
-    final StageState.State stageState = jobStateManager.getStageState(stageToSchedule.getId());
-
     final List<String> taskIdsToSchedule = new LinkedList<>();
     for (final String taskId : stageToSchedule.getTaskIds()) {
       // this happens when the belonging stage's other tasks have failed recoverable,
@@ -357,18 +342,9 @@ public final class BatchSingleJobScheduler implements Scheduler {
         case EXECUTING:
           LOG.info("Skipping {} because its outputs are safe!", taskId);
           break;
-        case READY:
-          if (stageState == StageState.State.FAILED_RECOVERABLE) {
-            LOG.info("Skipping {} because it is already in the queue, but just hasn't been scheduled yet!",
-                taskId);
-          } else {
-            LOG.info("Scheduling {}", taskId);
-            taskIdsToSchedule.add(taskId);
-          }
-          break;
         case FAILED_RECOVERABLE:
-          LOG.info("Re-scheduling {} for failure recovery", taskId);
           jobStateManager.onTaskStateChanged(taskId, READY);
+        case READY:
           taskIdsToSchedule.add(taskId);
           break;
         case ON_HOLD:
@@ -384,13 +360,13 @@ public final class BatchSingleJobScheduler implements Scheduler {
     // each readable and source task will be bounded in executor.
     final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
 
+    final List<Task> tasks = new ArrayList<>(taskIdsToSchedule.size());
     taskIdsToSchedule.forEach(taskId -> {
       blockManagerMaster.onProducerTaskScheduled(taskId);
       final int taskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
       final int attemptIdx = jobStateManager.getTaskAttempt(taskId);
 
-      LOG.debug("Enqueueing {}", taskId);
-      pendingTaskCollection.add(new Task(
+      tasks.add(new Task(
           physicalPlan.getId(),
           taskId,
           attemptIdx,
@@ -400,7 +376,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
           stageOutgoingEdges,
           vertexIdToReadables.get(taskIdx)));
     });
-    schedulerRunner.onATaskAvailable();
+    return tasks;
   }
 
   /**
@@ -457,7 +433,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
     if (jobStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE)) {
       // if the stage this task belongs to is complete,
       if (!jobStateManager.isJobDone()) {
-        scheduleNextStage(stageIdForTaskUponCompletion);
+        scheduleNextScheduleGroup(getSchedulingIndexOfStage(stageIdForTaskUponCompletion));
       }
     }
     schedulerRunner.onAnExecutorAvailable();
@@ -526,7 +502,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
         // TODO #50: Carefully retry tasks in the scheduler
       case OUTPUT_WRITE_FAILURE:
         blockManagerMaster.onProducerTaskFailed(taskId);
-        scheduleNextStage(stageId);
+        scheduleNextScheduleGroup(getSchedulingIndexOfStage(stageId));
         break;
       case CONTAINER_FAILURE:
         LOG.info("Only the failed task will be retried.");
@@ -536,4 +512,8 @@ public final class BatchSingleJobScheduler implements Scheduler {
     }
     schedulerRunner.onAnExecutorAvailable();
   }
+
+  private int getSchedulingIndexOfStage(final String stageId) {
+    return physicalPlan.getStageDAG().getVertexById(stageId).getScheduleGroupIndex();
+  }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
deleted file mode 100644
index a1fa3ee..0000000
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.master.scheduler;
-
-import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.Task;
-
-import net.jcip.annotations.ThreadSafe;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.tang.annotations.DefaultImplementation;
-
-import java.util.Collection;
-import java.util.NoSuchElementException;
-import java.util.Optional;
-
-/**
- * Keep tracks of all pending tasks.
- * {@link Scheduler} enqueues the Tasks to schedule to this queue.
- * {@link SchedulerRunner} refers to this queue when scheduling Tasks.
- */
-@ThreadSafe
-@DriverSide
-@DefaultImplementation(SingleJobTaskCollection.class)
-public interface PendingTaskCollection {
-
-  /**
-   * Adds a Task to this collection.
-   * @param task to add.
-   */
-  void add(final Task task);
-
-  /**
-   * Removes the specified Task to be scheduled.
-   * @param taskId id of the Task
-   * @return the specified Task
-   * @throws NoSuchElementException if the specified Task is not in the queue,
-   *                                or removing this Task breaks scheduling order
-   */
-  Task remove(final String taskId) throws NoSuchElementException;
-
-  /**
-   * Peeks stage that can be scheduled according to job dependency priority.
-   * Changes to the queue must not reflected to the returned collection to avoid concurrent modification.
-   * @return stage that can be scheduled, or {@link Optional#empty()} if the queue is empty
-   */
-  Optional<Collection<Task>> peekSchedulableStage();
-
-  /**
-   * Registers a job to this queue in case the queue needs to understand the topology of the job DAG.
-   * @param physicalPlanForJob the job to schedule.
-   */
-  void onJobScheduled(final PhysicalPlan physicalPlanForJob);
-
-  /**
-   * Removes a stage and its descendant stages from this queue.
-   * This is to be used for fault tolerance purposes,
-   * say when a stage fails and all affected Tasks must be removed.
-   * @param stageIdOfTasks for the stage to begin the removal recursively.
-   */
-  void removeTasksAndDescendants(final String stageIdOfTasks);
-
-  /**
-   * Checks whether there are schedulable Tasks in the queue or not.
-   * @return true if there are schedulable Tasks in the queue, false otherwise.
-   */
-  boolean isEmpty();
-
-  /**
-   * Closes and cleans up this queue.
-   */
-  void close();
-}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollectionPointer.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollectionPointer.java
new file mode 100644
index 0000000..3fb08c6
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollectionPointer.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.runtime.common.plan.Task;
+
+import net.jcip.annotations.ThreadSafe;
+
+import javax.inject.Inject;
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * Points to a collection of pending tasks eligible for scheduling.
+ * This pointer effectively points to a subset of a scheduling group.
+ * Within the collection, the tasks can be scheduled in any order.
+ */
+@ThreadSafe
+public final class PendingTaskCollectionPointer {
+  private Collection<Task> curTaskCollection;
+
+  @Inject
+  public PendingTaskCollectionPointer() {
+  }
+
+  /**
+   * This collection of tasks should take precedence over any previous collection of tasks.
+   * @param newCollection to schedule.
+   */
+  synchronized void setToOverwrite(final Collection<Task> newCollection) {
+    this.curTaskCollection = newCollection;
+  }
+
+  /**
+   * This collection of tasks can be scheduled only if there's no collection of tasks to schedule at the moment.
+   * @param newCollection to schedule
+   */
+  synchronized void setIfNull(final Collection<Task> newCollection) {
+    if (this.curTaskCollection == null) {
+      this.curTaskCollection = newCollection;
+    }
+  }
+
+  /**
+   * Take the whole collection of tasks to schedule, and set the pointer to null.
+   * @return optional tasks to schedule
+   */
+  synchronized Optional<Collection<Task>> getAndSetNull() {
+    final Collection<Task> cur = curTaskCollection;
+    curTaskCollection = null;
+    return Optional.ofNullable(cur);
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index ebf7937..12114a7 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -27,7 +27,7 @@ import javax.annotation.Nullable;
 
 /**
  * Only two threads call scheduling code: RuntimeMaster thread (RMT), and SchedulerThread(ST).
- * RMT and ST meet only at two points: {@link ExecutorRegistry}, and {@link PendingTaskCollection},
+ * RMT and ST meet only at two points: {@link ExecutorRegistry}, and {@link PendingTaskCollectionPointer},
  * which are synchronized(ThreadSafe).
  * Other scheduler-related classes that are accessed by only one of the two threads are not synchronized(NotThreadSafe).
  */
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index 243ef07..62af040 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -25,8 +25,6 @@ import org.apache.reef.annotations.audience.DriverSide;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -38,91 +36,80 @@ import javax.annotation.concurrent.NotThreadSafe;
 import javax.inject.Inject;
 
 /**
- * Takes a Task from the pending queue and schedules it to an executor.
+ * Schedules tasks in discrete batches (scheduling iterations).
+ * A scheduling iteration occurs under one of the following conditions:
+ * - An executor slot becomes available (for reasons such as task completion/failure, or executor addition)
+ * - A new list of tasks becomes available (for reasons such as stage completion, task failure, or executor removal)
  */
 @DriverSide
 @NotThreadSafe
 public final class SchedulerRunner {
   private static final Logger LOG = LoggerFactory.getLogger(SchedulerRunner.class.getName());
   private final Map<String, JobStateManager> jobStateManagers;
-  private final PendingTaskCollection pendingTaskCollection;
+  private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
   private final ExecutorService schedulerThread;
-  private AtomicBoolean isSchedulerRunning;
+  private boolean isSchedulerRunning;
   private boolean isTerminated;
 
-  // (available executor AND available task to schedule) OR the scheduler has terminated
-  private final DelayedSignalingCondition canScheduleOrTerminated = new DelayedSignalingCondition();
+  private final DelayedSignalingCondition schedulingIteration = new DelayedSignalingCondition();
   private ExecutorRegistry executorRegistry;
   private SchedulingPolicy schedulingPolicy;
 
   @VisibleForTesting
   @Inject
   public SchedulerRunner(final SchedulingPolicy schedulingPolicy,
-                         final PendingTaskCollection pendingTaskCollection,
+                         final PendingTaskCollectionPointer pendingTaskCollectionPointer,
                          final ExecutorRegistry executorRegistry) {
     this.jobStateManagers = new HashMap<>();
-    this.pendingTaskCollection = pendingTaskCollection;
+    this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
     this.schedulerThread = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "SchedulerRunner"));
-    this.isSchedulerRunning = new AtomicBoolean(false);
+    this.isSchedulerRunning = false;
     this.isTerminated = false;
     this.executorRegistry = executorRegistry;
     this.schedulingPolicy = schedulingPolicy;
   }
 
   /**
-   * Signals to the condition on executor availability.
-   */
-  public void onAnExecutorAvailable() {
-    canScheduleOrTerminated.signal();
-  }
-
-  /**
-   * Signals to the condition on Task availability.
-   */
-  public void onATaskAvailable() {
-    canScheduleOrTerminated.signal();
-  }
-
-  /**
-   * Run the scheduler thread.
+   * A separate thread is run to schedule tasks to executors.
+   * See comments in the {@link Scheduler} for avoiding race conditions.
    */
-  void runSchedulerThread() {
-    if (!isTerminated) {
-      if (!isSchedulerRunning.getAndSet(true)) {
-        schedulerThread.execute(new SchedulerThread());
-        schedulerThread.shutdown();
+  private final class SchedulerThread implements Runnable {
+    @Override
+    public void run() {
+      while (!isTerminated) {
+        doScheduleTaskList();
+        schedulingIteration.await();
       }
+      jobStateManagers.values().forEach(jobStateManager -> {
+        if (jobStateManager.isJobDone()) {
+          LOG.info("{} is complete.", jobStateManager.getJobId());
+        } else {
+          LOG.info("{} is incomplete.", jobStateManager.getJobId());
+        }
+      });
+      LOG.info("SchedulerRunner Terminated!");
     }
   }
 
-  /**
-   * Begin scheduling a job.
-   * @param jobStateManager the corresponding {@link JobStateManager}
-   */
-  void scheduleJob(final JobStateManager jobStateManager) {
-    if (!isTerminated) {
-      jobStateManagers.put(jobStateManager.getJobId(), jobStateManager);
-    } // else ignore new incoming jobs when terminated.
-  }
-
-  void terminate() {
-    isTerminated = true;
-    canScheduleOrTerminated.signal();
-  }
-
-  void doScheduleStage() {
-    final Collection<Task> stageToSchedule = pendingTaskCollection.peekSchedulableStage().orElse(null);
-    if (stageToSchedule == null) {
-      // Task queue is empty
-      LOG.debug("PendingTaskCollection is empty. Awaiting for more Tasks...");
+  void doScheduleTaskList() {
+    final Optional<Collection<Task>> taskListOptional = pendingTaskCollectionPointer.getAndSetNull();
+    if (!taskListOptional.isPresent()) {
+      // Task list is empty
+      LOG.debug("PendingTaskCollectionPointer is empty. Awaiting for more Tasks...");
       return;
     }
 
-    final AtomicInteger numScheduledTasks = new AtomicInteger(0); // to be incremented in lambda
-    for (final Task task : stageToSchedule) {
+    final Collection<Task> taskList = taskListOptional.get();
+    final List<Task> couldNotSchedule = new ArrayList<>();
+    for (final Task task : taskList) {
       final JobStateManager jobStateManager = jobStateManagers.get(task.getJobId());
-      LOG.debug("Trying to schedule {}...", task.getTaskId());
+      if (!jobStateManager.getTaskState(task.getTaskId()).equals(TaskState.State.READY)) {
+        // Guard against race conditions causing duplicate task launches
+        LOG.debug("Skipping {} as it is not READY", task.getTaskId());
+        continue;
+      }
 
+      LOG.debug("Trying to schedule {}...", task.getTaskId());
       executorRegistry.viewExecutors(executors -> {
         final Set<ExecutorRepresenter> candidateExecutors =
             schedulingPolicy.filterExecutorRepresenters(executors, task);
@@ -131,59 +118,68 @@ public final class SchedulerRunner {
         if (firstCandidate.isPresent()) {
           // update metadata first
           jobStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING);
-          pendingTaskCollection.remove(task.getTaskId());
-          numScheduledTasks.incrementAndGet();
 
           // send the task
           final ExecutorRepresenter selectedExecutor = firstCandidate.get();
           selectedExecutor.onTaskScheduled(task);
-          LOG.debug("Successfully scheduled {}", task.getTaskId());
         } else {
-          LOG.debug("Failed to schedule {}", task.getTaskId());
+          couldNotSchedule.add(task);
         }
       });
     }
 
-    LOG.debug("Examined {} Tasks, scheduled {} Tasks", stageToSchedule.size(), numScheduledTasks);
-    if (stageToSchedule.size() == numScheduledTasks.get()) {
-      // Scheduled all tasks in the stage
-      // Immediately run next iteration to check whether there is another stage that can be scheduled
-      LOG.debug("Trying to schedule next Stage in the ScheduleGroup (if any)...");
-      canScheduleOrTerminated.signal();
+    LOG.debug("All except {} were scheduled among {}", new Object[]{couldNotSchedule, taskList});
+    if (couldNotSchedule.size() > 0) {
+      // Try these again, if no new task list has been set
+      pendingTaskCollectionPointer.setIfNull(couldNotSchedule);
     }
   }
 
   /**
-   * A separate thread is run to schedule tasks to executors.
-   * See comments in the {@link Scheduler} for avoiding race conditions.
+   * Signals to the condition on executor availability.
    */
-  private final class SchedulerThread implements Runnable {
-    @Override
-    public void run() {
-      // Run the first iteration unconditionally
-      canScheduleOrTerminated.signal();
+  void onAnExecutorAvailable() {
+    schedulingIteration.signal();
+  }
 
-      while (!isTerminated) {
-        // Iteration guard
-        canScheduleOrTerminated.await();
-        doScheduleStage();
-      }
-      jobStateManagers.values().forEach(jobStateManager -> {
-        if (jobStateManager.isJobDone()) {
-          LOG.info("{} is complete.", jobStateManager.getJobId());
-        } else {
-          LOG.info("{} is incomplete.", jobStateManager.getJobId());
-        }
-      });
-      LOG.info("SchedulerRunner Terminated!");
+  /**
+   * Signals to the condition on the Task collection availability.
+   */
+  void onNewPendingTaskCollectionAvailable() {
+    schedulingIteration.signal();
+  }
+
+  /**
+   * Run the scheduler thread.
+   */
+  void runSchedulerThread() {
+    if (!isTerminated && !isSchedulerRunning) {
+      schedulerThread.execute(new SchedulerThread());
+      schedulerThread.shutdown();
+      isSchedulerRunning = true;
     }
   }
 
   /**
+   * Begin scheduling a job.
+   * @param jobStateManager the corresponding {@link JobStateManager}
+   */
+  void scheduleJob(final JobStateManager jobStateManager) {
+    if (!isTerminated) {
+      jobStateManagers.put(jobStateManager.getJobId(), jobStateManager);
+    } // else ignore new incoming jobs when terminated.
+  }
+
+  void terminate() {
+    isTerminated = true;
+    schedulingIteration.signal();
+  }
+
+  /**
    * A {@link Condition} that allows 'delayed' signaling.
    */
   private final class DelayedSignalingCondition {
-    private final AtomicBoolean hasDelayedSignal = new AtomicBoolean(false);
+    private boolean hasDelayedSignal = false;
     private final Lock lock = new ReentrantLock();
     private final Condition condition = lock.newCondition();
 
@@ -191,10 +187,10 @@ public final class SchedulerRunner {
      * Signals to this condition. If no thread is awaiting for this condition,
      * signaling is delayed until the first next {@link #await} invocation.
      */
-    public void signal() {
+    void signal() {
       lock.lock();
       try {
-        hasDelayedSignal.set(true);
+        hasDelayedSignal = true;
         condition.signal();
       } finally {
         lock.unlock();
@@ -205,13 +201,13 @@ public final class SchedulerRunner {
      * Awaits to this condition. The thread will awake when there is a delayed signal,
      * or the next first {@link #signal} invocation.
      */
-    public void await() {
+    void await() {
       lock.lock();
       try {
-        if (!hasDelayedSignal.get()) {
+        if (!hasDelayedSignal) {
           condition.await();
         }
-        hasDelayedSignal.set(false);
+        hasDelayedSignal = false;
       } catch (final InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new RuntimeException(e);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
deleted file mode 100644
index ddcfbf5..0000000
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.master.scheduler;
-
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.plan.StageEdge;
-import edu.snu.nemo.runtime.common.plan.Task;
-import net.jcip.annotations.ThreadSafe;
-import org.apache.reef.annotations.audience.DriverSide;
-
-import javax.inject.Inject;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * {@link PendingTaskCollection} implementation.
- * This class provides two-level scheduling by keeping track of schedulable stages and stage-Task membership.
- * {@link #peekSchedulableStage()} returns collection of Tasks which belong to one of the schedulable stages.
- */
-@ThreadSafe
-@DriverSide
-public final class SingleJobTaskCollection implements PendingTaskCollection {
-  private PhysicalPlan physicalPlan;
-
-  /**
-   * Pending Tasks awaiting to be scheduled for each stage.
-   */
-  private final ConcurrentMap<String, Map<String, Task>> stageIdToPendingTasks;
-
-  /**
-   * Stages with Tasks that have not yet been scheduled.
-   */
-  private final BlockingDeque<String> schedulableStages;
-
-  @Inject
-  public SingleJobTaskCollection() {
-    stageIdToPendingTasks = new ConcurrentHashMap<>();
-    schedulableStages = new LinkedBlockingDeque<>();
-  }
-
-  @Override
-  public synchronized void add(final Task task) {
-    final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(task.getTaskId());
-
-    stageIdToPendingTasks.compute(stageId, (s, taskIdToTask) -> {
-      if (taskIdToTask == null) {
-        final Map<String, Task> taskIdToTaskMap = new HashMap<>();
-        taskIdToTaskMap.put(task.getTaskId(), task);
-        updateSchedulableStages(stageId);
-        return taskIdToTaskMap;
-      } else {
-        taskIdToTask.put(task.getTaskId(), task);
-        return taskIdToTask;
-      }
-    });
-  }
-
-  /**
-   * Removes the specified Task to be scheduled.
-   * The specified Task should belong to the collection from {@link #peekSchedulableStage()}.
-   * @param taskId id of the Task
-   * @return the specified Task
-   * @throws NoSuchElementException if the specified Task is not in the queue,
-   *                                or removing this Task breaks scheduling order
-   *                                (i.e. does not belong to the collection from {@link #peekSchedulableStage()}.
-   */
-  @Override
-  public synchronized Task remove(final String taskId) throws NoSuchElementException {
-    final String stageId = schedulableStages.peekFirst();
-    if (stageId == null) {
-      throw new NoSuchElementException("No schedulable stage in Task queue");
-    }
-
-    final Map<String, Task> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
-
-    if (pendingTasksForStage == null) {
-      throw new RuntimeException(String.format("Stage %s not found in Task queue", stageId));
-    }
-    final Task taskToSchedule = pendingTasksForStage.remove(taskId);
-    if (taskToSchedule == null) {
-      throw new NoSuchElementException(String.format("Task %s not found in Task queue", taskId));
-    }
-    if (pendingTasksForStage.isEmpty()) {
-      if (!schedulableStages.pollFirst().equals(stageId)) {
-        throw new RuntimeException(String.format("Expected stage %s to be polled", stageId));
-      }
-      stageIdToPendingTasks.remove(stageId);
-      stageIdToPendingTasks.forEach((scheduledStageId, tasks) ->
-          updateSchedulableStages(scheduledStageId));
-    }
-
-    return taskToSchedule;
-  }
-
-  /**
-   * Peeks Tasks that can be scheduled.
-   * @return Tasks to be scheduled, or {@link Optional#empty()} if the queue is empty
-   * @return collection of Tasks which belong to one of the schedulable stages
-   *         or {@link Optional#empty} if the queue is empty
-   */
-  @Override
-  public synchronized Optional<Collection<Task>> peekSchedulableStage() {
-    final String stageId = schedulableStages.peekFirst();
-    if (stageId == null) {
-      return Optional.empty();
-    }
-
-    final Map<String, Task> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
-    if (pendingTasksForStage == null) {
-      throw new RuntimeException(String.format("Stage %s not found in stageIdToPendingTasks map", stageId));
-    }
-    return Optional.of(new ArrayList<>(pendingTasksForStage.values()));
-  }
-
-  /**
-   * Removes a stage and its descendant stages from this PQ.
-   * @param stageId for the stage to begin the removal recursively.
-   */
-  @Override
-  public synchronized void removeTasksAndDescendants(final String stageId) {
-    removeStageAndChildren(stageId);
-  }
-
-  /**
-   * Recursively removes a stage and its children stages from this PQ.
-   * @param stageId for the stage to begin the removal recursively.
-   */
-  private synchronized void removeStageAndChildren(final String stageId) {
-    if (schedulableStages.remove(stageId)) {
-      stageIdToPendingTasks.remove(stageId);
-    }
-
-    physicalPlan.getStageDAG().getChildren(stageId).forEach(
-        stage -> removeStageAndChildren(stage.getId()));
-  }
-
-  /**
-   * Updates the two-level data structure by examining a new candidate stage.
-   * If there are no stages with higher priority, the candidate can be made schedulable.
-   *
-   * NOTE: This method provides the "line up" between stages, by assigning priorities,
-   * serving as the key to the "priority" implementation of this class.
-   * @param candidateStageId for the stage that can potentially be scheduled.
-   */
-  private synchronized void updateSchedulableStages(final String candidateStageId) {
-    final DAG<Stage, StageEdge> jobDAG = physicalPlan.getStageDAG();
-
-    if (isSchedulable(candidateStageId)) {
-      // Check for ancestor stages that became schedulable due to candidateStage's absence from the queue.
-      jobDAG.getAncestors(candidateStageId).forEach(ancestorStage -> {
-        if (schedulableStages.contains(ancestorStage.getId())) {
-          if (!schedulableStages.remove(ancestorStage.getId())) {
-            throw new RuntimeException(String.format("No such stage: %s", ancestorStage.getId()));
-          }
-        }
-      });
-      if (!schedulableStages.contains(candidateStageId)) {
-        schedulableStages.addLast(candidateStageId);
-      }
-    }
-  }
-
-  /**
-   * Determines whether the given candidate stage is schedulable immediately or not.
-   * @param candidateStageId for the stage that can potentially be scheduled.
-   * @return true if schedulable, false otherwise.
-   */
-  private synchronized boolean isSchedulable(final String candidateStageId) {
-    final DAG<Stage, StageEdge> jobDAG = physicalPlan.getStageDAG();
-    for (final Stage descendantStage : jobDAG.getDescendants(candidateStageId)) {
-      if (schedulableStages.contains(descendantStage.getId())) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public synchronized void onJobScheduled(final PhysicalPlan physicalPlanForJob) {
-    this.physicalPlan = physicalPlanForJob;
-  }
-
-  @Override
-  public synchronized boolean isEmpty() {
-    return schedulableStages.isEmpty();
-  }
-
-  @Override
-  public synchronized void close() {
-    schedulableStages.clear();
-    stageIdToPendingTasks.clear();
-  }
-}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
index f0f5d19..ba8a2f5 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
@@ -67,7 +67,7 @@ public final class BatchSingleJobSchedulerTest {
   private SchedulerRunner schedulerRunner;
   private ExecutorRegistry executorRegistry;
   private MetricMessageHandler metricMessageHandler;
-  private PendingTaskCollection pendingTaskCollection;
+  private PendingTaskCollectionPointer pendingTaskCollectionPointer;
   private PubSubEventHandlerWrapper pubSubEventHandler;
   private UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler;
   private BlockManagerMaster blockManagerMaster = mock(BlockManagerMaster.class);
@@ -85,13 +85,13 @@ public final class BatchSingleJobSchedulerTest {
 
     executorRegistry = injector.getInstance(ExecutorRegistry.class);
     metricMessageHandler = mock(MetricMessageHandler.class);
-    pendingTaskCollection = new SingleJobTaskCollection();
+    pendingTaskCollectionPointer = new PendingTaskCollectionPointer();
     schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
-    schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskCollection, executorRegistry);
+    schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskCollectionPointer, executorRegistry);
     pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class);
     scheduler =
-        new BatchSingleJobScheduler(schedulerRunner, pendingTaskCollection,
+        new BatchSingleJobScheduler(schedulerRunner, pendingTaskCollectionPointer,
             blockManagerMaster, pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
 
     final ActiveContext activeContext = mock(ActiveContext.class);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
index d2ec962..0e57dc5 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
@@ -66,7 +66,7 @@ public final class FaultToleranceTest {
   private ExecutorRegistry executorRegistry;
 
   private MetricMessageHandler metricMessageHandler;
-  private PendingTaskCollection pendingTaskCollection;
+  private PendingTaskCollectionPointer pendingTaskCollectionPointer;
   private PubSubEventHandlerWrapper pubSubEventHandler;
   private UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler;
   private BlockManagerMaster blockManagerMaster = mock(BlockManagerMaster.class);
@@ -87,15 +87,15 @@ public final class FaultToleranceTest {
     final Injector injector = Tang.Factory.getTang().newInjector();
     executorRegistry = injector.getInstance(ExecutorRegistry.class);
 
-    pendingTaskCollection = new SingleJobTaskCollection();
+    pendingTaskCollectionPointer = new PendingTaskCollectionPointer();
     schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
 
     if (useMockSchedulerRunner) {
       schedulerRunner = mock(SchedulerRunner.class);
     } else {
-      schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskCollection, executorRegistry);
+      schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskCollectionPointer, executorRegistry);
     }
-    return new BatchSingleJobScheduler(schedulerRunner, pendingTaskCollection, blockManagerMaster,
+    return new BatchSingleJobScheduler(schedulerRunner, pendingTaskCollectionPointer, blockManagerMaster,
         pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
   }
 
@@ -137,23 +137,22 @@ public final class FaultToleranceTest {
       if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {
 
         // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1.
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
             executorRegistry, false);
-        assertTrue(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId ->
             SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
                 taskId, TaskState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 2) {
         scheduler.onExecutorRemoved("a3");
         // There are 2 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
             executorRegistry, false);
 
         // Due to round robin scheduling, "a2" is assured to have a running Task.
         scheduler.onExecutorRemoved("a2");
 
         // Re-schedule
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
             executorRegistry, false);
 
         final Optional<Integer> maxTaskAttempt = stage.getTaskIds().stream()
@@ -161,16 +160,15 @@ public final class FaultToleranceTest {
         assertTrue(maxTaskAttempt.isPresent());
         assertEquals(2, (int) maxTaskAttempt.get());
 
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
             executorRegistry, false);
-        assertTrue(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId ->
             SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
                 taskId, TaskState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 3) {
         // There are 1 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 3.
         // Schedule only the first Task
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
             executorRegistry, true);
       } else {
         throw new RuntimeException(String.format("Unexpected ScheduleGroupIndex: %d",
@@ -215,24 +213,22 @@ public final class FaultToleranceTest {
       if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {
 
         // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1.
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
             executorRegistry, false);
-        assertTrue(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId ->
             SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
                 taskId, TaskState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 2) {
         // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
             executorRegistry, false);
-        assertTrue(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId ->
             SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
                 taskId, TaskState.State.FAILED_RECOVERABLE, 1,
                 TaskState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
 
         // Re-schedule
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
             executorRegistry, false);
 
         final Optional<Integer> maxTaskAttempt = stage.getTaskIds().stream()
@@ -240,7 +236,6 @@ public final class FaultToleranceTest {
         assertTrue(maxTaskAttempt.isPresent());
         assertEquals(2, (int) maxTaskAttempt.get());
 
-        assertTrue(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId ->
             assertEquals(TaskState.State.EXECUTING, jobStateManager.getTaskState(taskId)));
       }
@@ -283,15 +278,14 @@ public final class FaultToleranceTest {
       if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {
 
         // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1.
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
             executorRegistry, false);
-        assertTrue(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId ->
             SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
                 taskId, TaskState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 2) {
         // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
             executorRegistry, false);
 
         stage.getTaskIds().forEach(taskId ->
@@ -300,7 +294,7 @@ public final class FaultToleranceTest {
                 TaskState.RecoverableFailureCause.INPUT_READ_FAILURE));
 
         // Re-schedule
-        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager,
             executorRegistry, false);
 
         final Optional<Integer> maxTaskAttempt = stage.getTaskIds().stream()
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollectionPointerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollectionPointerTest.java
new file mode 100644
index 0000000..46cf822
--- /dev/null
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollectionPointerTest.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.runtime.common.plan.Task;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests {@link PendingTaskCollectionPointer}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Task.class})
+public final class PendingTaskCollectionPointerTest {
+  private PendingTaskCollectionPointer pendingTaskCollectionPointer;
+
+  private List<Task> mockTaskList() {
+    final Task task = mock(Task.class);
+    return Arrays.asList(task);
+  }
+
+  @Before
+  public void setUp() {
+    this.pendingTaskCollectionPointer = new PendingTaskCollectionPointer();
+  }
+
+  @Test
+  public void nullByDefault() {
+    assertFalse(pendingTaskCollectionPointer.getAndSetNull().isPresent());
+  }
+
+  @Test
+  public void setIfNull() {
+    final List<Task> taskList = mockTaskList();
+    pendingTaskCollectionPointer.setIfNull(taskList);
+    final Optional<Collection<Task>> optional = pendingTaskCollectionPointer.getAndSetNull();
+    assertTrue(optional.isPresent());
+    assertEquals(taskList, optional.get());
+  }
+
+  @Test
+  public void setToOverwrite() {
+    final List<Task> taskList1 = mockTaskList();
+    pendingTaskCollectionPointer.setIfNull(taskList1);
+    final List<Task> taskList2 = mockTaskList();
+    pendingTaskCollectionPointer.setToOverwrite(taskList2);
+    final Optional<Collection<Task>> optional = pendingTaskCollectionPointer.getAndSetNull();
+    assertTrue(optional.isPresent());
+    assertEquals(taskList2, optional.get());
+  }
+}
+
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
index cb15f7a..41f9642 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
@@ -16,11 +16,14 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
+import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 
 /**
@@ -100,20 +103,14 @@ final class SchedulerTestUtil {
     sendTaskStateEventToScheduler(scheduler, executorRegistry, taskId, newState, attemptIdx, null);
   }
 
-  static void mockSchedulingBySchedulerRunner(final PendingTaskCollection pendingTaskCollection,
+  static void mockSchedulingBySchedulerRunner(final PendingTaskCollectionPointer pendingTaskCollectionPointer,
                                               final SchedulingPolicy schedulingPolicy,
                                               final JobStateManager jobStateManager,
                                               final ExecutorRegistry executorRegistry,
                                               final boolean scheduleOnlyTheFirstStage) {
     final SchedulerRunner schedulerRunner =
-        new SchedulerRunner(schedulingPolicy, pendingTaskCollection, executorRegistry);
+        new SchedulerRunner(schedulingPolicy, pendingTaskCollectionPointer, executorRegistry);
     schedulerRunner.scheduleJob(jobStateManager);
-    while (!pendingTaskCollection.isEmpty()) {
-      schedulerRunner.doScheduleStage();
-      if (scheduleOnlyTheFirstStage) {
-        // Schedule only the first stage
-        break;
-      }
-    }
+    schedulerRunner.doScheduleTaskList();
   }
 }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
deleted file mode 100644
index bb48746..0000000
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.master.scheduler;
-
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.Task;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests {@link SingleJobTaskCollection}.
- */
-public final class SingleTaskQueueTest {
-  private SingleJobTaskCollection pendingTaskPriorityQueue;
-
-  /**
-   * To be used for a thread pool to execute tasks.
-   */
-  private ExecutorService executorService;
-
-  @Before
-  public void setUp() throws Exception{
-    pendingTaskPriorityQueue = new SingleJobTaskCollection();
-    executorService = Executors.newFixedThreadPool(2);
-  }
-
-  /**
-   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskCollection}.
-   * Tests whether the dequeued Tasks are according to the stage-dependency priority.
-   */
-  @Test
-  public void testPushPriority() throws Exception {
-    final PhysicalPlan physicalPlan =
-        TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.ThreeSequentialVertices, true);
-
-    pendingTaskPriorityQueue.onJobScheduled(physicalPlan);
-    final List<Stage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
-
-    // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
-    assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), dagOf2Stages.get(1).getScheduleGroupIndex());
-
-    final CountDownLatch countDownLatch = new CountDownLatch(2);
-
-    final AtomicBoolean passed = new AtomicBoolean(true);
-
-    // This mimics Batch Scheduler's behavior
-    executorService.submit(() -> {
-      // First schedule the children Tasks (since it is push).
-      // BatchSingleJobScheduler will schedule Tasks in this order as well.
-      scheduleStage(dagOf2Stages.get(1));
-      // Then, schedule the parent Tasks.
-      scheduleStage(dagOf2Stages.get(0));
-
-      countDownLatch.countDown();
-    }).get();
-
-    // This mimics SchedulerRunner's behavior
-    executorService.submit(() -> {
-      try {
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-        final Task dequeuedTask = dequeue();
-        assertEquals(RuntimeIdGenerator.getStageIdFromTaskId(dequeuedTask.getTaskId()),
-            dagOf2Stages.get(1).getId());
-
-        // Let's say we fail to schedule, and add this Task back.
-        pendingTaskPriorityQueue.add(dequeuedTask);
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-
-        // Now that we've dequeued all of the children Tasks, we should now start getting the parents.
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-      } catch (Exception e) {
-        e.printStackTrace();
-        passed.getAndSet(false);
-      } finally {
-        countDownLatch.countDown();
-      }
-    }).get();
-
-    countDownLatch.await();
-    assertTrue(passed.get());
-  }
-
-  /**
-   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskCollection}.
-   * Tests whether the dequeued Tasks are according to the stage-dependency priority.
-   */
-  @Test
-  public void testPullPriority() throws Exception {
-    final PhysicalPlan physicalPlan =
-        TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.ThreeSequentialVertices, false);
-    pendingTaskPriorityQueue.onJobScheduled(physicalPlan);
-    final List<Stage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
-
-    // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
-    assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), 0);
-    assertEquals(dagOf2Stages.get(1).getScheduleGroupIndex(), 1);
-
-    final CountDownLatch countDownLatch = new CountDownLatch(2);
-
-    // This mimics Batch Scheduler's behavior
-    executorService.submit(() -> {
-      // First schedule the parent Tasks (since it is pull).
-      // BatchSingleJobScheduler will schedule Tasks in this order as well.
-      scheduleStage(dagOf2Stages.get(0));
-      countDownLatch.countDown();
-    }).get();
-
-    // This mimics SchedulerRunner's behavior
-    executorService.submit(() -> {
-      try {
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-        final Task dequeuedTask = dequeue();
-        assertEquals(RuntimeIdGenerator.getStageIdFromTaskId(dequeuedTask.getTaskId()),
-            dagOf2Stages.get(0).getId());
-
-        // Let's say we fail to schedule, and add this Task back.
-        pendingTaskPriorityQueue.add(dequeuedTask);
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-
-        // Now that we've dequeued all of the children Tasks, we should now schedule children.
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-
-        // Schedule the children Tasks.
-        scheduleStage(dagOf2Stages.get(1));
-      } catch (Exception e) {
-        e.printStackTrace();
-      } finally {
-        countDownLatch.countDown();
-      }
-    }).get();
-
-    countDownLatch.await();
-  }
-
-  /**
-   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskCollection}.
-   * Tests whether the dequeued Tasks are according to the stage-dependency priority,
-   * while concurrently scheduling Tasks that have dependencies, but are of different container types.
-   */
-  @Test
-  public void testWithDifferentContainerType() throws Exception {
-    final PhysicalPlan physicalPlan = TestPlanGenerator.generatePhysicalPlan(
-        TestPlanGenerator.PlanType.ThreeSequentialVerticesWithDifferentContainerTypes, true);
-    pendingTaskPriorityQueue.onJobScheduled(physicalPlan);
-    final List<Stage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
-
-    // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
-    assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), dagOf2Stages.get(1).getScheduleGroupIndex());
-
-    final CountDownLatch countDownLatch = new CountDownLatch(2);
-
-    // First schedule the children Tasks (since it is push).
-    // BatchSingleJobScheduler will schedule Tasks in this order as well.
-    scheduleStage(dagOf2Stages.get(1));
-    // Then, schedule the parent Tasks.
-    scheduleStage(dagOf2Stages.get(0));
-
-    countDownLatch.countDown();
-
-    // This mimics SchedulerRunner's behavior.
-    executorService.submit(() -> {
-      try {
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-      } catch (Exception e) {
-        e.printStackTrace();
-      } finally {
-        countDownLatch.countDown();
-      }
-    }).get();
-
-    countDownLatch.await();
-  }
-
-  /**
-   * Schedule the tasks in a stage.
-   * @param stage the stage to schedule.
-   */
-  private void scheduleStage(final Stage stage) {
-    stage.getTaskIds().forEach(taskId ->
-        pendingTaskPriorityQueue.add(new Task(
-            "TestPlan",
-            taskId,
-            0,
-            stage.getExecutionProperties(),
-            stage.getSerializedIRDAG(),
-            Collections.emptyList(),
-            Collections.emptyList(),
-            Collections.emptyMap())));
-  }
-
-  /**
-   * Dequeues a scheduled task from the task priority queue and get it's stage name.
-   * @return the stage name of the dequeued task.
-   */
-  private String dequeueAndGetStageId() {
-    final Task task = dequeue();
-    return RuntimeIdGenerator.getStageIdFromTaskId(task.getTaskId());
-  }
-
-  /**
-   * Dequeues a scheduled task from the task priority queue.
-   * @return the Task dequeued
-   */
-  private Task dequeue() {
-    final Collection<Task> tasks
-        = pendingTaskPriorityQueue.peekSchedulableStage().get();
-    return pendingTaskPriorityQueue.remove(tasks.iterator().next().getTaskId());
-  }
-}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
index ab1beb7..5d24f81 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
@@ -63,8 +63,9 @@ public class SailfishPassTest {
                 edgeToMerger.getPropertyValue(UsedDataHandlingProperty.class).get());
             assertEquals(InterTaskDataStoreProperty.Value.SerializedMemoryStore,
                 edgeToMerger.getPropertyValue(InterTaskDataStoreProperty.class).get());
-            assertEquals(BytesDecoderFactory.of(),
-                edgeToMerger.getPropertyValue(DecoderProperty.class).get());
+            // TODO #125: Fix data loss bug caused by SailfishSchedulingPolicy
+            //assertEquals(BytesDecoderFactory.of(),
+            //    edgeToMerger.getPropertyValue(DecoderProperty.class).get());
           } else {
             assertEquals(DataFlowModelProperty.Value.Pull,
                 edgeToMerger.getPropertyValue(DataFlowModelProperty.class).get());
@@ -77,8 +78,9 @@ public class SailfishPassTest {
               edgeFromMerger.getPropertyValue(DataCommunicationPatternProperty.class).get());
           assertEquals(InterTaskDataStoreProperty.Value.LocalFileStore,
               edgeFromMerger.getPropertyValue(InterTaskDataStoreProperty.class).get());
-          assertEquals(BytesEncoderFactory.of(),
-              edgeFromMerger.getPropertyValue(EncoderProperty.class).get());
+          // TODO #125: Fix data loss bug caused by SailfishSchedulingPolicy
+          //assertEquals(BytesEncoderFactory.of(),
+          //    edgeFromMerger.getPropertyValue(EncoderProperty.class).get());
         });
       } else {
         // Non merger vertex.