You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ja...@apache.org on 2018/06/29 06:05:08 UTC

[incubator-nemo] branch master updated: [NEMO-50] Carefully retry tasks in the scheduler (#59)

This is an automated email from the ASF dual-hosted git repository.

jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new b621434  [NEMO-50] Carefully retry tasks in the scheduler (#59)
b621434 is described below

commit b621434ce1fa41e11f2b202e6076ac62f10b6d46
Author: John Yang <jo...@gmail.com>
AuthorDate: Fri Jun 29 15:05:06 2018 +0900

    [NEMO-50] Carefully retry tasks in the scheduler (#59)
    
    JIRA: [NEMO-50: Carefully retry tasks in the scheduler](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-50)
    
    **Major changes:**
    - doSchedule(): The main scheduling entry point. Can also be used for retrying tasks
    - recursivelyGetParentTasksForLostBlocks(): Finds exactly the tasks that need to be retried (no more, no less)
    - Clearly differentiates restarts due to task failures from restarts due to executor removal
    - Reorganizes states for proper FT
      - TaskState: READY, EXECUTING, ON_HOLD, COMPLETE, SHOULD_RETRY, FAILED
      - StageState: INCOMPLETE, COMPLETE
    
    **Minor changes to note:**
    - Nice logs for tracking job progress
    - Refactored code and added comments to make it easier to understand when and why schedulerRunner.onExecutorSlotAvailable() and doSchedule() are called
    
    **Tests for the changes:**
    - TaskRestartTest: Interleaves failure events with task completion events
    
    **Other comments:**
    - INPUT_READ_FAILURE will be handled in https://issues.apache.org/jira/browse/NEMO-54
    
    resolves [NEMO-50](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-50)
---
 README.md                                          |   4 +
 .../snu/nemo/runtime/common/plan/StageEdge.java    |   8 +-
 .../snu/nemo/runtime/common/state/StageState.java  |  38 +--
 .../snu/nemo/runtime/common/state/TaskState.java   |  38 +--
 .../nemo/runtime/executor/TaskStateManager.java    |  14 +-
 .../runtime/executor/data/BlockManagerWorker.java  |   5 +-
 .../nemo/runtime/executor/task/TaskExecutor.java   |  14 +-
 .../runtime/executor/task/TaskExecutorTest.java    |   8 +-
 runtime/master/pom.xml                             |  10 +
 .../nemo/runtime/master/BlockManagerMaster.java    |  54 ++-
 .../snu/nemo/runtime/master/JobStateManager.java   | 103 ++----
 .../edu/snu/nemo/runtime/master/RuntimeMaster.java |  14 +-
 .../master/scheduler/BatchSingleJobScheduler.java  | 361 ++++++++++-----------
 .../nemo/runtime/master/scheduler/Scheduler.java   |   2 +-
 .../runtime/master/scheduler/SchedulerRunner.java  |  17 +-
 .../nemo/runtime/master/JobStateManagerTest.java   |   4 +-
 .../scheduler/BatchSingleJobSchedulerTest.java     |  13 +-
 .../master/scheduler/SchedulerTestUtil.java        |  14 +-
 .../runtime/master/scheduler/TaskRestartTest.java  | 225 +++++++++++++
 .../snu/nemo/tests/client/ClientEndpointTest.java  |   3 +-
 20 files changed, 563 insertions(+), 386 deletions(-)

diff --git a/README.md b/README.md
index d7b1db8..9295593 100644
--- a/README.md
+++ b/README.md
@@ -139,3 +139,7 @@ Nemo Compiler and Engine can store JSON representation of intermediate DAGs.
   	-dag_dir "./dag/als" \
   	-user_args "`pwd`/examples/resources/sample_input_als 10 3"
 ```
+
+## Speeding up builds 
+* To exclude Spark related packages: mvn clean install -T 2C -DskipTests -pl \\!compiler/frontend/spark,\\!examples/spark
+* To exclude Beam related packages: mvn clean install -T 2C -DskipTests -pl \\!compiler/frontend/beam,\\!examples/beam
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index a45b9b6..504501b 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -95,16 +95,16 @@ public final class StageEdge extends RuntimeEdge<Stage> {
   }
 
   /**
-   * @return the source vertex of the edge.
+   * @return the source IR vertex of the edge.
    */
-  public IRVertex getSrcVertex() {
+  public IRVertex getSrcIRVertex() {
     return srcVertex;
   }
 
   /**
-   * @return the destination vertex of the edge.
+   * @return the destination IR vertex of the edge.
    */
-  public IRVertex getDstVertex() {
+  public IRVertex getDstIRVertex() {
     return dstVertex;
   }
 
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
index 6bf3380..e7edbad 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
@@ -19,6 +19,10 @@ import edu.snu.nemo.common.StateMachine;
 
 /**
  * Represents the states and their transitions of a stage.
+ *
+ * Maintained as two simple states (INCOMPLETE, COMPLETE) to avoid ambiguity when the tasks are in different states.
+ * For example, it is not clear whether a stage should be EXECUTING or SHOULD_RETRY if one of the tasks in the stage
+ * is EXECUTING and another is SHOULD_RETRY.
  */
 public final class StageState {
   private final StateMachine stateMachine;
@@ -31,31 +35,17 @@ public final class StageState {
     final StateMachine.Builder stateMachineBuilder = StateMachine.newBuilder();
 
     // Add states
-    stateMachineBuilder.addState(State.READY, "The stage has been created.");
-    stateMachineBuilder.addState(State.EXECUTING, "The stage is executing.");
+    stateMachineBuilder.addState(State.INCOMPLETE, "Some tasks in this stage are not complete.");
     stateMachineBuilder.addState(State.COMPLETE, "All of this stage's tasks have completed.");
-    stateMachineBuilder.addState(State.FAILED_RECOVERABLE, "Stage failed, but is recoverable.");
 
     // Add transitions
-    stateMachineBuilder.addTransition(State.READY, State.EXECUTING,
-        "The stage can now schedule its tasks");
-    stateMachineBuilder.addTransition(State.READY, State.FAILED_RECOVERABLE,
-        "Recoverable failure");
-
-    stateMachineBuilder.addTransition(State.EXECUTING, State.COMPLETE,
-        "All tasks complete");
-    stateMachineBuilder.addTransition(State.EXECUTING, State.FAILED_RECOVERABLE,
-        "Recoverable failure in a task");
-
-    stateMachineBuilder.addTransition(State.COMPLETE, State.FAILED_RECOVERABLE,
-        "Container on which the stage's output is stored failed");
-
-    stateMachineBuilder.addTransition(State.FAILED_RECOVERABLE, State.READY,
-        "Recoverable stage failure");
-    stateMachineBuilder.addTransition(State.FAILED_RECOVERABLE, State.EXECUTING,
-        "Recoverable stage failure");
+    stateMachineBuilder.addTransition(
+        State.INCOMPLETE, State.INCOMPLETE, "A task in the stage needs to be retried");
+    stateMachineBuilder.addTransition(State.INCOMPLETE, State.COMPLETE, "All tasks complete");
+    stateMachineBuilder.addTransition(State.COMPLETE, State.INCOMPLETE,
+        "Completed before, but a task in this stage should be retried");
 
-    stateMachineBuilder.setInitialState(State.READY);
+    stateMachineBuilder.setInitialState(State.INCOMPLETE);
 
     return stateMachineBuilder.build();
   }
@@ -68,10 +58,8 @@ public final class StageState {
    * StageState.
    */
   public enum State {
-    READY,
-    EXECUTING,
-    COMPLETE,
-    FAILED_RECOVERABLE,
+    INCOMPLETE,
+    COMPLETE
   }
 
   @Override
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
index b47696a..74b808c 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
@@ -31,37 +31,32 @@ public final class TaskState {
     final StateMachine.Builder stateMachineBuilder = StateMachine.newBuilder();
 
     // Add states
-    stateMachineBuilder.addState(State.READY, "The task has been created.");
+    stateMachineBuilder.addState(State.READY, "The task is ready to be executed.");
     stateMachineBuilder.addState(State.EXECUTING, "The task is executing.");
+    stateMachineBuilder.addState(State.ON_HOLD, "The task is paused (e.g., for dynamic optimization).");
     stateMachineBuilder.addState(State.COMPLETE, "The task has completed.");
-    stateMachineBuilder.addState(State.FAILED_RECOVERABLE, "Task failed, but is recoverable.");
-    stateMachineBuilder.addState(State.FAILED_UNRECOVERABLE, "Task failed, and is unrecoverable. The job will fail.");
-    stateMachineBuilder.addState(State.ON_HOLD, "The task is paused for dynamic optimization.");
+    stateMachineBuilder.addState(State.SHOULD_RETRY, "The task should be retried.");
+    stateMachineBuilder.addState(State.FAILED, "Task failed, and is unrecoverable. The job will fail.");
 
-    // From NOT_AVAILABLE
+    // From READY
     stateMachineBuilder.addTransition(State.READY, State.EXECUTING, "Scheduling to executor");
-    stateMachineBuilder.addTransition(State.READY, State.FAILED_RECOVERABLE,
-        "Stage Failure by a recoverable failure in another task");
 
     // From EXECUTING
     stateMachineBuilder.addTransition(State.EXECUTING, State.COMPLETE, "Task completed normally");
-    stateMachineBuilder.addTransition(State.EXECUTING, State.FAILED_UNRECOVERABLE, "Unrecoverable failure");
-    stateMachineBuilder.addTransition(State.EXECUTING, State.FAILED_RECOVERABLE, "Recoverable failure");
     stateMachineBuilder.addTransition(State.EXECUTING, State.ON_HOLD, "Task paused for dynamic optimization");
+    stateMachineBuilder.addTransition(State.EXECUTING, State.SHOULD_RETRY, "Did not complete, should be retried");
+    stateMachineBuilder.addTransition(State.EXECUTING, State.FAILED, "Unrecoverable failure");
 
     // From ON HOLD
     stateMachineBuilder.addTransition(State.ON_HOLD, State.COMPLETE, "Task completed after being on hold");
-    stateMachineBuilder.addTransition(State.ON_HOLD, State.FAILED_UNRECOVERABLE, "Unrecoverable failure");
-    stateMachineBuilder.addTransition(State.ON_HOLD, State.FAILED_RECOVERABLE, "Recoverable failure");
+    stateMachineBuilder.addTransition(State.ON_HOLD, State.SHOULD_RETRY, "Did not complete, should be retried");
+    stateMachineBuilder.addTransition(State.ON_HOLD, State.FAILED, "Unrecoverable failure");
 
     // From COMPLETE
-    stateMachineBuilder.addTransition(State.COMPLETE, State.EXECUTING, "Completed before, but re-execute");
-    stateMachineBuilder.addTransition(State.COMPLETE, State.FAILED_RECOVERABLE,
-        "Recoverable failure in a task/Container failure");
+    stateMachineBuilder.addTransition(State.COMPLETE, State.SHOULD_RETRY, "Completed before, but should be retried");
 
-
-    // From FAILED_RECOVERABLE
-    stateMachineBuilder.addTransition(State.FAILED_RECOVERABLE, State.READY, "Recovered from failure and is ready");
+    // From SHOULD_RETRY
+    stateMachineBuilder.addTransition(State.SHOULD_RETRY, State.READY, "Ready to be retried");
 
     stateMachineBuilder.setInitialState(State.READY);
     return stateMachineBuilder.build();
@@ -77,19 +72,18 @@ public final class TaskState {
   public enum State {
     READY,
     EXECUTING,
-    COMPLETE,
-    FAILED_RECOVERABLE,
-    FAILED_UNRECOVERABLE,
     ON_HOLD, // for dynamic optimization
+    COMPLETE,
+    SHOULD_RETRY,
+    FAILED,
   }
 
   /**
    * Causes of a recoverable failure.
    */
-  public enum RecoverableFailureCause {
+  public enum RecoverableTaskFailureCause {
     INPUT_READ_FAILURE, // Occurs when a task is unable to read its input block
     OUTPUT_WRITE_FAILURE, // Occurs when a task successfully generates its output, but is unable to write it
-    CONTAINER_FAILURE // When a REEF evaluator fails
   }
 
   @Override
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
index bf01350..9645621 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
@@ -63,7 +63,7 @@ public final class TaskStateManager {
    */
   public synchronized void onTaskStateChanged(final TaskState.State newState,
                                               final Optional<String> vertexPutOnHold,
-                                              final Optional<TaskState.RecoverableFailureCause> cause) {
+                                              final Optional<TaskState.RecoverableTaskFailureCause> cause) {
     final Map<String, Object> metric = new HashMap<>();
 
     switch (newState) {
@@ -80,13 +80,13 @@ public final class TaskStateManager {
         metricCollector.endMeasurement(taskId, metric);
         notifyTaskStateToMaster(newState, Optional.empty(), cause);
         break;
-      case FAILED_RECOVERABLE:
+      case SHOULD_RETRY:
         LOG.debug("Task ID {} failed (recoverable).", this.taskId);
         metric.put("ToState", newState);
         metricCollector.endMeasurement(taskId, metric);
         notifyTaskStateToMaster(newState, Optional.empty(), cause);
         break;
-      case FAILED_UNRECOVERABLE:
+      case FAILED:
         LOG.debug("Task ID {} failed (unrecoverable).", this.taskId);
         metric.put("ToState", newState);
         metricCollector.endMeasurement(taskId, metric);
@@ -109,7 +109,7 @@ public final class TaskStateManager {
    */
   private void notifyTaskStateToMaster(final TaskState.State newState,
                                        final Optional<String> vertexPutOnHold,
-                                       final Optional<TaskState.RecoverableFailureCause> cause) {
+                                       final Optional<TaskState.RecoverableTaskFailureCause> cause) {
     final ControlMessage.TaskStateChangedMsg.Builder msgBuilder =
         ControlMessage.TaskStateChangedMsg.newBuilder()
             .setExecutorId(executorId)
@@ -141,9 +141,9 @@ public final class TaskStateManager {
         return ControlMessage.TaskStateFromExecutor.EXECUTING;
       case COMPLETE:
         return ControlMessage.TaskStateFromExecutor.COMPLETE;
-      case FAILED_RECOVERABLE:
+      case SHOULD_RETRY:
         return ControlMessage.TaskStateFromExecutor.FAILED_RECOVERABLE;
-      case FAILED_UNRECOVERABLE:
+      case FAILED:
         return ControlMessage.TaskStateFromExecutor.FAILED_UNRECOVERABLE;
       case ON_HOLD:
         return ControlMessage.TaskStateFromExecutor.ON_HOLD;
@@ -153,7 +153,7 @@ public final class TaskStateManager {
   }
 
   private ControlMessage.RecoverableFailureCause convertFailureCause(
-      final TaskState.RecoverableFailureCause cause) {
+      final TaskState.RecoverableTaskFailureCause cause) {
     switch (cause) {
       case INPUT_READ_FAILURE:
         return ControlMessage.RecoverableFailureCause.InputReadFailure;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index 52c55fd..144ded0 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -135,11 +135,10 @@ public final class BlockManagerWorker {
    * @param keyRange   the key range descriptor.
    * @return the result data in the block.
    */
-  private CompletableFuture<DataUtil.IteratorWithNumBytes> retrieveDataFromBlock(
+  private CompletableFuture<DataUtil.IteratorWithNumBytes> getDataFromLocalBlock(
       final String blockId,
       final InterTaskDataStoreProperty.Value blockStore,
       final KeyRange keyRange) {
-    LOG.info("RetrieveDataFromBlock: {}", blockId);
     final BlockStore store = getBlockStore(blockStore);
 
     // First, try to fetch the block from local BlockStore.
@@ -229,7 +228,7 @@ public final class BlockManagerWorker {
       final String targetExecutorId = blockLocationInfoMsg.getOwnerExecutorId();
       if (targetExecutorId.equals(executorId) || targetExecutorId.equals(REMOTE_FILE_STORE)) {
         // Block resides in the evaluator
-        return retrieveDataFromBlock(blockId, blockStore, keyRange);
+        return getDataFromLocalBlock(blockId, blockStore, keyRange);
       } else {
         final ByteTransferContextDescriptor descriptor = ByteTransferContextDescriptor.newBuilder()
             .setBlockId(blockId)
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index fe20ade..c320785 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -217,7 +217,7 @@ public final class TaskExecutor {
       doExecute();
     } catch (Throwable throwable) {
       // ANY uncaught throwable is reported to the master
-      taskStateManager.onTaskStateChanged(TaskState.State.FAILED_UNRECOVERABLE, Optional.empty(), Optional.empty());
+      taskStateManager.onTaskStateChanged(TaskState.State.FAILED, Optional.empty(), Optional.empty());
       LOG.error(ExceptionUtils.getStackTrace(throwable));
     }
   }
@@ -319,8 +319,8 @@ public final class TaskExecutor {
         try {
           element = dataFetcher.fetchDataElement();
         } catch (IOException e) {
-          taskStateManager.onTaskStateChanged(TaskState.State.FAILED_RECOVERABLE,
-              Optional.empty(), Optional.of(TaskState.RecoverableFailureCause.INPUT_READ_FAILURE));
+          taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
+              Optional.empty(), Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
           LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", taskId, e.toString());
           return false;
         }
@@ -366,9 +366,9 @@ public final class TaskExecutor {
                                                  final DataTransferFactory dataTransferFactory) {
     return inEdgesFromParentTasks
         .stream()
-        .filter(inEdge -> inEdge.getDstVertex().getId().equals(irVertex.getId()))
+        .filter(inEdge -> inEdge.getDstIRVertex().getId().equals(irVertex.getId()))
         .map(inEdgeForThisVertex -> dataTransferFactory
-            .createReader(taskIndex, inEdgeForThisVertex.getSrcVertex(), inEdgeForThisVertex))
+            .createReader(taskIndex, inEdgeForThisVertex.getSrcIRVertex(), inEdgeForThisVertex))
         .collect(Collectors.toList());
   }
 
@@ -378,9 +378,9 @@ public final class TaskExecutor {
                                                     final DataTransferFactory dataTransferFactory) {
     return outEdgesToChildrenTasks
         .stream()
-        .filter(outEdge -> outEdge.getSrcVertex().getId().equals(irVertex.getId()))
+        .filter(outEdge -> outEdge.getSrcIRVertex().getId().equals(irVertex.getId()))
         .map(outEdgeForThisVertex -> dataTransferFactory
-            .createWriter(irVertex, taskIndex, outEdgeForThisVertex.getDstVertex(), outEdgeForThisVertex))
+            .createWriter(irVertex, taskIndex, outEdgeForThisVertex.getDstIRVertex(), outEdgeForThisVertex))
         .collect(Collectors.toList());
   }
 
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
index 0c58744..96b755d 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -273,15 +273,15 @@ public final class TaskExecutorTest {
 
   private StageEdge mockStageEdgeFrom(final IRVertex irVertex) {
     final StageEdge edge = mock(StageEdge.class);
-    when(edge.getSrcVertex()).thenReturn(irVertex);
-    when(edge.getDstVertex()).thenReturn(new OperatorVertex(new RelayTransform()));
+    when(edge.getSrcIRVertex()).thenReturn(irVertex);
+    when(edge.getDstIRVertex()).thenReturn(new OperatorVertex(new RelayTransform()));
     return edge;
   }
 
   private StageEdge mockStageEdgeTo(final IRVertex irVertex) {
     final StageEdge edge = mock(StageEdge.class);
-    when(edge.getSrcVertex()).thenReturn(new OperatorVertex(new RelayTransform()));
-    when(edge.getDstVertex()).thenReturn(irVertex);
+    when(edge.getSrcIRVertex()).thenReturn(new OperatorVertex(new RelayTransform()));
+    when(edge.getDstIRVertex()).thenReturn(irVertex);
     return edge;
   }
 
diff --git a/runtime/master/pom.xml b/runtime/master/pom.xml
index 75c7c25..6cb5abc 100644
--- a/runtime/master/pom.xml
+++ b/runtime/master/pom.xml
@@ -61,5 +61,15 @@ limitations under the License.
             <artifactId>jackson-databind</artifactId>
             <version>${jackson.version}</version>
         </dependency>
+        <dependency>
+            <!--
+            This is needed to view the logs when running unit tests.
+            See https://dzone.com/articles/how-configure-slf4j-different for details.
+            -->
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>1.6.2</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index 1da2e72..da7c1ef 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -15,14 +15,20 @@
  */
 package edu.snu.nemo.runtime.master;
 
+import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.exception.IllegalMessageException;
 import edu.snu.nemo.common.exception.UnknownExecutionStateException;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.exception.AbsentBlockException;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.message.MessageContext;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageListener;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.state.BlockState;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -37,6 +43,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.IntStream;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
@@ -74,6 +81,36 @@ public final class BlockManagerMaster {
     this.lock = new ReentrantReadWriteLock();
   }
 
+  public void initialize(final PhysicalPlan physicalPlan) {
+    final DAG<Stage, StageEdge> stageDAG = physicalPlan.getStageDAG();
+    stageDAG.topologicalDo(stage -> {
+      final List<String> taskIdsForStage = stage.getTaskIds();
+      final List<StageEdge> stageOutgoingEdges = stageDAG.getOutgoingEdgesOf(stage);
+
+      // Initialize states for blocks of inter-stage edges
+      stageOutgoingEdges.forEach(stageEdge -> {
+        final int srcParallelism = taskIdsForStage.size();
+        IntStream.range(0, srcParallelism).forEach(srcTaskIdx -> {
+          final String blockId = RuntimeIdGenerator.generateBlockId(stageEdge.getId(), srcTaskIdx);
+          initializeState(blockId, taskIdsForStage.get(srcTaskIdx));
+        });
+      });
+
+      // Initialize states for blocks of stage internal edges
+      taskIdsForStage.forEach(taskId -> {
+        final DAG<IRVertex, RuntimeEdge<IRVertex>> taskInternalDag = stage.getIRDAG();
+        taskInternalDag.getVertices().forEach(task -> {
+          final List<RuntimeEdge<IRVertex>> internalOutgoingEdges = taskInternalDag.getOutgoingEdgesOf(task);
+          internalOutgoingEdges.forEach(taskRuntimeEdge -> {
+            final int srcTaskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
+            final String blockId = RuntimeIdGenerator.generateBlockId(taskRuntimeEdge.getId(), srcTaskIdx);
+            initializeState(blockId, taskId);
+          });
+        });
+      });
+    });
+  }
+
   /**
    * Initializes the states of a block which will be produced by producer task(s).
    *
@@ -132,8 +169,7 @@ public final class BlockManagerMaster {
     final Lock readLock = lock.readLock();
     readLock.lock();
     try {
-      final BlockState.State state =
-          (BlockState.State) getBlockState(blockId).getStateMachine().getCurrentState();
+      final BlockState.State state = getBlockState(blockId);
       switch (state) {
         case IN_PROGRESS:
         case AVAILABLE:
@@ -174,6 +210,16 @@ public final class BlockManagerMaster {
     }
   }
 
+  public Set<String> getIdsOfBlocksProducedBy(final String taskId) {
+    final Lock readLock = lock.readLock();
+    readLock.lock();
+    try {
+      return producerTaskIdToBlockIds.get(taskId);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   /**
    * To be called when a potential producer task is scheduled.
    * @param scheduledTaskId the ID of the scheduled task.
@@ -256,11 +302,11 @@ public final class BlockManagerMaster {
    * @return the {@link BlockState} of a block.
    */
   @VisibleForTesting
-  BlockState getBlockState(final String blockId) {
+  public BlockState.State getBlockState(final String blockId) {
     final Lock readLock = lock.readLock();
     readLock.lock();
     try {
-      return blockIdToMetadata.get(blockId).getBlockState();
+      return (BlockState.State) blockIdToMetadata.get(blockId).getBlockState().getStateMachine().getCurrentState();
     } finally {
       readLock.unlock();
     }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
index 220e585..2a0ef88 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
@@ -15,18 +15,15 @@
  */
 package edu.snu.nemo.runtime.master;
 
+import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.exception.IllegalStateTransitionException;
 import edu.snu.nemo.common.exception.SchedulingException;
 import edu.snu.nemo.common.exception.UnknownExecutionStateException;
 import edu.snu.nemo.common.StateMachine;
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.metric.MetricDataBuilder;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.plan.StageEdge;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.state.JobState;
 import edu.snu.nemo.runtime.common.state.StageState;
 
@@ -45,7 +42,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.ThreadSafe;
-import java.util.stream.IntStream;
 
 import static edu.snu.nemo.common.dag.DAG.EMPTY_DAG_DIRECTORY;
 
@@ -94,7 +90,6 @@ public final class JobStateManager {
   private final Map<String, MetricDataBuilder> metricDataBuilderMap;
 
   public JobStateManager(final PhysicalPlan physicalPlan,
-                         final BlockManagerMaster blockManagerMaster,
                          final MetricMessageHandler metricMessageHandler,
                          final int maxScheduleAttempt) {
     this.jobId = physicalPlan.getId();
@@ -109,7 +104,6 @@ public final class JobStateManager {
     this.jobFinishedCondition = finishLock.newCondition();
     this.metricDataBuilderMap = new HashMap<>();
     initializeComputationStates();
-    initializePartitionStates(blockManagerMaster);
   }
 
   /**
@@ -128,36 +122,6 @@ public final class JobStateManager {
     });
   }
 
-  private void initializePartitionStates(final BlockManagerMaster blockManagerMaster) {
-    final DAG<Stage, StageEdge> stageDAG = physicalPlan.getStageDAG();
-    stageDAG.topologicalDo(stage -> {
-      final List<String> taskIdsForStage = stage.getTaskIds();
-      final List<StageEdge> stageOutgoingEdges = stageDAG.getOutgoingEdgesOf(stage);
-
-      // Initialize states for blocks of inter-stage edges
-      stageOutgoingEdges.forEach(stageEdge -> {
-        final int srcParallelism = taskIdsForStage.size();
-        IntStream.range(0, srcParallelism).forEach(srcTaskIdx -> {
-          final String blockId = RuntimeIdGenerator.generateBlockId(stageEdge.getId(), srcTaskIdx);
-          blockManagerMaster.initializeState(blockId, taskIdsForStage.get(srcTaskIdx));
-        });
-      });
-
-      // Initialize states for blocks of stage internal edges
-      taskIdsForStage.forEach(taskId -> {
-        final DAG<IRVertex, RuntimeEdge<IRVertex>> taskInternalDag = stage.getIRDAG();
-        taskInternalDag.getVertices().forEach(task -> {
-          final List<RuntimeEdge<IRVertex>> internalOutgoingEdges = taskInternalDag.getOutgoingEdgesOf(task);
-          internalOutgoingEdges.forEach(taskRuntimeEdge -> {
-            final int srcTaskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
-            final String blockId = RuntimeIdGenerator.generateBlockId(taskRuntimeEdge.getId(), srcTaskIdx);
-            blockManagerMaster.initializeState(blockId, taskId);
-          });
-        });
-      });
-    });
-  }
-
   /**
    * Updates the state of a task.
    * Task state changes can occur both in master and executor.
@@ -183,8 +147,8 @@ public final class JobStateManager {
     switch (newTaskState) {
       case ON_HOLD:
       case COMPLETE:
-      case FAILED_UNRECOVERABLE:
-      case FAILED_RECOVERABLE:
+      case FAILED:
+      case SHOULD_RETRY:
         metric.put("ToState", newTaskState);
         endMeasurement(taskId, metric);
         break;
@@ -213,32 +177,33 @@ public final class JobStateManager {
         .map(this::getTaskState)
         .filter(state -> state.equals(TaskState.State.COMPLETE) || state.equals(TaskState.State.ON_HOLD))
         .count();
+    if (newTaskState.equals(TaskState.State.COMPLETE)) {
+      // Log not-yet-completed tasks for us to track progress
+      LOG.info("{} completed: {} Task(s) remaining in this stage",
+          taskId, tasksOfThisStage.size() - numOfCompletedOrOnHoldTasksInThisStage);
+    }
     switch (newTaskState) {
-      case READY:
-        onStageStateChanged(stageId, StageState.State.READY);
-        break;
-      case EXECUTING:
-        onStageStateChanged(stageId, StageState.State.EXECUTING);
-        break;
-      case FAILED_RECOVERABLE:
-        onStageStateChanged(stageId, StageState.State.FAILED_RECOVERABLE);
+      // INCOMPLETE stage
+      case SHOULD_RETRY:
+        onStageStateChanged(stageId, StageState.State.INCOMPLETE);
         break;
+
+      // COMPLETE stage
       case COMPLETE:
       case ON_HOLD:
         if (numOfCompletedOrOnHoldTasksInThisStage == tasksOfThisStage.size()) {
           onStageStateChanged(stageId, StageState.State.COMPLETE);
         }
         break;
-      case FAILED_UNRECOVERABLE:
+
+      // Doesn't affect StageState
+      case READY:
+      case EXECUTING:
+      case FAILED:
         break;
       default:
         throw new UnknownExecutionStateException(new Throwable("This task state is unknown"));
     }
-
-    // Log not-yet-completed tasks for us to track progress
-    if (newTaskState.equals(TaskState.State.COMPLETE)) {
-      LOG.info("{}: {} Task(s) to go", stageId, tasksOfThisStage.size() - numOfCompletedOrOnHoldTasksInThisStage);
-    }
   }
 
   /**
@@ -248,11 +213,6 @@ public final class JobStateManager {
    * @param newStageState of the stage.
    */
   private void onStageStateChanged(final String stageId, final StageState.State newStageState) {
-    if (newStageState.equals(getStageState(stageId))) {
-      // Ignore duplicate state updates
-      return;
-    }
-
     // Change stage state
     final StateMachine stageStateMachine = idToStageStates.get(stageId).getStateMachine();
     LOG.debug("Stage State Transition: id {} from {} to {}",
@@ -261,7 +221,7 @@ public final class JobStateManager {
 
     // Metric handling
     final Map<String, Object> metric = new HashMap<>();
-    if (newStageState == StageState.State.EXECUTING) {
+    if (newStageState == StageState.State.INCOMPLETE) {
       metric.put("FromState", newStageState);
       beginMeasurement(stageId, metric);
     } else if (newStageState == StageState.State.COMPLETE) {
@@ -269,16 +229,9 @@ public final class JobStateManager {
       endMeasurement(stageId, metric);
     }
 
-    // Change job state if needed
+    // Job becomes COMPLETE
     final boolean allStagesCompleted = idToStageStates.values().stream().allMatch(state ->
         state.getStateMachine().getCurrentState().equals(StageState.State.COMPLETE));
-
-    // (1) Job becomes EXECUTING if not already
-    if (newStageState.equals(StageState.State.EXECUTING)
-        && !getJobState().equals(JobState.State.EXECUTING)) {
-      onJobStateChanged(JobState.State.EXECUTING);
-    }
-    // (2) Job becomes COMPLETE
     if (allStagesCompleted) {
       onJobStateChanged(JobState.State.COMPLETE);
     }
@@ -290,20 +243,15 @@ public final class JobStateManager {
    * @param newState of the job.
    */
   private void onJobStateChanged(final JobState.State newState) {
-    if (newState.equals(getJobState())) {
-      // Ignore duplicate state updates
-      return;
-    }
-
     jobState.getStateMachine().setState(newState);
 
     final Map<String, Object> metric = new HashMap<>();
     if (newState == JobState.State.EXECUTING) {
-      LOG.debug("Executing Job ID {}...", this.jobId);
+      LOG.info("Executing Job ID {}...", this.jobId);
       metric.put("FromState", newState);
       beginMeasurement(jobId, metric);
     } else if (newState == JobState.State.COMPLETE || newState == JobState.State.FAILED) {
-      LOG.debug("Job ID {} {}!", new Object[]{jobId, newState});
+      LOG.info("Job ID {} {}!", new Object[]{jobId, newState});
 
       // Awake all threads waiting the finish of this job.
       finishLock.lock();
@@ -391,6 +339,11 @@ public final class JobStateManager {
     }
   }
 
+  @VisibleForTesting
+  public synchronized Map<String, TaskState> getAllTaskStates() {
+    return idToTaskStates;
+  }
+
   /**
    * Begins recording the start time of this metric measurement, in addition to the metric given.
    * This method ensures thread-safety by synchronizing its callers.
@@ -435,7 +388,7 @@ public final class JobStateManager {
     file.getParentFile().mkdirs();
     try (final PrintWriter printWriter = new PrintWriter(file)) {
       printWriter.println(toStringWithPhysicalPlan());
-      LOG.debug(String.format("JSON representation of job state for %s(%s) was saved to %s",
+      LOG.info(String.format("JSON representation of job state for %s(%s) was saved to %s",
           jobId, suffix, file.getPath()));
     } catch (final IOException e) {
       LOG.warn(String.format("Cannot store JSON representation of job state for %s(%s) to %s: %s",
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 4529a0b..61ffd20 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -127,8 +127,8 @@ public final class RuntimeMaster {
     final Callable<Pair<JobStateManager, ScheduledExecutorService>> jobExecutionCallable = () -> {
       this.irVertices.addAll(plan.getIdToIRVertex().values());
       try {
-        final JobStateManager jobStateManager =
-            new JobStateManager(plan, blockManagerMaster, metricMessageHandler, maxScheduleAttempt);
+        blockManagerMaster.initialize(plan);
+        final JobStateManager jobStateManager = new JobStateManager(plan, metricMessageHandler, maxScheduleAttempt);
         scheduler.scheduleJob(plan, jobStateManager);
         final ScheduledExecutorService dagLoggingExecutor = scheduleDagLogging(jobStateManager);
         return Pair.of(jobStateManager, dagLoggingExecutor);
@@ -363,9 +363,9 @@ public final class RuntimeMaster {
       case COMPLETE:
         return COMPLETE;
       case FAILED_RECOVERABLE:
-        return TaskState.State.FAILED_RECOVERABLE;
+        return TaskState.State.SHOULD_RETRY;
       case FAILED_UNRECOVERABLE:
-        return TaskState.State.FAILED_UNRECOVERABLE;
+        return TaskState.State.FAILED;
       case ON_HOLD:
         return ON_HOLD;
       default:
@@ -373,13 +373,13 @@ public final class RuntimeMaster {
     }
   }
 
-  private TaskState.RecoverableFailureCause convertFailureCause(
+  private TaskState.RecoverableTaskFailureCause convertFailureCause(
       final ControlMessage.RecoverableFailureCause cause) {
     switch (cause) {
       case InputReadFailure:
-        return TaskState.RecoverableFailureCause.INPUT_READ_FAILURE;
+        return TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE;
       case OutputWriteFailure:
-        return TaskState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE;
+        return TaskState.RecoverableTaskFailureCause.OUTPUT_WRITE_FAILURE;
       default:
         throw new UnknownFailureCauseException(
             new Throwable("The failure cause for the recoverable failure is unknown"));
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index 4009424..880c8d0 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
+import com.google.common.collect.Sets;
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
@@ -23,6 +24,7 @@ import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
 import edu.snu.nemo.runtime.common.plan.*;
+import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
 import edu.snu.nemo.common.exception.*;
@@ -39,12 +41,10 @@ import javax.annotation.concurrent.NotThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.slf4j.Logger;
 
-import static edu.snu.nemo.runtime.common.state.TaskState.State.ON_HOLD;
-import static edu.snu.nemo.runtime.common.state.TaskState.State.READY;
-
 /**
  * (CONCURRENCY) Only a single dedicated thread should use the public methods of this class.
  * (i.e., runtimeMasterThread in RuntimeMaster)
@@ -74,7 +74,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
    */
   private PhysicalPlan physicalPlan;
   private JobStateManager jobStateManager;
-  private int initialScheduleGroup;
+  private List<List<Stage>> sortedScheduleGroups;
 
   @Inject
   public BatchSingleJobScheduler(final SchedulerRunner schedulerRunner,
@@ -108,16 +108,19 @@ public final class BatchSingleJobScheduler implements Scheduler {
     this.physicalPlan = physicalPlanOfJob;
     this.jobStateManager = jobStateManagerOfJob;
 
-    schedulerRunner.scheduleJob(jobStateManagerOfJob);
-    schedulerRunner.runSchedulerThread();
-
-    LOG.info("Job to schedule: {}", physicalPlanOfJob.getId());
+    schedulerRunner.run(jobStateManager);
+    LOG.info("Job to schedule: {}", this.physicalPlan.getId());
 
-    this.initialScheduleGroup = physicalPlanOfJob.getStageDAG().getVertices().stream()
-        .mapToInt(stage -> stage.getScheduleGroup())
-        .min().getAsInt();
+    this.sortedScheduleGroups = this.physicalPlan.getStageDAG().getVertices()
+        .stream()
+        .collect(Collectors.groupingBy(Stage::getScheduleGroup))
+        .entrySet()
+        .stream()
+        .sorted(Map.Entry.comparingByKey())
+        .map(Map.Entry::getValue)
+        .collect(Collectors.toList());
 
-    scheduleNextScheduleGroup(initialScheduleGroup);
+    doSchedule();
   }
 
   @Override
@@ -127,6 +130,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
     this.physicalPlan = newPhysicalPlan;
     if (taskInfo != null) {
       onTaskExecutionComplete(taskInfo.left(), taskInfo.right(), true);
+      doSchedule();
     }
   }
 
@@ -147,7 +151,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
                                             final int taskAttemptIndex,
                                             final TaskState.State newState,
                                             @Nullable final String vertexPutOnHold,
-                                            final TaskState.RecoverableFailureCause failureCause) {
+                                            final TaskState.RecoverableTaskFailureCause failureCause) {
     final int currentTaskAttemptIndex = jobStateManager.getTaskAttempt(taskId);
 
     if (taskAttemptIndex == currentTaskAttemptIndex) {
@@ -155,15 +159,16 @@ public final class BatchSingleJobScheduler implements Scheduler {
       jobStateManager.onTaskStateChanged(taskId, newState);
       switch (newState) {
         case COMPLETE:
-          onTaskExecutionComplete(executorId, taskId);
+          onTaskExecutionComplete(executorId, taskId, false);
           break;
-        case FAILED_RECOVERABLE:
+        case SHOULD_RETRY:
+          // SHOULD_RETRY from an executor means that the task ran into a recoverable failure
           onTaskExecutionFailedRecoverable(executorId, taskId, failureCause);
           break;
         case ON_HOLD:
           onTaskExecutionOnHold(executorId, taskId, vertexPutOnHold);
           break;
-        case FAILED_UNRECOVERABLE:
+        case FAILED:
           throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The job failed on Task #")
               .append(taskId).append(" in Executor ").append(executorId).toString()));
         case READY:
@@ -173,11 +178,43 @@ public final class BatchSingleJobScheduler implements Scheduler {
         default:
           throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + newState));
       }
+
+      // Invoke doSchedule()
+      switch (newState) {
+        case COMPLETE:
+        case ON_HOLD:
+          // If the stage has completed
+          final String stageIdForTaskUponCompletion = RuntimeIdGenerator.getStageIdFromTaskId(taskId);
+          if (jobStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE)) {
+            if (!jobStateManager.isJobDone()) {
+              doSchedule();
+            }
+          }
+          break;
+        case SHOULD_RETRY:
+          // Retry the failed task
+          doSchedule();
+          break;
+        default:
+          break;
+      }
+
+      // Invoke schedulerRunner.onExecutorSlotAvailable()
+      switch (newState) {
+        // These three states mean that a slot is made available.
+        case COMPLETE:
+        case ON_HOLD:
+        case SHOULD_RETRY:
+          schedulerRunner.onExecutorSlotAvailable();
+          break;
+        default:
+          break;
+      }
     } else if (taskAttemptIndex < currentTaskAttemptIndex) {
-      // Do not change state, as this notification is for a previous task attempt.
+      // Do not change state, as this report is from a previous task attempt.
       // For example, the master can receive a notification that an executor has been removed,
       // and then a notification that the task that was running in the removed executor has been completed.
-      // In this case, if we do not consider the attempt number, the state changes from FAILED_RECOVERABLE to COMPLETED,
+      // In this case, if we do not consider the attempt number, the state changes from SHOULD_RETRY to COMPLETE,
       // which is illegal.
       LOG.info("{} state change to {} arrived late, we will ignore this.", new Object[]{taskId, newState});
     } else {
@@ -187,35 +224,33 @@ public final class BatchSingleJobScheduler implements Scheduler {
 
   @Override
   public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
+    LOG.info("{} added", executorRepresenter.getExecutorId());
     executorRegistry.registerExecutor(executorRepresenter);
-    schedulerRunner.onAnExecutorAvailable();
+    schedulerRunner.onExecutorSlotAvailable();
   }
 
   @Override
   public void onExecutorRemoved(final String executorId) {
-    final Set<String> tasksToReExecute = new HashSet<>();
-    // Tasks for lost blocks
-    tasksToReExecute.addAll(blockManagerMaster.removeWorker(executorId));
+    blockManagerMaster.removeWorker(executorId);
 
-    // Tasks executing on the removed executor
+    // These are tasks that were running at the time of executor removal.
+    final Set<String> interruptedTasks = new HashSet<>();
     executorRegistry.updateExecutor(executorId, (executor, state) -> {
-      tasksToReExecute.addAll(executor.onExecutorFailed());
+      interruptedTasks.addAll(executor.onExecutorFailed());
       return Pair.of(executor, ExecutorRegistry.ExecutorState.FAILED);
     });
 
-    tasksToReExecute.forEach(failedTaskId -> {
-      final int attemptIndex = jobStateManager.getTaskAttempt(failedTaskId);
-      onTaskStateReportFromExecutor(executorId, failedTaskId, attemptIndex, TaskState.State.FAILED_RECOVERABLE,
-          null, TaskState.RecoverableFailureCause.CONTAINER_FAILURE);
-    });
+    // We need to retry the interrupted tasks, and also recover the tasks' missing input blocks if needed.
+    final Set<String> tasksToReExecute =
+        Sets.union(interruptedTasks, recursivelyGetParentTasksForLostBlocks(interruptedTasks));
 
-    if (!tasksToReExecute.isEmpty()) {
-      // Schedule a stage after marking the necessary tasks to failed_recoverable.
-      // The stage for one of the tasks that failed is a starting point to look
-      // for the next stage to be scheduled.
-      scheduleNextScheduleGroup(getScheduleGroupOfStage(
-          RuntimeIdGenerator.getStageIdFromTaskId(tasksToReExecute.iterator().next())));
-    }
+    // Report SHOULD_RETRY tasks so they can be re-scheduled
+    LOG.info("{} removed: {} will be retried", executorId, tasksToReExecute);
+    tasksToReExecute.forEach(
+        taskToReExecute -> jobStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
+
+    // Trigger the scheduling of SHOULD_RETRY tasks in the earliest scheduleGroup
+    doSchedule();
   }
 
   @Override
@@ -224,108 +259,45 @@ public final class BatchSingleJobScheduler implements Scheduler {
     this.executorRegistry.terminate();
   }
 
+  ////////////////////////////////////////////////////////////////////// Key methods for scheduling
+
   /**
-   * Schedules the next schedule group to execute.
-   * @param referenceIndex of the schedule group.
+   * The main entry point for task scheduling.
+   * This operation can be invoked at any point during job execution, as it is designed to be free of side-effects,
+   * and integrate well with {@link PendingTaskCollectionPointer} and {@link SchedulerRunner}.
    */
-  private void scheduleNextScheduleGroup(final int referenceIndex) {
-    final Optional<List<Stage>> nextScheduleGroupToSchedule = selectNextScheduleGroupToSchedule(referenceIndex);
+  private void doSchedule() {
+    final Optional<List<Stage>> earliest = selectEarliestSchedulableGroup();
 
-    if (nextScheduleGroupToSchedule.isPresent()) {
-      LOG.info("Scheduling: ScheduleGroup {}", nextScheduleGroupToSchedule.get());
-      final List<Task> tasksToSchedule = nextScheduleGroupToSchedule.get().stream()
-          .flatMap(stage -> getSchedulableTasks(stage).stream())
+    if (earliest.isPresent()) {
+      // Get schedulable tasks.
+      final List<Task> tasksToSchedule = earliest.get().stream()
+          .flatMap(stage -> selectSchedulableTasks(stage).stream())
           .collect(Collectors.toList());
+
+      LOG.info("Attempting to schedule {} in the same ScheduleGroup",
+          tasksToSchedule.stream().map(Task::getTaskId).collect(Collectors.toList()));
+
+      // Set the pointer to the schedulable tasks.
       pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
+
+      // Notify the runner that a new collection is available.
       schedulerRunner.onNewPendingTaskCollectionAvailable();
     } else {
-      LOG.info("Skipping this round as the next schedulable stages have already been scheduled.");
+      LOG.info("Skipping this round as no ScheduleGroup is schedulable.");
     }
   }
 
-  /**
-   * Selects the next stage to schedule.
-   * It takes the referenceScheduleGroup as a reference point to begin looking for the stages to execute:
-   *
-   * a) returns the failed_recoverable stage(s) of the earliest schedule group, if it(they) exists.
-   * b) returns an empty optional if there are no schedulable stages at the moment.
-   *    - if the current schedule group is still executing
-   *    - if an ancestor schedule group is still executing
-   * c) returns the next set of schedulable stages (if the current schedule group has completed execution)
-   *
-   * @param referenceScheduleGroup
-   *      the index of the schedule group that is executing/has executed when this method is called.
-   * @return an optional of the (possibly empty) next schedulable stage
-   */
-  private Optional<List<Stage>> selectNextScheduleGroupToSchedule(final int referenceScheduleGroup) {
-    // Recursively check the previous schedule group.
-    if (referenceScheduleGroup > initialScheduleGroup) {
-      final Optional<List<Stage>> ancestorStagesFromAScheduleGroup =
-          selectNextScheduleGroupToSchedule(referenceScheduleGroup - 1);
-      if (ancestorStagesFromAScheduleGroup.isPresent()) {
-        // Nothing to schedule from the previous schedule group.
-        return ancestorStagesFromAScheduleGroup;
-      }
-    }
-
-    // Return the schedulable stage list in reverse-topological order
-    // since the stages that belong to the same schedule group are mutually independent,
-    // or connected by a "push" edge, where scheduling the children stages first is preferred.
-    final List<Stage> reverseTopoStages = physicalPlan.getStageDAG().getTopologicalSort();
-    Collections.reverse(reverseTopoStages);
-
-    // All previous schedule groups are complete, we need to check for the current schedule group.
-    final List<Stage> currentScheduleGroup = reverseTopoStages
-        .stream()
-        .filter(stage -> stage.getScheduleGroup() == referenceScheduleGroup)
-        .collect(Collectors.toList());
-    final boolean allStagesOfThisGroupComplete = currentScheduleGroup
-        .stream()
-        .map(Stage::getId)
-        .map(jobStateManager::getStageState)
-        .allMatch(state -> state.equals(StageState.State.COMPLETE));
-
-    if (!allStagesOfThisGroupComplete) {
-      LOG.info("There are remaining stages in the current schedule group, {}", referenceScheduleGroup);
-      final List<Stage> stagesToSchedule = currentScheduleGroup
-          .stream()
-          .filter(stage -> {
-            final StageState.State stageState = jobStateManager.getStageState(stage.getId());
-            return stageState.equals(StageState.State.FAILED_RECOVERABLE)
-                || stageState.equals(StageState.State.READY);
-          })
-          .collect(Collectors.toList());
-      return (stagesToSchedule.isEmpty())
-          ? Optional.empty()
-          : Optional.of(stagesToSchedule);
-    } else {
-      // By the time the control flow has reached here,
-      // we are ready to move onto the next ScheduleGroup
-      final List<Stage> stagesToSchedule = reverseTopoStages
-          .stream()
-          .filter(stage -> {
-            if (stage.getScheduleGroup() == referenceScheduleGroup + 1) {
-              final String stageId = stage.getId();
-              return jobStateManager.getStageState(stageId) != StageState.State.EXECUTING
-                  && jobStateManager.getStageState(stageId) != StageState.State.COMPLETE;
-            }
-            return false;
-          })
-          .collect(Collectors.toList());
-
-      if (stagesToSchedule.isEmpty()) {
-        LOG.debug("ScheduleGroup {}: already executing/complete!, so we skip this", referenceScheduleGroup + 1);
-        return Optional.empty();
-      }
-
-      return Optional.of(stagesToSchedule);
-    }
+  private Optional<List<Stage>> selectEarliestSchedulableGroup() {
+    return sortedScheduleGroups.stream()
+        .filter(scheduleGroup -> scheduleGroup.stream()
+            .map(Stage::getId)
+            .map(jobStateManager::getStageState)
+            .anyMatch(state -> state.equals(StageState.State.INCOMPLETE))) // any incomplete stage in the group
+        .findFirst(); // selects the one with the smallest scheduling group index.
   }
 
-  /**
-   * @param stageToSchedule the stage to schedule.
-   */
-  private List<Task> getSchedulableTasks(final Stage stageToSchedule) {
+  private List<Task> selectSchedulableTasks(final Stage stageToSchedule) {
     final List<StageEdge> stageIncomingEdges =
         physicalPlan.getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
     final List<StageEdge> stageOutgoingEdges =
@@ -333,39 +305,35 @@ public final class BatchSingleJobScheduler implements Scheduler {
 
     final List<String> taskIdsToSchedule = new LinkedList<>();
     for (final String taskId : stageToSchedule.getTaskIds()) {
-      // this happens when the belonging stage's other tasks have failed recoverable,
-      // but this task's results are safe.
       final TaskState.State taskState = jobStateManager.getTaskState(taskId);
 
       switch (taskState) {
+        // Don't schedule these.
         case COMPLETE:
         case EXECUTING:
-          LOG.info("Skipping {} because its outputs are safe!", taskId);
+        case ON_HOLD:
           break;
-        case FAILED_RECOVERABLE:
-          jobStateManager.onTaskStateChanged(taskId, READY);
+
+        // These are schedulable.
+        case SHOULD_RETRY:
+          jobStateManager.onTaskStateChanged(taskId, TaskState.State.READY);
         case READY:
           taskIdsToSchedule.add(taskId);
           break;
-        case ON_HOLD:
-          // Do nothing
-          break;
+
+        // This shouldn't happen.
         default:
-          throw new SchedulingException(new Throwable("Detected a FAILED_UNRECOVERABLE Task"));
+          throw new SchedulingException(new Throwable("Detected a FAILED Task"));
       }
     }
 
-    LOG.info("Scheduling Stage {}", stageToSchedule.getId());
-
-    // each readable and source task will be bounded in executor.
+    // Create and return tasks.
     final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
-
     final List<Task> tasks = new ArrayList<>(taskIdsToSchedule.size());
     taskIdsToSchedule.forEach(taskId -> {
-      blockManagerMaster.onProducerTaskScheduled(taskId);
+      blockManagerMaster.onProducerTaskScheduled(taskId); // Notify the block manager early for push edges.
       final int taskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
       final int attemptIdx = jobStateManager.getTaskAttempt(taskId);
-
       tasks.add(new Task(
           physicalPlan.getId(),
           taskId,
@@ -379,38 +347,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
     return tasks;
   }
 
-  /**
-   * @param taskId id of the task
-   * @return the IR dag
-   */
-  private DAG<IRVertex, RuntimeEdge<IRVertex>> getVertexDagById(final String taskId) {
-    for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
-      if (stage.getId().equals(RuntimeIdGenerator.getStageIdFromTaskId(taskId))) {
-        return stage.getIRDAG();
-      }
-    }
-    throw new RuntimeException(new Throwable("This taskId does not exist in the plan"));
-  }
-
-  private Stage getStageById(final String stageId) {
-    for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
-      if (stage.getId().equals(stageId)) {
-        return stage;
-      }
-    }
-    throw new RuntimeException(new Throwable("This taskId does not exist in the plan"));
-  }
-
-  /**
-   * Action after task execution has been completed, not after it has been put on hold.
-   *
-   * @param executorId  the ID of the executor.
-   * @param taskId the ID pf the task completed.
-   */
-  private void onTaskExecutionComplete(final String executorId,
-                                       final String taskId) {
-    onTaskExecutionComplete(executorId, taskId, false);
-  }
+  ////////////////////////////////////////////////////////////////////// Task state change handlers
 
   /**
    * Action after task execution has been completed.
@@ -420,7 +357,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
    */
   private void onTaskExecutionComplete(final String executorId,
                                        final String taskId,
-                                       final Boolean isOnHoldToComplete) {
+                                       final boolean isOnHoldToComplete) {
     LOG.debug("{} completed in {}", new Object[]{taskId, executorId});
     if (!isOnHoldToComplete) {
       executorRegistry.updateExecutor(executorId, (executor, state) -> {
@@ -428,15 +365,6 @@ public final class BatchSingleJobScheduler implements Scheduler {
         return Pair.of(executor, state);
       });
     }
-
-    final String stageIdForTaskUponCompletion = RuntimeIdGenerator.getStageIdFromTaskId(taskId);
-    if (jobStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE)) {
-      // if the stage this task belongs to is complete,
-      if (!jobStateManager.isJobDone()) {
-        scheduleNextScheduleGroup(getScheduleGroupOfStage(stageIdForTaskUponCompletion));
-      }
-    }
-    schedulerRunner.onAnExecutorAvailable();
   }
 
   /**
@@ -466,7 +394,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
               .filter(irVertex -> irVertex instanceof MetricCollectionBarrierVertex)
               .distinct()
               .map(irVertex -> (MetricCollectionBarrierVertex) irVertex) // convert types
-              .findFirst().orElseThrow(() -> new RuntimeException(ON_HOLD.name() // get it
+              .findFirst().orElseThrow(() -> new RuntimeException(TaskState.State.ON_HOLD.name() // get it
               + " called with failed task ids by some other task than "
               + MetricCollectionBarrierVertex.class.getSimpleName()));
       // and we will use this vertex to perform metric collection and dynamic optimization.
@@ -476,7 +404,6 @@ public final class BatchSingleJobScheduler implements Scheduler {
     } else {
       onTaskExecutionComplete(executorId, taskId, true);
     }
-    schedulerRunner.onAnExecutorAvailable();
   }
 
   /**
@@ -487,33 +414,75 @@ public final class BatchSingleJobScheduler implements Scheduler {
    */
   private void onTaskExecutionFailedRecoverable(final String executorId,
                                                 final String taskId,
-                                                final TaskState.RecoverableFailureCause failureCause) {
+                                                final TaskState.RecoverableTaskFailureCause failureCause) {
     LOG.info("{} failed in {} by {}", taskId, executorId, failureCause);
     executorRegistry.updateExecutor(executorId, (executor, state) -> {
       executor.onTaskExecutionFailed(taskId);
       return Pair.of(executor, state);
     });
 
-    final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(taskId);
-
     switch (failureCause) {
       // Previous task must be re-executed, and incomplete tasks of the belonging stage must be rescheduled.
       case INPUT_READ_FAILURE:
-        // TODO #50: Carefully retry tasks in the scheduler
+        // TODO #54: Handle remote data fetch failures
       case OUTPUT_WRITE_FAILURE:
         blockManagerMaster.onProducerTaskFailed(taskId);
-        scheduleNextScheduleGroup(getScheduleGroupOfStage(stageId));
-        break;
-      case CONTAINER_FAILURE:
-        LOG.info("Only the failed task will be retried.");
         break;
       default:
         throw new UnknownFailureCauseException(new Throwable("Unknown cause: " + failureCause));
     }
-    schedulerRunner.onAnExecutorAvailable();
   }
 
-  private int getScheduleGroupOfStage(final String stageId) {
-    return physicalPlan.getStageDAG().getVertexById(stageId).getScheduleGroup();
+  ////////////////////////////////////////////////////////////////////// Helper methods
+
+  private Set<String> recursivelyGetParentTasksForLostBlocks(final Set<String> children) {
+    if (children.isEmpty()) {
+      return Collections.emptySet();
+    }
+
+    final Set<String> selectedParentTasks = children.stream()
+        .flatMap(child -> getParentTasks(child).stream())
+        .filter(parent -> blockManagerMaster.getIdsOfBlocksProducedBy(parent).stream()
+            .map(blockManagerMaster::getBlockState)
+            .anyMatch(blockState -> blockState.equals(BlockState.State.NOT_AVAILABLE)) // If a block is missing
+        )
+        .collect(Collectors.toSet());
+
+    // Recursive call
+    return Sets.union(selectedParentTasks, recursivelyGetParentTasksForLostBlocks(selectedParentTasks));
+  }
+
+  private Set<String> getParentTasks(final String childTaskId) {
+    final String stageIdOfChildTask = RuntimeIdGenerator.getStageIdFromTaskId(childTaskId);
+    return physicalPlan.getStageDAG().getIncomingEdgesOf(stageIdOfChildTask)
+        .stream()
+        .flatMap(inStageEdge -> {
+          final List<String> tasksOfParentStage = inStageEdge.getSrc().getTaskIds();
+          switch (inStageEdge.getDataCommunicationPattern()) {
+            case Shuffle:
+            case BroadCast:
+              // All of the parent stage's tasks are parents
+              return tasksOfParentStage.stream();
+            case OneToOne:
+              // Only one of the parent stage's tasks is a parent
+              return Stream.of(tasksOfParentStage.get(RuntimeIdGenerator.getIndexFromTaskId(childTaskId)));
+            default:
+              throw new IllegalStateException(inStageEdge.toString());
+          }
+        })
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * @param taskId id of the task
+   * @return the IR dag
+   */
+  private DAG<IRVertex, RuntimeEdge<IRVertex>> getVertexDagById(final String taskId) {
+    for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
+      if (stage.getId().equals(RuntimeIdGenerator.getStageIdFromTaskId(taskId))) {
+        return stage.getIRDAG();
+      }
+    }
+    throw new RuntimeException("This taskId does not exist in the plan");
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index 12114a7..28b52db 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -79,7 +79,7 @@ public interface Scheduler {
                                      int attemptIdx,
                                      TaskState.State newState,
                                      @Nullable String taskPutOnHold,
-                                     TaskState.RecoverableFailureCause failureCause);
+                                     TaskState.RecoverableTaskFailureCause failureCause);
 
   /**
    * To be called when a job should be terminated.
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index 42e9a2c..caf0d40 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -148,9 +148,9 @@ public final class SchedulerRunner {
   }
 
   /**
-   * Signals to the condition on executor availability.
+   * Signals to the condition on executor slot availability.
    */
-  void onAnExecutorAvailable() {
+  void onExecutorSlotAvailable() {
     schedulingIteration.signal();
   }
 
@@ -164,24 +164,15 @@ public final class SchedulerRunner {
   /**
    * Run the scheduler thread.
    */
-  void runSchedulerThread() {
+  void run(final JobStateManager jobStateManager) {
     if (!isTerminated && !isSchedulerRunning) {
+      jobStateManagers.put(jobStateManager.getJobId(), jobStateManager);
       schedulerThread.execute(new SchedulerThread());
       schedulerThread.shutdown();
       isSchedulerRunning = true;
     }
   }
 
-  /**
-   * Begin scheduling a job.
-   * @param jobStateManager the corresponding {@link JobStateManager}
-   */
-  void scheduleJob(final JobStateManager jobStateManager) {
-    if (!isTerminated) {
-      jobStateManagers.put(jobStateManager.getJobId(), jobStateManager);
-    } // else ignore new incoming jobs when terminated.
-  }
-
   void terminate() {
     isTerminated = true;
     schedulingIteration.signal();
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
index 4aa2b0a..6860893 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
@@ -76,7 +76,7 @@ public final class JobStateManagerTest {
     final PhysicalPlan physicalPlan =
         TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
     final JobStateManager jobStateManager =
-        new JobStateManager(physicalPlan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
+        new JobStateManager(physicalPlan, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
 
     assertEquals(jobStateManager.getJobId(), "TestPlan");
 
@@ -108,7 +108,7 @@ public final class JobStateManagerTest {
     final PhysicalPlan physicalPlan =
         TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
     final JobStateManager jobStateManager =
-        new JobStateManager(physicalPlan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
+        new JobStateManager(physicalPlan, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
 
     assertFalse(jobStateManager.isJobDone());
 
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
index 8bfe43d..e4491b6 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
@@ -148,24 +148,17 @@ public final class BatchSingleJobSchedulerTest {
   }
 
   private void scheduleAndCheckJobTermination(final PhysicalPlan plan) throws InjectionException {
-    final JobStateManager jobStateManager = new JobStateManager(plan, blockManagerMaster, metricMessageHandler, 1);
+    final JobStateManager jobStateManager = new JobStateManager(plan, metricMessageHandler, 1);
     scheduler.scheduleJob(plan, jobStateManager);
 
-    // For each ScheduleGroup, test:
-    // a) all stages in the ScheduleGroup enters the executing state
-    // b) the stages of the next ScheduleGroup are scheduled after the stages of each ScheduleGroup are made "complete".
+    // For each ScheduleGroup, test if the tasks of the next ScheduleGroup are scheduled
+    // after the stages of each ScheduleGroup are made "complete".
     for (int i = 0; i < getNumScheduleGroups(plan.getStageDAG()); i++) {
       final int scheduleGroupIdx = i;
       final List<Stage> stages = filterStagesWithAScheduleGroup(plan.getStageDAG(), scheduleGroupIdx);
 
       LOG.debug("Checking that all stages of ScheduleGroup {} enter the executing state", scheduleGroupIdx);
       stages.forEach(stage -> {
-        while (jobStateManager.getStageState(stage.getId()) != StageState.State.EXECUTING) {
-
-        }
-      });
-
-      stages.forEach(stage -> {
         SchedulerTestUtil.completeStage(
             jobStateManager, scheduler, executorRegistry, stage, SCHEDULE_ATTEMPT_INDEX);
       });
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
index 387c6b9..815dff8 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
@@ -45,7 +45,7 @@ final class SchedulerTestUtil {
       if (StageState.State.COMPLETE == stageState) {
         // Stage has completed, so we break out of the loop.
         break;
-      } else if (StageState.State.EXECUTING == stageState) {
+      } else if (StageState.State.INCOMPLETE == stageState) {
         stage.getTaskIds().forEach(taskId -> {
           final TaskState.State taskState = jobStateManager.getTaskState(taskId);
           if (TaskState.State.EXECUTING == taskState) {
@@ -57,8 +57,6 @@ final class SchedulerTestUtil {
             throw new IllegalStateException(taskState.toString());
           }
         });
-      } else if (StageState.State.READY == stageState) {
-        // Skip and retry in the next loop.
       } else {
         throw new IllegalStateException(stageState.toString());
       }
@@ -79,7 +77,7 @@ final class SchedulerTestUtil {
                                             final String taskId,
                                             final TaskState.State newState,
                                             final int attemptIdx,
-                                            final TaskState.RecoverableFailureCause cause) {
+                                            final TaskState.RecoverableTaskFailureCause cause) {
     final ExecutorRepresenter scheduledExecutor;
     while (true) {
       final Optional<ExecutorRepresenter> optional = executorRegistry.findExecutorForTask(taskId);
@@ -91,4 +89,12 @@ final class SchedulerTestUtil {
     scheduler.onTaskStateReportFromExecutor(scheduledExecutor.getExecutorId(), taskId, attemptIdx,
         newState, null, cause);
   }
+
+  static void sendTaskStateEventToScheduler(final Scheduler scheduler,
+                                            final ExecutorRegistry executorRegistry,
+                                            final String taskId,
+                                            final TaskState.State newState,
+                                            final int attemptIdx) {
+    sendTaskStateEventToScheduler(scheduler, executorRegistry, taskId, newState, attemptIdx, null);
+  }
 }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRestartTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRestartTest.java
new file mode 100644
index 0000000..7064a23
--- /dev/null
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRestartTest.java
@@ -0,0 +1,225 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.runtime.common.comm.ControlMessage;
+import edu.snu.nemo.runtime.common.message.MessageSender;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.TaskState;
+import edu.snu.nemo.runtime.master.BlockManagerMaster;
+import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.MetricMessageHandler;
+import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
+import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests fault tolerance: a job must still reach COMPLETE while executors are removed or task output writes fail.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({BlockManagerMaster.class, SchedulerRunner.class, SchedulingConstraintRegistry.class,
+    PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class})
+public final class TaskRestartTest {
+  @Rule public TestName testName = new TestName();
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskRestartTest.class.getName());
+  private static final AtomicInteger ID_OFFSET = new AtomicInteger(1); // two values are drawn per added executor
+
+  private Random random;
+  private Scheduler scheduler;
+  private ExecutorRegistry executorRegistry;
+  private JobStateManager jobStateManager;
+
+  private static final int MAX_SCHEDULE_ATTEMPT = Integer.MAX_VALUE; // presumably an unlimited retry budget - confirm
+
+  @Before
+  public void setUp() throws Exception {
+    // To understand which part of the log belongs to which test
+    LOG.info("===== Testing {} =====", testName.getMethodName());
+    final Injector injector = Tang.Factory.getTang().newInjector();
+
+    // Get random
+    random = new Random(0); // Fixed seed for reproducing test results.
+
+    // Get executorRegistry
+    executorRegistry = injector.getInstance(ExecutorRegistry.class);
+
+    // Get scheduler
+    final PubSubEventHandlerWrapper pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
+    final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class);
+    final SchedulingConstraintRegistry constraintRegistry = mock(SchedulingConstraintRegistry.class);
+    final SchedulingPolicy schedulingPolicy = injector.getInstance(MinOccupancyFirstSchedulingPolicy.class);
+    final PendingTaskCollectionPointer pendingTaskCollectionPointer = new PendingTaskCollectionPointer();
+    final SchedulerRunner schedulerRunner = new SchedulerRunner(
+        constraintRegistry, schedulingPolicy, pendingTaskCollectionPointer, executorRegistry);
+    final BlockManagerMaster blockManagerMaster = mock(BlockManagerMaster.class);
+    scheduler =  new BatchSingleJobScheduler(schedulerRunner, pendingTaskCollectionPointer, blockManagerMaster,
+        pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
+
+    // Submit the test plan to the scheduler and keep its JobStateManager
+    jobStateManager = runPhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined);
+  }
+
+  @Test(timeout=7000)
+  public void testExecutorRemoved() throws Exception {
+    // Keep injecting random events until the job finishes
+    while (!jobStateManager.isJobDone()) {
+      // Per iteration: 50% chance an executor is removed, 50% chance one is added, 80% chance a task completes
+      executorRemoved(0.5);
+      executorAdded(0.5);
+      taskCompleted(0.8);
+
+      // 10ms sleep
+      Thread.sleep(10);
+    }
+
+    // Job should COMPLETE
+    assertEquals(JobState.State.COMPLETE, jobStateManager.getJobState());
+    assertTrue(jobStateManager.isJobDone());
+  }
+
+  @Test(timeout=7000)
+  public void testTaskOutputWriteFailure() throws Exception {
+    // Three executors are used
+    executorAdded(1.0);
+    executorAdded(1.0);
+    executorAdded(1.0);
+
+    // Keep injecting random events until the job finishes
+    while (!jobStateManager.isJobDone()) {
+      // 50% chance task completed
+      // 50% chance task output write failed
+      taskCompleted(0.5);
+      taskOutputWriteFailed(0.5);
+
+      // 10ms sleep
+      Thread.sleep(10);
+    }
+
+    // Job should COMPLETE
+    assertEquals(JobState.State.COMPLETE, jobStateManager.getJobState());
+    assertTrue(jobStateManager.isJobDone());
+  }
+
+  ////////////////////////////////////////////////////////////////// Events
+
+  private void executorAdded(final double chance) {
+    if (random.nextDouble() > chance) { // Skip this event with probability (1 - chance)
+      return;
+    }
+
+    final MessageSender<ControlMessage.Message> mockMsgSender = mock(MessageSender.class);
+    final ActiveContext activeContext = mock(ActiveContext.class);
+    Mockito.doThrow(new RuntimeException()).when(activeContext).close(); // The mocked context throws if closed
+    final ExecutorService serExecutorService = Executors.newSingleThreadExecutor();
+    final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
+    final ExecutorRepresenter executor = new ExecutorRepresenter("EXECUTOR" + ID_OFFSET.getAndIncrement(),
+        computeSpec, mockMsgSender, activeContext, serExecutorService, "NODE" + ID_OFFSET.getAndIncrement());
+    scheduler.onExecutorAdded(executor);
+  }
+
+  private void executorRemoved(final double chance) {
+    if (random.nextDouble() > chance) {
+      return;
+    }
+
+    executorRegistry.viewExecutors(executors -> {
+      if (executors.isEmpty()) {
+        return;
+      }
+
+      final List<ExecutorRepresenter> executorList = new ArrayList<>(executors);
+      final int randomIndex = random.nextInt(executorList.size());
+
+      // Because synchronized blocks are reentrant and there's no additional operation after this point,
+      // we can call scheduler.onExecutorRemoved() while still inside executorRegistry.viewExecutors()
+      scheduler.onExecutorRemoved(executorList.get(randomIndex).getExecutorId());
+    });
+  }
+
+  private void taskCompleted(final double chance) {
+    if (random.nextDouble() > chance) {
+      return;
+    }
+
+    final List<String> executingTasks = getTasksInState(jobStateManager, TaskState.State.EXECUTING);
+    if (!executingTasks.isEmpty()) {
+      final int randomIndex = random.nextInt(executingTasks.size());
+      final String selectedTask = executingTasks.get(randomIndex);
+      SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry, selectedTask,
+          TaskState.State.COMPLETE, jobStateManager.getTaskAttempt(selectedTask));
+    }
+  }
+
+  private void taskOutputWriteFailed(final double chance) {
+    if (random.nextDouble() > chance) {
+      return;
+    }
+
+    final List<String> executingTasks = getTasksInState(jobStateManager, TaskState.State.EXECUTING);
+    if (!executingTasks.isEmpty()) {
+      final int randomIndex = random.nextInt(executingTasks.size());
+      final String selectedTask = executingTasks.get(randomIndex);
+      SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry, selectedTask,
+          TaskState.State.SHOULD_RETRY, jobStateManager.getTaskAttempt(selectedTask),
+          TaskState.RecoverableTaskFailureCause.OUTPUT_WRITE_FAILURE);
+    }
+  }
+
+  ////////////////////////////////////////////////////////////////// Helper methods
+
+  private List<String> getTasksInState(final JobStateManager jobStateManager, final TaskState.State state) {
+    return jobStateManager.getAllTaskStates().entrySet().stream()
+        .filter(entry -> entry.getValue().getStateMachine().getCurrentState().equals(state))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+  }
+
+  private JobStateManager runPhysicalPlan(final TestPlanGenerator.PlanType planType) throws Exception {
+    final MetricMessageHandler metricMessageHandler = mock(MetricMessageHandler.class);
+    final PhysicalPlan plan = TestPlanGenerator.generatePhysicalPlan(planType, false);
+    final JobStateManager jobStateManager = new JobStateManager(plan, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
+    scheduler.scheduleJob(plan, jobStateManager);
+    return jobStateManager;
+  }
+}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
index c962ad3..adce9e6 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
@@ -81,9 +81,8 @@ public class ClientEndpointTest {
         new LocalMessageEnvironment(MessageEnvironment.MASTER_COMMUNICATION_ID, messageDispatcher);
     final Injector injector = Tang.Factory.getTang().newInjector();
     injector.bindVolatileInstance(MessageEnvironment.class, messageEnvironment);
-    final BlockManagerMaster pmm = injector.getInstance(BlockManagerMaster.class);
     final JobStateManager jobStateManager =
-        new JobStateManager(physicalPlan, pmm, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
+        new JobStateManager(physicalPlan, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
 
     final DriverEndpoint driverEndpoint = new DriverEndpoint(jobStateManager, clientEndpoint);