You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by sa...@apache.org on 2018/08/22 03:40:37 UTC

[incubator-nemo] branch master updated: [NEMO-179] Delayed Task Cloning (#112)

This is an automated email from the ASF dual-hosted git repository.

sanha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 25bea60  [NEMO-179] Delayed Task Cloning (#112)
25bea60 is described below

commit 25bea60cbfcce37d20ccc9d669b0004ef4ef6c12
Author: John Yang <jo...@gmail.com>
AuthorDate: Wed Aug 22 12:40:34 2018 +0900

    [NEMO-179] Delayed Task Cloning (#112)
    
    JIRA: [NEMO-179: Delayed Task Cloning](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-179)
    
    **Major changes:**
    - Two options for ClonedSchedulingProperty: Speculative execution(fraction, medianTimeMultiplier) + Upfront cloning
    - ScheduledExecutorService in RuntimeMaster: Check for speculative execution every 100ms
    - Data structures to identify stragglers and clone tasks in PlanStateManager
    - BlockState: No longer transitions from NOT_AVAILABLE to other states (similar to TaskState), assuming that a new Block attempt takes over the failed one
    
    **Minor changes to note:**
    - Fix a bug in SourceLocationAwareSchedulingConstraint, as its getIntermediateDataLocation is no longer deterministic (randomly fetches one of the clone outputs)
    - Log when the speculative execution is triggered
    - Indentations
    
    **Tests for the changes:**
    - Unit test(TaskRetryTest): Tests combinations of Task retry + Task speculation (cloning), both of which lead to new task attempts
    - Integration test(WordCountITCase#testSpeculativeExecution): In addition to the existing test for upfront cloning, this tests conditional (very aggressive) speculative cloning.
    
    **Other comments:**
    - N/A
    
    Closes #112
---
 .../ClonedSchedulingProperty.java                  |  88 +++++++-
 ....java => AggressiveSpeculativeCloningPass.java} |  19 +-
 ...SchedulingPass.java => UpfrontCloningPass.java} |  24 ++-
 .../compiler/optimizer/policy/BasicPullPolicy.java |   3 +
 .../compiler/optimizer/policy/BasicPushPolicy.java |   9 +-
 .../snu/nemo/examples/beam/WordCountITCase.java    |  12 +-
 ...veSpeculativeCloningPolicyParallelismFive.java} |  11 +-
 ...=> UpfrontSchedulingPolicyParallelismFive.java} |  10 +-
 .../snu/nemo/runtime/common/state/BlockState.java  |  10 +-
 .../snu/nemo/runtime/common/state/StageState.java  |   2 +
 .../nemo/runtime/master/BlockManagerMaster.java    |  76 ++++---
 .../edu/snu/nemo/runtime/master/BlockMetadata.java |  15 +-
 .../snu/nemo/runtime/master/PlanStateManager.java  | 204 ++++++++++++------
 .../edu/snu/nemo/runtime/master/RuntimeMaster.java |  18 +-
 .../runtime/master/scheduler/BatchScheduler.java   | 228 ++++++++++++++-------
 ...aint.java => LocalitySchedulingConstraint.java} |  71 ++++---
 .../nemo/runtime/master/scheduler/Scheduler.java   |   5 +
 .../scheduler/SchedulingConstraintRegistry.java    |   4 +-
 .../runtime/master/BlockManagerMasterTest.java     |  45 ++--
 ....java => LocalitySchedulingConstraintTest.java} |  14 +-
 .../SchedulingConstraintnRegistryTest.java         |   2 +-
 .../runtime/master/scheduler/TaskRetryTest.java    |  56 +++--
 22 files changed, 636 insertions(+), 290 deletions(-)

diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java
index cd0f312..6a24c6d 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java
@@ -17,30 +17,104 @@ package edu.snu.nemo.common.ir.vertex.executionproperty;
 
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
+import java.io.Serializable;
+
 /**
  * Specifies cloned execution of a vertex.
  *
  * A major limitations of the current implementation:
  * *ALL* of the clones are always scheduled immediately
  */
-public final class ClonedSchedulingProperty extends VertexExecutionProperty<Integer> {
+public final class ClonedSchedulingProperty extends VertexExecutionProperty<ClonedSchedulingProperty.CloneConf> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
-  private ClonedSchedulingProperty(final Integer value) {
+  private ClonedSchedulingProperty(final CloneConf value) {
     super(value);
   }
 
   /**
    * Static method exposing the constructor.
-   * @param value value of the new execution property.
+   * @param conf value of the new execution property.
    * @return the newly created execution property.
    */
-  public static ClonedSchedulingProperty of(final Integer value) {
-    if (value <= 0) {
-      throw new IllegalStateException(String.valueOf(value));
+  public static ClonedSchedulingProperty of(final CloneConf conf) {
+    return new ClonedSchedulingProperty(conf);
+  }
+
+  /**
+   * Configurations for cloning.
+   * TODO #199: Slot-aware cloning
+   */
+  public static final class CloneConf implements Serializable {
+    // Always clone, upfront.
+    private final boolean upFrontCloning;
+
+    // Fraction of tasks to wait for completion, before trying to clone.
+    // If this value is 0, then we always clone.
+    private final double fractionToWaitFor;
+
+    // How many times slower is a task than the median, in order to be cloned.
+    private final double medianTimeMultiplier;
+
+    /**
+     * Always clone, upfront.
+     */
+    public CloneConf() {
+      this.upFrontCloning = true;
+      this.fractionToWaitFor = 0.0;
+      this.medianTimeMultiplier = 0.0;
+    }
+
+    /**
+     * Clone stragglers judiciously.
+     * @param fractionToWaitFor before trying to clone.
+     * @param medianTimeMultiplier to identify stragglers.
+     */
+    public CloneConf(final double fractionToWaitFor, final double medianTimeMultiplier) {
+      if (fractionToWaitFor >= 1.0 || fractionToWaitFor <= 0) {
+        throw new IllegalArgumentException(String.valueOf(fractionToWaitFor));
+      }
+      if (medianTimeMultiplier < 1.0) {
+        throw new IllegalArgumentException(String.valueOf(medianTimeMultiplier));
+      }
+      this.upFrontCloning = false;
+      this.fractionToWaitFor = fractionToWaitFor;
+      this.medianTimeMultiplier = medianTimeMultiplier;
+    }
+
+    /**
+     * @return fractionToWaitFor.
+     */
+    public double getFractionToWaitFor() {
+      return fractionToWaitFor;
+    }
+
+    /**
+     * @return medianTimeMultiplier.
+     */
+    public double getMedianTimeMultiplier() {
+      return medianTimeMultiplier;
+    }
+
+    /**
+     * @return true if it is upfront cloning.
+     */
+    public boolean isUpFrontCloning() {
+      return upFrontCloning;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder();
+      sb.append("upfront: ");
+      sb.append(upFrontCloning);
+      sb.append(" / fraction: ");
+      sb.append(fractionToWaitFor);
+      sb.append(" / multiplier: ");
+      sb.append(medianTimeMultiplier);
+      return sb.toString();
     }
-    return new ClonedSchedulingProperty(value);
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ClonedSchedulingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/AggressiveSpeculativeCloningPass.java
similarity index 61%
copy from compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ClonedSchedulingPass.java
copy to compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/AggressiveSpeculativeCloningPass.java
index 32797eb..7897183 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ClonedSchedulingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/AggressiveSpeculativeCloningPass.java
@@ -21,22 +21,27 @@ import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
 
 /**
- * Set the ClonedScheduling property of source vertices.
+ * Speculative execution. (very aggressive, for unit tests)
+ * TODO #200: Maintain Test Passes and Policies Separately
  */
 @Annotates(ClonedSchedulingProperty.class)
-public final class ClonedSchedulingPass extends AnnotatingPass {
+public final class AggressiveSpeculativeCloningPass extends AnnotatingPass {
   /**
    * Default constructor.
    */
-  public ClonedSchedulingPass() {
-    super(ClonedSchedulingPass.class);
+  public AggressiveSpeculativeCloningPass() {
+    super(AggressiveSpeculativeCloningPass.class);
   }
 
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
-    dag.getVertices().stream()
-        .filter(vertex -> dag.getIncomingEdgesOf(vertex.getId()).isEmpty())
-        .forEach(vertex -> vertex.setProperty(ClonedSchedulingProperty.of(2)));
+    // Speculative execution policy.
+    final double fractionToWaitFor = 0.00000001; // Aggressive
+    final double medianTimeMultiplier = 1.00000001; // Aggressive
+
+    // Apply the policy to ALL vertices
+    dag.getVertices().forEach(vertex -> vertex.setProperty(ClonedSchedulingProperty.of(
+      new ClonedSchedulingProperty.CloneConf(fractionToWaitFor, medianTimeMultiplier))));
     return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ClonedSchedulingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
similarity index 54%
rename from compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ClonedSchedulingPass.java
rename to compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
index 32797eb..7c4920f 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ClonedSchedulingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
@@ -17,26 +17,38 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.Requires;
 
 /**
- * Set the ClonedScheduling property of source vertices.
+ * Set the ClonedScheduling property of source vertices, in an upfront manner.
  */
 @Annotates(ClonedSchedulingProperty.class)
-public final class ClonedSchedulingPass extends AnnotatingPass {
+@Requires(CommunicationPatternProperty.class)
+public final class UpfrontCloningPass extends AnnotatingPass {
   /**
    * Default constructor.
    */
-  public ClonedSchedulingPass() {
-    super(ClonedSchedulingPass.class);
+  public UpfrontCloningPass() {
+    super(UpfrontCloningPass.class);
   }
 
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     dag.getVertices().stream()
-        .filter(vertex -> dag.getIncomingEdgesOf(vertex.getId()).isEmpty())
-        .forEach(vertex -> vertex.setProperty(ClonedSchedulingProperty.of(2)));
+        .filter(vertex -> dag.getIncomingEdgesOf(vertex.getId())
+          .stream()
+          // TODO #198: Handle Un-cloneable Beam Sink Operators
+          // only shuffle receivers (for now... as particular Beam sink operators fail when cloned)
+          .anyMatch(edge ->
+            edge.getPropertyValue(CommunicationPatternProperty.class)
+              .orElseThrow(() -> new IllegalStateException())
+              .equals(CommunicationPatternProperty.Value.Shuffle))
+          )
+        .forEach(vertex -> vertex.setProperty(
+          ClonedSchedulingProperty.of(new ClonedSchedulingProperty.CloneConf()))); // clone upfront, always
     return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
index 192ba26..b4e17d1 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
@@ -20,14 +20,17 @@ import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AggressiveSpeculativeCloningPass;
 import org.apache.reef.tang.Injector;
 
 /**
  * Basic pull policy.
+ * TODO #200: Maintain Test Passes and Policies Separately
  */
 public final class BasicPullPolicy implements Policy {
   public static final PolicyBuilder BUILDER =
       new PolicyBuilder()
+          .registerCompileTimePass(new AggressiveSpeculativeCloningPass())
           .registerCompileTimePass(new DefaultScheduleGroupPass());
   private final Policy policy;
 
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
index c0c52ff..8ef91cf 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
@@ -21,16 +21,19 @@ import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ShuffleEdgePushPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AggressiveSpeculativeCloningPass;
 import org.apache.reef.tang.Injector;
 
 /**
  * Basic push policy.
+ * TODO #200: Maintain Test Passes and Policies Separately
  */
 public final class BasicPushPolicy implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder()
-          .registerCompileTimePass(new ShuffleEdgePushPass())
-          .registerCompileTimePass(new DefaultScheduleGroupPass());
+    new PolicyBuilder()
+      .registerCompileTimePass(new AggressiveSpeculativeCloningPass())
+      .registerCompileTimePass(new ShuffleEdgePushPass())
+      .registerCompileTimePass(new DefaultScheduleGroupPass());
   private final Policy policy;
 
   /**
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
index 1211370..48d598f 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
@@ -112,7 +112,17 @@ public final class WordCountITCase {
         .addResourceJson(executorResourceFileName)
         .addJobId(WordCountITCase.class.getSimpleName() + "_clonedscheduling")
         .addMaxTaskAttempt(Integer.MAX_VALUE)
-        .addOptimizationPolicy(ClonedSchedulingPolicyParallelismFive.class.getCanonicalName())
+        .addOptimizationPolicy(UpfrontSchedulingPolicyParallelismFive.class.getCanonicalName())
         .build());
   }
+
+  @Test (timeout = TIMEOUT)
+  public void testSpeculativeExecution() throws Exception {
+    JobLauncher.main(builder
+      .addResourceJson(executorResourceFileName)
+      .addJobId(WordCountITCase.class.getSimpleName() + "_speculative")
+      .addMaxTaskAttempt(Integer.MAX_VALUE)
+      .addOptimizationPolicy(AggressiveSpeculativeCloningPolicyParallelismFive.class.getCanonicalName())
+      .build());
+  }
 }
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/AggressiveSpeculativeCloningPolicyParallelismFive.java
similarity index 85%
copy from examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java
copy to examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/AggressiveSpeculativeCloningPolicyParallelismFive.java
index b0e6348..31bd14b 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/AggressiveSpeculativeCloningPolicyParallelismFive.java
@@ -20,21 +20,22 @@ import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ClonedSchedulingPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AggressiveSpeculativeCloningPass;
 import edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy;
 import edu.snu.nemo.compiler.optimizer.policy.Policy;
 import edu.snu.nemo.compiler.optimizer.policy.PolicyImpl;
 import org.apache.reef.tang.Injector;
+
 import java.util.List;
 
 /**
- * A default policy with cloning for tests.
+ * A default policy with (aggressive) speculative execution.
  */
-public final class ClonedSchedulingPolicyParallelismFive implements Policy {
+public final class AggressiveSpeculativeCloningPolicyParallelismFive implements Policy {
   private final Policy policy;
-  public ClonedSchedulingPolicyParallelismFive() {
+  public AggressiveSpeculativeCloningPolicyParallelismFive() {
     final List<CompileTimePass> overwritingPasses = DefaultPolicy.BUILDER.getCompileTimePasses();
-    overwritingPasses.add(new ClonedSchedulingPass()); // CLONING!
+    overwritingPasses.add(new AggressiveSpeculativeCloningPass()); // CLONING!
     this.policy = new PolicyImpl(
         PolicyTestUtil.overwriteParallelism(5, overwritingPasses),
         DefaultPolicy.BUILDER.getRuntimePasses());
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/UpfrontSchedulingPolicyParallelismFive.java
similarity index 87%
rename from examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java
rename to examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/UpfrontSchedulingPolicyParallelismFive.java
index b0e6348..a756037 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/UpfrontSchedulingPolicyParallelismFive.java
@@ -20,7 +20,7 @@ import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ClonedSchedulingPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.UpfrontCloningPass;
 import edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy;
 import edu.snu.nemo.compiler.optimizer.policy.Policy;
 import edu.snu.nemo.compiler.optimizer.policy.PolicyImpl;
@@ -28,13 +28,13 @@ import org.apache.reef.tang.Injector;
 import java.util.List;
 
 /**
- * A default policy with cloning for tests.
+ * A default policy with upfront cloning.
  */
-public final class ClonedSchedulingPolicyParallelismFive implements Policy {
+public final class UpfrontSchedulingPolicyParallelismFive implements Policy {
   private final Policy policy;
-  public ClonedSchedulingPolicyParallelismFive() {
+  public UpfrontSchedulingPolicyParallelismFive() {
     final List<CompileTimePass> overwritingPasses = DefaultPolicy.BUILDER.getCompileTimePasses();
-    overwritingPasses.add(new ClonedSchedulingPass()); // CLONING!
+    overwritingPasses.add(new UpfrontCloningPass()); // CLONING!
     this.policy = new PolicyImpl(
         PolicyTestUtil.overwriteParallelism(5, overwritingPasses),
         DefaultPolicy.BUILDER.getRuntimePasses());
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
index b2329ee..b21751c 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
@@ -31,9 +31,9 @@ public final class BlockState {
     final StateMachine.Builder stateMachineBuilder = StateMachine.newBuilder();
 
     // Add states
-    stateMachineBuilder.addState(State.NOT_AVAILABLE, "The block is not available.");
     stateMachineBuilder.addState(State.IN_PROGRESS, "The block is in the progress of being created.");
     stateMachineBuilder.addState(State.AVAILABLE, "The block is available.");
+    stateMachineBuilder.addState(State.NOT_AVAILABLE, "The block is not available.");
 
     // From IN_PROGRESS
     stateMachineBuilder.addTransition(State.IN_PROGRESS, State.AVAILABLE, "The block is successfully created");
@@ -41,13 +41,7 @@ public final class BlockState {
         "The block is lost before being created");
 
     // From AVAILABLE
-    stateMachineBuilder.addTransition(State.AVAILABLE, State.NOT_AVAILABLE, "The block is lost");
-
-    // From NOT_AVAILABLE
-    stateMachineBuilder.addTransition(State.NOT_AVAILABLE, State.IN_PROGRESS,
-        "The task that produces the block is scheduled.");
-    stateMachineBuilder.addTransition(State.NOT_AVAILABLE, State.NOT_AVAILABLE,
-        "A block can be reported lost from multiple sources");
+    stateMachineBuilder.addTransition(State.AVAILABLE, State.NOT_AVAILABLE, "The block is not available");
 
     stateMachineBuilder.setInitialState(State.IN_PROGRESS);
 
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
index e7edbad..1a63f0e 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
@@ -44,6 +44,8 @@ public final class StageState {
     stateMachineBuilder.addTransition(State.INCOMPLETE, State.COMPLETE, "All tasks complete");
     stateMachineBuilder.addTransition(State.COMPLETE, State.INCOMPLETE,
         "Completed before, but a task in this stage should be retried");
+    stateMachineBuilder.addTransition(State.COMPLETE, State.COMPLETE,
+      "Completed before, but probably a cloned task has completed again");
 
     stateMachineBuilder.setInitialState(State.INCOMPLETE);
 
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index def0888..05b7467 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -31,6 +31,7 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -85,6 +86,8 @@ public final class BlockManagerMaster {
 
   /**
    * Initializes the states of a block which will be produced by a producer task.
+   * This method is idempotent thanks to the 'Set' data structures.
+   * See BatchScheduler#doSchedule for details on scheduling same task attempts multiple times.
    *
    * @param blockId        the id of the block to initialize.
    * @param producerTaskId the id of the producer task.
@@ -134,31 +137,22 @@ public final class BlockManagerMaster {
   }
 
   /**
-   * Returns a handler of block location requests.
-   *
-   * @param blockIdOrWildcard id of the specified block.
-   * @return the handler of block location requests, which completes exceptionally when the block
-   * is not {@code IN_PROGRESS} or {@code AVAILABLE}.
+   * Get handlers of blocks that are in a particular state.
+   * @param blockIdOrWildcard to query
+   * @param state of the block
+   * @return the handlers, empty if none matches.
    */
-  public BlockRequestHandler getBlockLocationHandler(final String blockIdOrWildcard) {
+  public List<BlockRequestHandler> getBlockHandlers(final String blockIdOrWildcard,
+                                                    final BlockState.State state) {
     final Lock readLock = lock.readLock();
     readLock.lock();
     try {
       final Set<BlockMetadata> metadataSet =
         getBlockWildcardStateSet(RuntimeIdManager.getWildCardFromBlockId(blockIdOrWildcard));
-      final List<BlockMetadata> candidates = metadataSet.stream()
-        .filter(metadata -> metadata.getBlockState().equals(BlockState.State.IN_PROGRESS)
-          || metadata.getBlockState().equals(BlockState.State.AVAILABLE))
+      return metadataSet.stream()
+        .filter(metadata -> metadata.getBlockState().equals(state))
+        .map(BlockMetadata::getLocationHandler)
         .collect(Collectors.toList());
-      if (!candidates.isEmpty()) {
-        // Randomly pick one of the candidate handlers.
-        return candidates.get(random.nextInt(candidates.size())).getLocationHandler();
-      } else {
-        // No candidate exists
-        final BlockRequestHandler handler = new BlockRequestHandler(blockIdOrWildcard);
-        handler.completeExceptionally(new AbsentBlockException(blockIdOrWildcard, BlockState.State.NOT_AVAILABLE));
-        return handler;
-      }
     } finally {
       readLock.unlock();
     }
@@ -170,7 +164,7 @@ public final class BlockManagerMaster {
    * @param blockId the id of the block.
    * @return the ids of the producer tasks.
    */
-  private Set<String> getProducerTaskIds(final String blockId) {
+  public Set<String> getProducerTaskIds(final String blockId) {
     final Lock readLock = lock.readLock();
     readLock.lock();
     try {
@@ -241,7 +235,9 @@ public final class BlockManagerMaster {
             if (location.get().equals(executorId)) {
               blockIds.add(blockMetadata.getBlockId());
             }
-          } catch (final InterruptedException | ExecutionException e) {
+          } catch (final CancellationException | ExecutionException e) {
+            // Don't add (NOT_AVAILABLE)
+          } catch (final InterruptedException e) {
             // Cannot reach here because we check the completion of the future already.
             LOG.error("Exception while getting the location of a block!", e);
             Thread.currentThread().interrupt();
@@ -302,21 +298,37 @@ public final class BlockManagerMaster {
   }
 
   /**
-   * Deals with a request for the location of a block.
-   *
    * @param message        the request message.
    * @param messageContext the message context which will be used for response.
    */
-  void onRequestBlockLocation(final ControlMessage.Message message,
-                              final MessageContext messageContext) {
+  private void registerLocationRequest(final ControlMessage.Message message, final MessageContext messageContext) {
     assert (message.getType() == ControlMessage.MessageType.RequestBlockLocation);
     final String blockIdWildcard = message.getRequestBlockLocationMsg().getBlockIdWildcard();
     final long requestId = message.getId();
     final Lock readLock = lock.readLock();
     readLock.lock();
     try {
-      final BlockRequestHandler locationFuture = getBlockLocationHandler(blockIdWildcard);
-      locationFuture.registerRequest(requestId, messageContext);
+      // (CASE 1) Check AVAILABLE blocks.
+      final List<BlockRequestHandler> availableBlocks = getBlockHandlers(blockIdWildcard, BlockState.State.AVAILABLE);
+      if (!availableBlocks.isEmpty()) {
+        // random pick
+        // TODO #201: Let Executors Try Multiple Input Block Clones
+        availableBlocks.get(random.nextInt(availableBlocks.size())).registerRequest(requestId, messageContext);
+        return;
+      }
+
+      // (CASE 2) Check IN_PROGRESS blocks.
+      final List<BlockRequestHandler> progressBlocks = getBlockHandlers(blockIdWildcard, BlockState.State.IN_PROGRESS);
+      if (!progressBlocks.isEmpty()) {
+        // random pick
+        progressBlocks.get(random.nextInt(progressBlocks.size())).registerRequest(requestId, messageContext);
+        return;
+      }
+
+      // (CASE 3) Unfortunately, there is no good block to use.
+      final BlockRequestHandler absent = new BlockRequestHandler(blockIdWildcard);
+      absent.completeExceptionally(new AbsentBlockException(blockIdWildcard, BlockState.State.NOT_AVAILABLE));
+      absent.registerRequest(requestId, messageContext);
     } finally {
       readLock.unlock();
     }
@@ -352,7 +364,7 @@ public final class BlockManagerMaster {
     public void onMessageWithContext(final ControlMessage.Message message, final MessageContext messageContext) {
       switch (message.getType()) {
         case RequestBlockLocation:
-          onRequestBlockLocation(message, messageContext);
+          registerLocationRequest(message, messageContext);
           break;
         default:
           throw new IllegalMessageException(
@@ -366,16 +378,16 @@ public final class BlockManagerMaster {
    * The handler of block location requests.
    */
   public static final class BlockRequestHandler {
-    private final String blockId;
+    private final String blockIdOrWildcard;
     private final CompletableFuture<String> locationFuture;
 
     /**
      * Constructor.
      *
-     * @param blockId the ID of the block.
+     * @param blockIdOrWildcard the ID of the block.
      */
-    BlockRequestHandler(final String blockId) {
-      this.blockId = blockId;
+    BlockRequestHandler(final String blockIdOrWildcard) {
+      this.blockIdOrWildcard = blockIdOrWildcard;
       this.locationFuture = new CompletableFuture<>();
     }
 
@@ -411,7 +423,7 @@ public final class BlockManagerMaster {
       final ControlMessage.BlockLocationInfoMsg.Builder infoMsgBuilder =
         ControlMessage.BlockLocationInfoMsg.newBuilder()
           .setRequestId(requestId)
-          .setBlockId(blockId);
+          .setBlockId(blockIdOrWildcard);
 
       locationFuture.whenComplete((location, throwable) -> {
         if (throwable == null) {
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
index e1f2af7..a5eb8c3 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
@@ -64,9 +64,7 @@ final class BlockMetadata {
       case IN_PROGRESS:
         break;
       case NOT_AVAILABLE:
-        // Reset the block location and committer information.
         locationHandler.completeExceptionally(new AbsentBlockException(blockId, newState));
-        locationHandler = new BlockManagerMaster.BlockRequestHandler(blockId);
         break;
       case AVAILABLE:
         if (location == null) {
@@ -115,4 +113,17 @@ final class BlockMetadata {
     sb.append(")");
     return sb.toString();
   }
+
+  @Override
+  public boolean equals(final Object that) {
+    if (!(that instanceof BlockMetadata)) {
+      return false;
+    }
+    return this.blockId.equals(((BlockMetadata) that).getBlockId());
+  }
+
+  @Override
+  public int hashCode() {
+    return blockId.hashCode();
+  }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
index f798ac1..5017311 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
@@ -75,6 +75,13 @@ public final class PlanStateManager {
   private final Map<String, List<List<TaskState>>> stageIdToTaskAttemptStates; // sorted by task idx, and then attempt
 
   /**
+   * Used for speculative cloning. (in the unit of milliseconds - ms)
+   */
+  private final Map<String, Long> taskIdToStartTimeMs = new HashMap<>();
+  private final Map<String, List<Long>> stageIdToCompletedTaskTimeMsList = new HashMap<>();
+  private final Map<String, Map<Integer, Integer>> stageIdToTaskIndexToNumOfClones = new HashMap<>();
+
+  /**
    * Represents the plan to manage.
    */
   private PhysicalPlan physicalPlan;
@@ -122,6 +129,8 @@ public final class PlanStateManager {
     if (!initialized) {
       // First scheduling.
       this.initialized = true;
+    } else {
+      LOG.info("Update Plan from {} to {}", physicalPlan.getPlanId(), physicalPlanToUpdate.getPlanId());
     }
     this.planState = new PlanState();
     this.metricStore.getOrCreateMetric(JobMetric.class, planId).setStageDAG(physicalPlanToUpdate.getStageDAG());
@@ -129,22 +138,22 @@ public final class PlanStateManager {
     this.physicalPlan = physicalPlanToUpdate;
     this.planId = physicalPlanToUpdate.getPlanId();
     this.maxScheduleAttempt = maxScheduleAttemptToSet;
-    initializeComputationStates();
+    initializeStates();
   }
 
   /**
    * Initializes the states for the plan/stages/tasks for this plan.
    * TODO #182: Consider reshaping in run-time optimization. At now, we only consider plan appending.
    */
-  private void initializeComputationStates() {
+  private void initializeStates() {
     onPlanStateChanged(PlanState.State.EXECUTING);
     physicalPlan.getStageDAG().topologicalDo(stage -> {
       if (!stageIdToState.containsKey(stage.getId())) {
         stageIdToState.put(stage.getId(), new StageState());
         stageIdToTaskAttemptStates.put(stage.getId(), new ArrayList<>(stage.getParallelism()));
+
         // for each task idx of this stage
         for (int taskIndex = 0; taskIndex < stage.getParallelism(); taskIndex++) {
-          // for each task idx of this stage
           stageIdToTaskAttemptStates.get(stage.getId()).add(new ArrayList<>());
           // task states will be initialized lazily in getTaskAttemptsToSchedule()
         }
@@ -152,6 +161,9 @@ public final class PlanStateManager {
     });
   }
 
+  /////////////////////////////////////////////////////////////////////////////////
+  //////////////////////////////////////// Core scheduling methods
+
   /**
    * Get task attempts that are "READY".
    *
@@ -169,7 +181,7 @@ public final class PlanStateManager {
     final Stage stage = physicalPlan.getStageDAG().getVertexById(stageId);
     for (int taskIndex = 0; taskIndex < stage.getParallelism(); taskIndex++) {
       final List<TaskState> attemptStatesForThisTaskIndex =
-        stageIdToTaskAttemptStates.get(stage.getId()).get(taskIndex);
+        stageIdToTaskAttemptStates.get(stageId).get(taskIndex);
 
       // If one of the attempts is COMPLETE, do not schedule
       if (attemptStatesForThisTaskIndex
@@ -177,10 +189,17 @@ public final class PlanStateManager {
         .noneMatch(state -> state.getStateMachine().getCurrentState().equals(TaskState.State.COMPLETE))) {
 
         // (Step 1) Create new READY attempts, as many as
-        // # of clones - # of 'not-done' attempts)
-        final int numOfClones = stage.getPropertyValue(ClonedSchedulingProperty.class).orElse(1);
+        // # of numOfConcurrentAttempts(including clones) - # of 'not-done' attempts
+        stageIdToTaskIndexToNumOfClones.putIfAbsent(stageId, new HashMap<>());
+        final Optional<ClonedSchedulingProperty.CloneConf> cloneConf =
+          stage.getPropertyValue(ClonedSchedulingProperty.class);
+        final int numOfConcurrentAttempts = cloneConf.isPresent() && cloneConf.get().isUpFrontCloning()
+          // For now we support up to 1 clone (2 concurrent = 1 original + 1 clone)
+          ? 2
+          // If the property is not set, then we do not clone (= 1 concurrent)
+          : stageIdToTaskIndexToNumOfClones.get(stageId).getOrDefault(stageId, 1);
         final long numOfNotDoneAttempts = attemptStatesForThisTaskIndex.stream().filter(this::isTaskNotDone).count();
-        for (int i = 0; i < numOfClones - numOfNotDoneAttempts; i++) {
+        for (int i = 0; i < numOfConcurrentAttempts - numOfNotDoneAttempts; i++) {
           attemptStatesForThisTaskIndex.add(new TaskState());
         }
 
@@ -204,23 +223,66 @@ public final class PlanStateManager {
     return taskAttemptsToSchedule;
   }
 
-  private boolean isTaskNotDone(final TaskState taskState) {
-    final TaskState.State state = (TaskState.State) taskState.getStateMachine().getCurrentState();
-    return state.equals(TaskState.State.READY)
-      || state.equals(TaskState.State.EXECUTING)
-      || state.equals(TaskState.State.ON_HOLD);
-  }
-
   /**
-   * Gets the attempt numbers of all tasks in a stage.
-   *
-   * @param stageId the stage to investigate.
-   * @return the attempt numbers of all tasks in a stage.
+   * @param stageId to query.
+   * @return all task attempt ids of the stage.
    */
   public synchronized Set<String> getAllTaskAttemptsOfStage(final String stageId) {
     return getTaskAttemptIdsToItsState(stageId).keySet();
   }
 
+  /////////////////////////////////////////////////////////////////////////////////
+  //////////////////////////////////////// Speculative execution
+
+  /**
+   * @param stageId to query.
+   * @return a map from an EXECUTING task to its running time so far.
+   */
+  public synchronized Map<String, Long> getExecutingTaskToRunningTimeMs(final String stageId) {
+    final long curTime = System.currentTimeMillis();
+    final Map<String, Long> result = new HashMap<>();
+
+    final List<List<TaskState>> taskStates = stageIdToTaskAttemptStates.get(stageId);
+    for (int taskIndex = 0; taskIndex < taskStates.size(); taskIndex++) {
+      final List<TaskState> attemptStates = taskStates.get(taskIndex);
+      for (int attempt = 0; attempt < attemptStates.size(); attempt++) {
+        if (TaskState.State.EXECUTING.equals(attemptStates.get(attempt).getStateMachine().getCurrentState())) {
+          final String taskId = RuntimeIdManager.generateTaskId(stageId, taskIndex, attempt);
+          result.put(taskId, curTime - taskIdToStartTimeMs.get(taskId));
+        }
+      }
+    }
+
+    return result;
+  }
+
+  /**
+   * List of task times so far for this stage.
+   * @param stageId of the stage.
+   * @return a copy of the list, empty if none completed.
+   */
+  public synchronized List<Long> getCompletedTaskTimeListMs(final String stageId) {
+    // Return a copy
+    return new ArrayList<>(stageIdToCompletedTaskTimeMsList.getOrDefault(stageId, new ArrayList<>(0)));
+  }
+
+  /**
+   * @param stageId of the clone.
+   * @param taskIndex of the clone.
+   * @param numOfClones of the clone.
+   * @return true if the numOfClones has been modified, false otherwise
+   */
+  public synchronized boolean setNumOfClones(final String stageId, final int taskIndex, final int numOfClones) {
+    stageIdToTaskIndexToNumOfClones.putIfAbsent(stageId, new HashMap<>());
+    // overwrite the previous value.
+    final Integer previousNumOfClones = stageIdToTaskIndexToNumOfClones.get(stageId).put(taskIndex, numOfClones);
+    return (previousNumOfClones == null) || (previousNumOfClones != numOfClones);
+  }
+
+
+  /////////////////////////////////////////////////////////////////////////////////
+  //////////////////////////////////////// State transitions
+
   /**
    * Updates the state of a task.
    * Task state changes can occur both in master and executor.
@@ -252,18 +314,28 @@ public final class PlanStateManager {
     final String stageId = RuntimeIdManager.getStageIdFromTaskId(taskId);
     final List<List<TaskState>> taskStatesOfThisStage = stageIdToTaskAttemptStates.get(stageId);
     final long numOfCompletedTaskIndicesInThisStage = taskStatesOfThisStage.stream()
-      .map(attempts -> attempts.stream()
-        .map(state -> state.getStateMachine().getCurrentState())
-        .allMatch(curState -> curState.equals(TaskState.State.COMPLETE)
-          || curState.equals(TaskState.State.SHOULD_RETRY)
-          || curState.equals(TaskState.State.ON_HOLD)))
-      .filter(bool -> bool.equals(true))
+      .filter(attempts -> {
+        final List<TaskState.State> states = attempts
+          .stream()
+          .map(state -> (TaskState.State) state.getStateMachine().getCurrentState())
+          .collect(Collectors.toList());
+        return states.stream().anyMatch(curState -> curState.equals(TaskState.State.ON_HOLD)) // one of them is ON_HOLD
+          || states.stream().anyMatch(curState -> curState.equals(TaskState.State.COMPLETE)); // one of them is COMPLETE
+      })
       .count();
     if (newTaskState.equals(TaskState.State.COMPLETE)) {
       LOG.info("{} completed: {} Task(s) out of {} are remaining in this stage",
         taskId, taskStatesOfThisStage.size() - numOfCompletedTaskIndicesInThisStage, taskStatesOfThisStage.size());
     }
 
+    // Maintain info for speculative execution
+    if (newTaskState.equals(TaskState.State.EXECUTING)) {
+      taskIdToStartTimeMs.put(taskId, System.currentTimeMillis());
+    } else if (newTaskState.equals(TaskState.State.COMPLETE)) {
+      stageIdToCompletedTaskTimeMsList.putIfAbsent(stageId, new ArrayList<>());
+      stageIdToCompletedTaskTimeMsList.get(stageId).add(System.currentTimeMillis() - taskIdToStartTimeMs.get(taskId));
+    }
+
     // Change stage state, if needed
     switch (newTaskState) {
       // INCOMPLETE stage
@@ -295,20 +367,6 @@ public final class PlanStateManager {
     }
   }
 
-  private List<TaskState.State> getPeerAttemptsforTheSameTaskIndex(final String taskId) {
-    final String stageId = RuntimeIdManager.getStageIdFromTaskId(taskId);
-    final int taskIndex = RuntimeIdManager.getIndexFromTaskId(taskId);
-    final int attempt = RuntimeIdManager.getAttemptFromTaskId(taskId);
-
-    final List<TaskState> otherAttemptsforTheSameTaskIndex =
-      new ArrayList<>(stageIdToTaskAttemptStates.get(stageId).get(taskIndex));
-    otherAttemptsforTheSameTaskIndex.remove(attempt);
-
-    return otherAttemptsforTheSameTaskIndex.stream()
-      .map(state -> (TaskState.State) state.getStateMachine().getCurrentState())
-      .collect(Collectors.toList());
-  }
-
   /**
    * (PRIVATE METHOD)
    * Updates the state of a stage.
@@ -376,6 +434,8 @@ public final class PlanStateManager {
     }
   }
 
+  /////////////////////////////////////////////////////////////////////////////////
+  //////////////////////////////////////// Helper Methods
 
   /**
    * Wait for this plan to be finished and return the final state.
@@ -423,6 +483,18 @@ public final class PlanStateManager {
   }
 
   /**
+   * @return a map from task attempt id to its current state.
+   */
+  @VisibleForTesting
+  public synchronized Map<String, TaskState.State> getAllTaskAttemptIdsToItsState() {
+    return physicalPlan.getStageDAG().getVertices()
+      .stream()
+      .map(Stage::getId)
+      .flatMap(stageId -> getTaskAttemptIdsToItsState(stageId).entrySet().stream())
+      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+
+  /**
    * @return whether the execution for the plan is done or not.
    */
   public synchronized boolean isPlanDone() {
@@ -459,6 +531,19 @@ public final class PlanStateManager {
     return (TaskState.State) getTaskStateHelper(taskId).getStateMachine().getCurrentState();
   }
 
+  private Map<String, TaskState.State> getTaskAttemptIdsToItsState(final String stageId) {
+    final Map<String, TaskState.State> result = new HashMap<>();
+    final List<List<TaskState>> taskStates = stageIdToTaskAttemptStates.get(stageId);
+    for (int taskIndex = 0; taskIndex < taskStates.size(); taskIndex++) {
+      final List<TaskState> attemptStates = taskStates.get(taskIndex);
+      for (int attempt = 0; attempt < attemptStates.size(); attempt++) {
+        result.put(RuntimeIdManager.generateTaskId(stageId, taskIndex, attempt),
+          (TaskState.State) attemptStates.get(attempt).getStateMachine().getCurrentState());
+      }
+    }
+    return result;
+  }
+
   private TaskState getTaskStateHelper(final String taskId) {
     return stageIdToTaskAttemptStates
       .get(RuntimeIdManager.getStageIdFromTaskId(taskId))
@@ -466,6 +551,27 @@ public final class PlanStateManager {
       .get(RuntimeIdManager.getAttemptFromTaskId(taskId));
   }
 
+  private boolean isTaskNotDone(final TaskState taskState) {
+    final TaskState.State state = (TaskState.State) taskState.getStateMachine().getCurrentState();
+    return state.equals(TaskState.State.READY)
+      || state.equals(TaskState.State.EXECUTING)
+      || state.equals(TaskState.State.ON_HOLD);
+  }
+
+  private List<TaskState.State> getPeerAttemptsforTheSameTaskIndex(final String taskId) {
+    final String stageId = RuntimeIdManager.getStageIdFromTaskId(taskId);
+    final int taskIndex = RuntimeIdManager.getIndexFromTaskId(taskId);
+    final int attempt = RuntimeIdManager.getAttemptFromTaskId(taskId);
+
+    final List<TaskState> otherAttemptsforTheSameTaskIndex =
+      new ArrayList<>(stageIdToTaskAttemptStates.get(stageId).get(taskIndex));
+    otherAttemptsforTheSameTaskIndex.remove(attempt);
+
+    return otherAttemptsforTheSameTaskIndex.stream()
+      .map(state -> (TaskState.State) state.getStateMachine().getCurrentState())
+      .collect(Collectors.toList());
+  }
+
   /**
    * @return the physical plan.
    */
@@ -548,26 +654,4 @@ public final class PlanStateManager {
     sb.append("]}");
     return sb.toString();
   }
-
-  @VisibleForTesting
-  public synchronized Map<String, TaskState.State> getAllTaskAttemptIdsToItsState() {
-    return physicalPlan.getStageDAG().getVertices()
-      .stream()
-      .map(Stage::getId)
-      .flatMap(stageId -> getTaskAttemptIdsToItsState(stageId).entrySet().stream())
-      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-  }
-
-  private Map<String, TaskState.State> getTaskAttemptIdsToItsState(final String stageId) {
-    final Map<String, TaskState.State> result = new HashMap<>();
-    final List<List<TaskState>> taskStates = stageIdToTaskAttemptStates.get(stageId);
-    for (int taskIndex = 0; taskIndex < taskStates.size(); taskIndex++) {
-      final List<TaskState> attemptStates = taskStates.get(taskIndex);
-      for (int attempt = 0; attempt < attemptStates.size(); attempt++) {
-        result.put(RuntimeIdManager.generateTaskId(stageId, taskIndex, attempt),
-          (TaskState.State) attemptStates.get(attempt).getStateMachine().getCurrentState());
-      }
-    }
-    return result;
-  }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 4285469..00eac87 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -72,8 +72,11 @@ public final class RuntimeMaster {
   private static final int DAG_LOGGING_PERIOD = 3000;
   private static final int METRIC_ARRIVE_TIMEOUT = 10000;
   private static final int REST_SERVER_PORT = 10101;
+  private static final int SPECULATION_CHECKING_PERIOD_MS = 100;
 
   private final ExecutorService runtimeMasterThread;
+  private final ScheduledExecutorService speculativeTaskCloningThread;
+
   private final Scheduler scheduler;
   private final ContainerManager containerManager;
   private final MetricMessageHandler metricMessageHandler;
@@ -103,6 +106,16 @@ public final class RuntimeMaster {
     // and keeping it single threaded removes the complexity of multi-thread synchronization.
     this.runtimeMasterThread =
         Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "RuntimeMaster thread"));
+
+    // Check for speculative execution every second.
+    this.speculativeTaskCloningThread = Executors
+      .newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, "SpeculativeTaskCloning thread"));
+    this.speculativeTaskCloningThread.scheduleAtFixedRate(
+      () -> this.runtimeMasterThread.submit(scheduler::onSpeculativeExecutionCheck),
+      SPECULATION_CHECKING_PERIOD_MS,
+      SPECULATION_CHECKING_PERIOD_MS,
+      TimeUnit.MILLISECONDS);
+
     this.scheduler = scheduler;
     this.containerManager = containerManager;
     this.metricMessageHandler = metricMessageHandler;
@@ -169,6 +182,9 @@ public final class RuntimeMaster {
    * Terminates the RuntimeMaster.
    */
   public void terminate() {
+    // No need to speculate anymore
+    speculativeTaskCloningThread.shutdown();
+
     // send metric flush request to all executors
     metricManagerMaster.sendMetricFlushRequest();
     try {
@@ -181,6 +197,7 @@ public final class RuntimeMaster {
       // clean up state...
       Thread.currentThread().interrupt();
     }
+
     runtimeMasterThread.execute(() -> {
       scheduler.terminate();
       try {
@@ -406,7 +423,6 @@ public final class RuntimeMaster {
   private ScheduledExecutorService scheduleDagLogging() {
     final ScheduledExecutorService dagLoggingExecutor = Executors.newSingleThreadScheduledExecutor();
     dagLoggingExecutor.scheduleAtFixedRate(new Runnable() {
-
       public void run() {
         planStateManager.storeJSON("periodic");
       }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
index ac2db86..5ccc2fc 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -17,13 +17,16 @@ package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.collect.Sets;
 import edu.snu.nemo.common.Pair;
+import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.IgnoreSchedulingTempDataReceiverProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
 import edu.snu.nemo.runtime.common.plan.*;
+import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.PlanAppender;
 import edu.snu.nemo.runtime.master.DataSkewDynOptDataHandler;
@@ -34,6 +37,7 @@ import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +45,7 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
-import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
@@ -92,7 +96,7 @@ public final class BatchScheduler implements Scheduler {
     updatePhysicalPlanEventHandler.setScheduler(this);
     if (pubSubEventHandlerWrapper.getPubSubEventHandler() != null) {
       pubSubEventHandlerWrapper.getPubSubEventHandler()
-          .subscribe(updatePhysicalPlanEventHandler.getEventClass(), updatePhysicalPlanEventHandler);
+        .subscribe(updatePhysicalPlanEventHandler.getEventClass(), updatePhysicalPlanEventHandler);
     }
     this.executorRegistry = executorRegistry;
     this.planStateManager = planStateManager;
@@ -121,7 +125,7 @@ public final class BatchScheduler implements Scheduler {
     } else {
       // Append the submitted plan to the original plan.
       final PhysicalPlan appendedPlan =
-          PlanAppender.appendPlan(planStateManager.getPhysicalPlan(), submittedPhysicalPlan);
+        PlanAppender.appendPlan(planStateManager.getPhysicalPlan(), submittedPhysicalPlan);
       updatePlan(appendedPlan, maxScheduleAttempt);
       planStateManager.storeJSON("appended");
     }
@@ -147,11 +151,11 @@ public final class BatchScheduler implements Scheduler {
                           final int maxScheduleAttempt) {
     planStateManager.updatePlan(newPhysicalPlan, maxScheduleAttempt);
     this.sortedScheduleGroups = newPhysicalPlan.getStageDAG().getVertices().stream()
-        .collect(Collectors.groupingBy(Stage::getScheduleGroup))
-        .entrySet().stream()
-        .sorted(Map.Entry.comparingByKey())
-        .map(Map.Entry::getValue)
-        .collect(Collectors.toList());
+      .collect(Collectors.groupingBy(Stage::getScheduleGroup))
+      .entrySet().stream()
+      .sorted(Map.Entry.comparingByKey())
+      .map(Map.Entry::getValue)
+      .collect(Collectors.toList());
   }
 
   /**
@@ -186,7 +190,7 @@ public final class BatchScheduler implements Scheduler {
         break;
       case FAILED:
         throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The plan failed on Task #")
-            .append(taskId).append(" in Executor ").append(executorId).toString()));
+          .append(taskId).append(" in Executor ").append(executorId).toString()));
       case READY:
       case EXECUTING:
         throw new RuntimeException("The states READY/EXECUTING cannot occur at this point");
@@ -228,6 +232,56 @@ public final class BatchScheduler implements Scheduler {
   }
 
   @Override
+  public void onSpeculativeExecutionCheck() {
+    MutableBoolean isNumOfCloneChanged = new MutableBoolean(false);
+
+    selectEarliestSchedulableGroup().ifPresent(scheduleGroup -> {
+      scheduleGroup.stream().map(Stage::getId).forEach(stageId -> {
+        final Stage stage = planStateManager.getPhysicalPlan().getStageDAG().getVertexById(stageId);
+
+        // Only if the ClonedSchedulingProperty is set...
+        stage.getPropertyValue(ClonedSchedulingProperty.class).ifPresent(cloneConf -> {
+          if (!cloneConf.isUpFrontCloning()) { // Upfront cloning is already handled.
+            final double fractionToWaitFor = cloneConf.getFractionToWaitFor();
+            final int parallelism = stage.getParallelism();
+            final Object[] completedTaskTimes = planStateManager.getCompletedTaskTimeListMs(stageId).toArray();
+
+            // Only after the fraction of the tasks are done...
+            // Delayed cloning (aggressive)
+            if (completedTaskTimes.length > 0
+              && completedTaskTimes.length >= Math.round(parallelism * fractionToWaitFor)) {
+              Arrays.sort(completedTaskTimes);
+              final long medianTime = (long) completedTaskTimes[completedTaskTimes.length / 2];
+              final double medianTimeMultiplier = cloneConf.getMedianTimeMultiplier();
+              final Map<String, Long> execTaskToTime = planStateManager.getExecutingTaskToRunningTimeMs(stageId);
+              for (final Map.Entry<String, Long> entry : execTaskToTime.entrySet()) {
+
+                // Only if the running task is considered a 'straggler'....
+                final long runningTime = entry.getValue();
+                if (runningTime > Math.round(medianTime * medianTimeMultiplier)) {
+                  final String taskId = entry.getKey();
+                  final boolean isCloned = planStateManager.setNumOfClones(
+                    stageId, RuntimeIdManager.getIndexFromTaskId(taskId), 2);
+                  if (isCloned) {
+                    LOG.info("Cloned {}, because its running time {} (ms) is bigger than {} tasks' "
+                        + "(median) {} (ms) * (multiplier) {}", taskId, runningTime, completedTaskTimes.length,
+                      medianTime, medianTimeMultiplier);
+                  }
+                  isNumOfCloneChanged.setValue(isCloned);
+                }
+              }
+            }
+          }
+        });
+      });
+    });
+
+    if (isNumOfCloneChanged.booleanValue()) {
+      doSchedule(); // Do schedule the new clone.
+    }
+  }
+
+  @Override
   public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
     LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(), executorRepresenter.getNodeName());
     executorRegistry.registerExecutor(executorRepresenter);
@@ -246,6 +300,9 @@ public final class BatchScheduler implements Scheduler {
       return Pair.of(executor, ExecutorRegistry.ExecutorState.FAILED);
     });
 
+    // Blocks of the interrupted tasks are failed.
+    interruptedTasks.forEach(blockManagerMaster::onProducerTaskFailed);
+
     // Retry the interrupted tasks (and required parents)
     retryTasksAndRequiredParents(interruptedTasks);
 
@@ -273,25 +330,21 @@ public final class BatchScheduler implements Scheduler {
     final Optional<List<Stage>> earliest = selectEarliestSchedulableGroup();
 
     if (earliest.isPresent()) {
-      // Get schedulable tasks.
       final List<Task> tasksToSchedule = earliest.get().stream()
-          .flatMap(stage -> selectSchedulableTasks(stage).stream())
-          .collect(Collectors.toList());
-
-      // We prefer (but not guarantee) to schedule the 'receiving' tasks first,
-      // assuming that tasks within a ScheduleGroup are connected with 'push' edges.
-      Collections.reverse(tasksToSchedule);
-
-      LOG.info("Scheduling some tasks in {}, which are in the same ScheduleGroup", tasksToSchedule.stream()
+        .flatMap(stage -> selectSchedulableTasks(stage).stream())
+        .collect(Collectors.toList());
+      if (!tasksToSchedule.isEmpty()) {
+        LOG.info("Scheduling some tasks in {}, which are in the same ScheduleGroup", tasksToSchedule.stream()
           .map(Task::getTaskId)
           .map(RuntimeIdManager::getStageIdFromTaskId)
           .collect(Collectors.toSet()));
 
-      // Set the pointer to the schedulable tasks.
-      pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
+        // Set the pointer to the schedulable tasks.
+        pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
 
-      // Notify the dispatcher that a new collection is available.
-      taskDispatcher.onNewPendingTaskCollectionAvailable();
+        // Notify the dispatcher that a new collection is available.
+        taskDispatcher.onNewPendingTaskCollectionAvailable();
+      }
     } else {
       LOG.info("Skipping this round as no ScheduleGroup is schedulable.");
     }
@@ -303,11 +356,11 @@ public final class BatchScheduler implements Scheduler {
     }
 
     return sortedScheduleGroups.stream()
-        .filter(scheduleGroup -> scheduleGroup.stream()
-            .map(Stage::getId)
-            .map(planStateManager::getStageState)
-            .anyMatch(state -> state.equals(StageState.State.INCOMPLETE))) // any incomplete stage in the group
-        .findFirst(); // selects the one with the smallest scheduling group index.
+      .filter(scheduleGroup -> scheduleGroup.stream()
+        .map(Stage::getId)
+        .map(planStateManager::getStageState)
+        .anyMatch(state -> state.equals(StageState.State.INCOMPLETE))) // any incomplete stage in the group
+      .findFirst(); // selects the one with the smallest scheduling group index.
   }
 
   private List<Task> selectSchedulableTasks(final Stage stageToSchedule) {
@@ -322,9 +375,9 @@ public final class BatchScheduler implements Scheduler {
     }
 
     final List<StageEdge> stageIncomingEdges =
-        planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
+      planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
     final List<StageEdge> stageOutgoingEdges =
-        planStateManager.getPhysicalPlan().getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
+      planStateManager.getPhysicalPlan().getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
 
     // Create and return tasks.
     final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
@@ -332,21 +385,17 @@ public final class BatchScheduler implements Scheduler {
     final List<String> taskIdsToSchedule = planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId());
     final List<Task> tasks = new ArrayList<>(taskIdsToSchedule.size());
     taskIdsToSchedule.forEach(taskId -> {
-      final Set<String> blockIds = planStateManager.getPhysicalPlan().getStageDAG()
-          .getOutgoingEdgesOf(RuntimeIdManager.getStageIdFromTaskId(taskId))
-          .stream()
-          .map(stageEdge -> RuntimeIdManager.generateBlockId(stageEdge.getId(), taskId))
-          .collect(Collectors.toSet()); // ids of blocks this task will produce
+      final Set<String> blockIds = getOutputBlockIds(taskId);
       blockManagerMaster.onProducerTaskScheduled(taskId, blockIds);
       final int taskIdx = RuntimeIdManager.getIndexFromTaskId(taskId);
       tasks.add(new Task(
-          planStateManager.getPhysicalPlan().getPlanId(),
-          taskId,
-          stageToSchedule.getExecutionProperties(),
-          stageToSchedule.getSerializedIRDAG(),
-          stageIncomingEdges,
-          stageOutgoingEdges,
-          vertexIdToReadables.get(taskIdx)));
+        planStateManager.getPhysicalPlan().getPlanId(),
+        taskId,
+        stageToSchedule.getExecutionProperties(),
+        stageToSchedule.getSerializedIRDAG(),
+        stageIncomingEdges,
+        stageOutgoingEdges,
+        vertexIdToReadables.get(taskIdx)));
     });
     return tasks;
   }
@@ -407,7 +456,7 @@ public final class BatchScheduler implements Scheduler {
     final String stageIdForTaskUponCompletion = RuntimeIdManager.getStageIdFromTaskId(taskId);
 
     final boolean stageComplete =
-        planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
+      planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
 
     final StageEdge targetEdge = getEdgeToOptimize(taskId);
     if (targetEdge == null) {
@@ -416,11 +465,11 @@ public final class BatchScheduler implements Scheduler {
 
     if (stageComplete) {
       final DynOptDataHandler dynOptDataHandler = dynOptDataHandlers.stream()
-          .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
-          .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
+        .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
+        .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
       pubSubEventHandlerWrapper.getPubSubEventHandler()
-          .onNext(new DynamicOptimizationEvent(planStateManager.getPhysicalPlan(), dynOptDataHandler.getDynOptData(),
-              taskId, executorId, targetEdge));
+        .onNext(new DynamicOptimizationEvent(planStateManager.getPhysicalPlan(), dynOptDataHandler.getDynOptData(),
+          taskId, executorId, targetEdge));
     }
   }
 
@@ -461,50 +510,77 @@ public final class BatchScheduler implements Scheduler {
     final Set<String> tasksToRetry = Sets.union(tasks, requiredParents);
     LOG.info("Will be retried: {}", tasksToRetry);
     tasksToRetry.forEach(
-        taskToReExecute -> planStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
+      taskToReExecute -> planStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
   }
 
   private Set<String> recursivelyGetParentTasksForLostBlocks(final Set<String> children) {
     if (children.isEmpty()) {
       return Collections.emptySet();
     }
+    final DAG<Stage, StageEdge> stageDAG = planStateManager.getPhysicalPlan().getStageDAG();
+
+    final Map<String, StageEdge> idToIncomingEdges = children.stream()
+      .map(RuntimeIdManager::getStageIdFromTaskId)
+      .flatMap(stageId -> stageDAG.getIncomingEdgesOf(stageId).stream())
+      // Ignore duplicates with the mergeFunction in toMap(_,_,mergeFunction)
+      .collect(Collectors.toMap(StageEdge::getId, Function.identity(), (l, r) -> l));
 
     final Set<String> parentsWithLostBlocks = children.stream()
-        .flatMap(child -> getParentTasks(child).stream())
-        .filter(parent -> {
-          final CompletableFuture<String> locationFuture =
-            blockManagerMaster.getBlockLocationHandler(parent).getLocationFuture();
-          return locationFuture.isCompletedExceptionally() || locationFuture.isCancelled();
-        })
-        .collect(Collectors.toSet());
+      .flatMap(child -> getInputBlockIds(child).stream()) // child task id -> parent block ids
+      .map(RuntimeIdManager::getWildCardFromBlockId) // parent block id -> parent block wildcard
+      .collect(Collectors.toSet()).stream() // remove duplicate wildcards
+      .filter(parentBlockWildcard -> // lost block = no matching AVAILABLE block attempt for the wildcard
+        blockManagerMaster.getBlockHandlers(parentBlockWildcard, BlockState.State.AVAILABLE).isEmpty())
+      .flatMap(lostParentBlockWildcard -> {
+        // COMPLETE task attempts of the lostParentBlockWildcard must become SHOULD_RETRY
+        final String inEdgeId = RuntimeIdManager.getRuntimeEdgeIdFromBlockId(lostParentBlockWildcard);
+        final String parentStageId = idToIncomingEdges.get(inEdgeId).getSrc().getId();
+        final int parentTaskIndex = RuntimeIdManager.getTaskIndexFromBlockId(lostParentBlockWildcard);
+        return planStateManager.getAllTaskAttemptsOfStage(parentStageId)
+          .stream()
+          .filter(taskId -> RuntimeIdManager.getStageIdFromTaskId(taskId).equals(parentStageId)
+            && RuntimeIdManager.getIndexFromTaskId(taskId) == parentTaskIndex)
+          // COMPLETE -> SHOULD_RETRY
+          .filter(taskId -> planStateManager.getTaskState(taskId).equals(TaskState.State.COMPLETE));
+      })
+      .collect(Collectors.toSet());
+
 
     // Recursive call
     return Sets.union(parentsWithLostBlocks, recursivelyGetParentTasksForLostBlocks(parentsWithLostBlocks));
   }
 
-  private Set<String> getParentTasks(final String childTaskId) {
+  private Set<String> getOutputBlockIds(final String taskId) {
+    return planStateManager.getPhysicalPlan().getStageDAG()
+      .getOutgoingEdgesOf(RuntimeIdManager.getStageIdFromTaskId(taskId))
+      .stream()
+      .map(stageEdge -> RuntimeIdManager.generateBlockId(stageEdge.getId(), taskId))
+      .collect(Collectors.toSet()); // ids of blocks this task will produce
+  }
 
+  private Set<String> getInputBlockIds(final String childTaskId) {
     final String stageIdOfChildTask = RuntimeIdManager.getStageIdFromTaskId(childTaskId);
     return planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageIdOfChildTask)
-        .stream()
-        .flatMap(inStageEdge -> {
-          final String parentStageId = inStageEdge.getSrc().getId();
-          final Set<String> tasksOfParentStage = planStateManager.getAllTaskAttemptsOfStage(parentStageId);
-
-          switch (inStageEdge.getDataCommunicationPattern()) {
-            case Shuffle:
-            case BroadCast:
-              // All of the parent stage's tasks
-              return tasksOfParentStage.stream();
-            case OneToOne:
-              // Same-index tasks of the parent stage
-              return tasksOfParentStage.stream().filter(task ->
-                  RuntimeIdManager.getIndexFromTaskId(task) == RuntimeIdManager.getIndexFromTaskId(childTaskId));
-            default:
-              throw new IllegalStateException(inStageEdge.toString());
-          }
-        })
-        .collect(Collectors.toSet());
+      .stream()
+      .flatMap(inStageEdge -> {
+        final Set<String> parentTaskIds = planStateManager.getAllTaskAttemptsOfStage(inStageEdge.getSrc().getId());
+        switch (inStageEdge.getDataCommunicationPattern()) {
+          case Shuffle:
+          case BroadCast:
+            // All of the parent stage's tasks
+            return parentTaskIds.stream()
+              .map(parentTaskId -> RuntimeIdManager.generateBlockId(inStageEdge.getId(), parentTaskId));
+          case OneToOne:
+            // Same-index tasks of the parent stage
+            return parentTaskIds.stream()
+              .filter(parentTaskId ->
+                RuntimeIdManager.getIndexFromTaskId(parentTaskId) == RuntimeIdManager.getIndexFromTaskId(childTaskId))
+              .map(parentTaskId -> RuntimeIdManager.generateBlockId(inStageEdge.getId(), parentTaskId));
+          default:
+            throw new IllegalStateException(inStageEdge.toString());
+        }
+      })
+      .collect(Collectors.toSet());
   }
 
   /**
@@ -514,8 +590,8 @@ public final class BatchScheduler implements Scheduler {
    */
   public void updateDynOptData(final Object dynOptData) {
     final DynOptDataHandler dynOptDataHandler = dynOptDataHandlers.stream()
-        .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
-        .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
+
+      .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
     dynOptDataHandler.updateDynOptData(dynOptData);
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/LocalitySchedulingConstraint.java
similarity index 63%
rename from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/LocalitySchedulingConstraint.java
index 4e37fde..769719c 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/LocalitySchedulingConstraint.java
@@ -24,6 +24,7 @@ import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceLocalityProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.Task;
+import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
@@ -32,6 +33,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 /**
  * This policy tries to pick the executors where the corresponding source or intermediate data for a task reside.
@@ -39,50 +41,48 @@ import java.util.concurrent.ExecutionException;
 @ThreadSafe
 @DriverSide
 @AssociatedProperty(ResourceLocalityProperty.class)
-public final class SourceLocationAwareSchedulingConstraint implements SchedulingConstraint {
+public final class LocalitySchedulingConstraint implements SchedulingConstraint {
   private final BlockManagerMaster blockManagerMaster;
 
   @Inject
-  private SourceLocationAwareSchedulingConstraint(final BlockManagerMaster blockManagerMaster) {
+  private LocalitySchedulingConstraint(final BlockManagerMaster blockManagerMaster) {
     this.blockManagerMaster = blockManagerMaster;
   }
 
   /**
-   * Find the location of the intermediate data for a task.
+   * Find the locations of the intermediate data for a task.
    * It is only possible if the task receives only one input edge with One-to-One communication pattern, and
    * the location of the input data is known.
    *
    * @param task the task to schedule.
-   * @return the intermediate data location.
+   * @return the intermediate data locations, empty if none exists.
    */
-  private Optional<String> getIntermediateDataLocation(final Task task) {
+  private List<String> getIntermediateDataLocations(final Task task) {
     if (task.getTaskIncomingEdges().size() == 1) {
       final StageEdge physicalStageEdge = task.getTaskIncomingEdges().get(0);
       if (CommunicationPatternProperty.Value.OneToOne.equals(
-          physicalStageEdge.getPropertyValue(CommunicationPatternProperty.class)
-              .orElseThrow(() -> new RuntimeException("No comm pattern!")))) {
+        physicalStageEdge.getPropertyValue(CommunicationPatternProperty.class)
+          .orElseThrow(() -> new RuntimeException("No comm pattern!")))) {
         final Optional<DuplicateEdgeGroupPropertyValue> dupProp =
-            physicalStageEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
+          physicalStageEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
         final String representativeEdgeId = dupProp.isPresent()
-            ? dupProp.get().getRepresentativeEdgeId() : physicalStageEdge.getId();
-        final String blockIdToRead =
-            RuntimeIdManager.generateBlockId(representativeEdgeId, task.getTaskId());
-        final BlockManagerMaster.BlockRequestHandler locationHandler =
-            blockManagerMaster.getBlockLocationHandler(blockIdToRead);
-        if (locationHandler.getLocationFuture().isDone()) { // if the location is known.
-          try {
-            final String location = locationHandler.getLocationFuture().get();
-            return Optional.of(location);
-          } catch (final InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(e);
-          } catch (final ExecutionException e) {
-            throw new RuntimeException(e);
-          }
-        }
+          ? dupProp.get().getRepresentativeEdgeId()
+          : physicalStageEdge.getId();
+
+        final String blockIdToRead = RuntimeIdManager.generateBlockId(representativeEdgeId, task.getTaskId());
+        return blockManagerMaster.getBlockHandlers(blockIdToRead, BlockState.State.AVAILABLE)
+          .stream()
+          .map(handler -> {
+            try {
+              return handler.getLocationFuture().get();
+            } catch (InterruptedException | ExecutionException e) {
+              throw new RuntimeException(e);
+            }
+          })
+          .collect(Collectors.toList());
       }
     }
-    return Optional.empty();
+    return Collections.emptyList();
   }
 
   /**
@@ -90,7 +90,7 @@ public final class SourceLocationAwareSchedulingConstraint implements Scheduling
    * @return Set of source locations from source tasks in {@code taskDAG}
    * @throws Exception for any exception raised during querying source locations for a readable
    */
-  private static Set<String> getSourceLocations(final Collection<Readable> readables) throws Exception {
+  private static Set<String> getSourceDataLocations(final Collection<Readable> readables) throws Exception {
     final List<String> sourceLocations = new ArrayList<>();
     for (final Readable readable : readables) {
       sourceLocations.addAll(readable.getLocations());
@@ -100,10 +100,11 @@ public final class SourceLocationAwareSchedulingConstraint implements Scheduling
 
   @Override
   public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) {
-    if (task.getTaskIncomingEdges().isEmpty()) { // Source task
+    if (task.getTaskIncomingEdges().isEmpty()) {
+      // Source task
       final Set<String> sourceLocations;
       try {
-        sourceLocations = getSourceLocations(task.getIrVertexIdToReadable().values());
+        sourceLocations = getSourceDataLocations(task.getIrVertexIdToReadable().values());
       } catch (final UnsupportedOperationException e) {
         return true;
       } catch (final Exception e) {
@@ -115,13 +116,15 @@ public final class SourceLocationAwareSchedulingConstraint implements Scheduling
       }
 
       return sourceLocations.contains(executor.getNodeName());
-    } else { // Non-source task.
-      final Optional<String> optionalIntermediateLoc = getIntermediateDataLocation(task);
-
-      if (getIntermediateDataLocation(task).isPresent()) {
-        return optionalIntermediateLoc.get().equals(executor.getExecutorId());
-      } else {
+    } else {
+      // Non-source task.
+      final List<String> intermediateLocations = getIntermediateDataLocations(task);
+      if (intermediateLocations.isEmpty()) {
+        // Since there is no known location, we just schedule the task to any executor.
         return true;
+      } else {
+        // There is a known location(s), so we schedule to it(them).
+        return intermediateLocations.contains(executor.getExecutorId());
       }
     }
   }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index 12abd56..d7f5456 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -81,6 +81,11 @@ public interface Scheduler {
                                      TaskState.RecoverableTaskFailureCause failureCause);
 
   /**
+   * Called to check for speculative execution.
+   */
+  void onSpeculativeExecutionCheck();
+
+  /**
    * To be called when a job should be terminated.
    * Any clean up code should be implemented in this method.
    */
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
index 50dd8d5..6638a36 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
@@ -39,12 +39,12 @@ public final class SchedulingConstraintRegistry {
   private SchedulingConstraintRegistry(
       final ContainerTypeAwareSchedulingConstraint containerTypeAwareSchedulingConstraint,
       final FreeSlotSchedulingConstraint freeSlotSchedulingConstraint,
-      final SourceLocationAwareSchedulingConstraint sourceLocationAwareSchedulingConstraint,
+      final LocalitySchedulingConstraint localitySchedulingConstraint,
       final SkewnessAwareSchedulingConstraint skewnessAwareSchedulingConstraint,
       final NodeShareSchedulingConstraint nodeShareSchedulingConstraint) {
     registerSchedulingConstraint(containerTypeAwareSchedulingConstraint);
     registerSchedulingConstraint(freeSlotSchedulingConstraint);
-    registerSchedulingConstraint(sourceLocationAwareSchedulingConstraint);
+    registerSchedulingConstraint(localitySchedulingConstraint);
     registerSchedulingConstraint(skewnessAwareSchedulingConstraint);
     registerSchedulingConstraint(nodeShareSchedulingConstraint);
   }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
index 959b3fc..0b0a832 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
@@ -27,6 +27,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import static org.junit.Assert.assertEquals;
@@ -37,8 +38,8 @@ import static org.junit.Assert.assertTrue;
  * Test for {@link BlockManagerMaster}.
  */
 public final class BlockManagerMasterTest {
-  private static int FIRST_ATTEMPT = 0;
-  private static int SECOND_ATTEPMT = 1;
+  private final static int FIRST_ATTEMPT = 0;
+  private final static int SECOND_ATTEMPT = 1;
   private BlockManagerMaster blockManagerMaster;
 
   @Before
@@ -48,9 +49,9 @@ public final class BlockManagerMasterTest {
     blockManagerMaster = injector.getInstance(BlockManagerMaster.class);
   }
 
-  private static void checkBlockAbsentException(final Future<String> future,
-                                                final String expectedPartitionId,
-                                                final BlockState.State expectedState)
+  private static void checkInProgressToNotAvailableException(final Future<String> future,
+                                                             final String expectedPartitionId,
+                                                             final BlockState.State expectedState)
       throws IllegalStateException, InterruptedException {
     assertTrue(future.isDone());
     try {
@@ -88,24 +89,22 @@ public final class BlockManagerMasterTest {
     final String executorId = RuntimeIdManager.generateExecutorId();
     final String blockId = RuntimeIdManager.generateBlockId(edgeId, taskId);
 
-    // Initially the block state is NOT_AVAILABLE.
-    checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), blockId,
-        BlockState.State.NOT_AVAILABLE);
+    // Initially the block state does not exist.
+    assertTrue(blockManagerMaster.getBlockHandlers(blockId, BlockState.State.IN_PROGRESS).isEmpty());
 
     // The block is being IN_PROGRESS.
     blockManagerMaster.onProducerTaskScheduled(taskId, Collections.singleton(blockId));
-    final Future<String> future = blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture();
+    final Future<String> future = getSingleLocationFuture(blockId, BlockState.State.IN_PROGRESS);
     checkPendingFuture(future);
 
     // The block is AVAILABLE
     blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.AVAILABLE, executorId);
     checkBlockLocation(future, executorId); // A future, previously pending on IN_PROGRESS state, is now resolved.
-    checkBlockLocation(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), executorId);
+    checkBlockLocation(getSingleLocationFuture(blockId, BlockState.State.AVAILABLE), executorId);
 
     // We lost the block.
     blockManagerMaster.removeWorker(executorId);
-    checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), blockId,
-        BlockState.State.NOT_AVAILABLE);
+    getSingleLocationFuture(blockId, BlockState.State.NOT_AVAILABLE); // this call should succeed with no error.
   }
 
   /**
@@ -125,38 +124,42 @@ public final class BlockManagerMasterTest {
 
       // The block is being scheduled.
       blockManagerMaster.onProducerTaskScheduled(firstAttemptTaskId, Collections.singleton(firstAttemptBlockId));
-      final Future<String> future0 = blockManagerMaster.getBlockLocationHandler(firstAttemptBlockId).getLocationFuture();
+      final Future<String> future0 = getSingleLocationFuture(firstAttemptBlockId, BlockState.State.IN_PROGRESS);
       checkPendingFuture(future0);
 
       // Producer task fails.
       blockManagerMaster.onProducerTaskFailed(firstAttemptTaskId);
 
       // A future, previously pending on IN_PROGRESS state, is now completed exceptionally.
-      checkBlockAbsentException(future0, firstAttemptBlockId, BlockState.State.NOT_AVAILABLE);
-      checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(firstAttemptBlockId).getLocationFuture(), firstAttemptBlockId,
-          BlockState.State.NOT_AVAILABLE);
+      checkInProgressToNotAvailableException(future0, firstAttemptBlockId, BlockState.State.NOT_AVAILABLE);
+      checkInProgressToNotAvailableException(getSingleLocationFuture(firstAttemptBlockId, BlockState.State.NOT_AVAILABLE), firstAttemptBlockId, BlockState.State.NOT_AVAILABLE);
     }
 
     // Second attempt
     {
-      final String secondAttemptTaskId = RuntimeIdManager.generateTaskId("Stage0", srcTaskIndex, SECOND_ATTEPMT);
+      final String secondAttemptTaskId = RuntimeIdManager.generateTaskId("Stage0", srcTaskIndex, SECOND_ATTEMPT);
       final String secondAttemptBlockId = RuntimeIdManager.generateBlockId(edgeId, secondAttemptTaskId);
       final String executorId = RuntimeIdManager.generateExecutorId();
 
       // Re-scheduling the task.
       blockManagerMaster.onProducerTaskScheduled(secondAttemptTaskId, Collections.singleton(secondAttemptBlockId));
-      final Future<String> future1 = blockManagerMaster.getBlockLocationHandler(secondAttemptBlockId).getLocationFuture();
+      final Future<String> future1 = getSingleLocationFuture(secondAttemptBlockId, BlockState.State.IN_PROGRESS);
       checkPendingFuture(future1);
 
       // Committed.
       blockManagerMaster.onBlockStateChanged(secondAttemptBlockId, BlockState.State.AVAILABLE, executorId);
       checkBlockLocation(future1, executorId); // A future, previously pending on IN_PROGRESS state, is now resolved.
-      checkBlockLocation(blockManagerMaster.getBlockLocationHandler(secondAttemptBlockId).getLocationFuture(), executorId);
+      checkBlockLocation(getSingleLocationFuture(secondAttemptBlockId, BlockState.State.AVAILABLE), executorId);
 
       // Then removed.
       blockManagerMaster.onBlockStateChanged(secondAttemptBlockId, BlockState.State.NOT_AVAILABLE, executorId);
-      checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(secondAttemptBlockId).getLocationFuture(), secondAttemptBlockId,
-          BlockState.State.NOT_AVAILABLE);
+      assertEquals(2, blockManagerMaster.getBlockHandlers(secondAttemptBlockId, BlockState.State.NOT_AVAILABLE).size());
     }
   }
+
+  private Future<String> getSingleLocationFuture(final String blockId, final BlockState.State state) {
+    final List<BlockManagerMaster.BlockRequestHandler> handlerList = blockManagerMaster.getBlockHandlers(blockId, state);
+    assertEquals(1, handlerList.size());
+    return handlerList.get(0).getLocationFuture();
+  }
 }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/LocalitySchedulingConstraintTest.java
similarity index 93%
rename from runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/LocalitySchedulingConstraintTest.java
index ab1bf0b..cabc218 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/LocalitySchedulingConstraintTest.java
@@ -38,11 +38,11 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.*;
 
 /**
- * Test cases for {@link SourceLocationAwareSchedulingConstraint}.
+ * Test cases for {@link LocalitySchedulingConstraint}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ExecutorRepresenter.class, Task.class, Readable.class, BlockManagerMaster.class})
-public final class SourceLocationAwareSchedulingConstraintTest {
+public final class LocalitySchedulingConstraintTest {
   private Injector injector;
   private static final String SITE_0 = "SEOUL";
   private static final String SITE_1 = "JINJU";
@@ -53,7 +53,7 @@ public final class SourceLocationAwareSchedulingConstraintTest {
     when(executorRepresenter.getNodeName()).thenReturn(executorId);
     return executorRepresenter;
   }
-  
+
   @Before
   public void setUp() throws Exception {
     injector = Tang.Factory.getTang().newInjector();
@@ -61,13 +61,13 @@ public final class SourceLocationAwareSchedulingConstraintTest {
   }
 
   /**
-   * {@link SourceLocationAwareSchedulingConstraint} should fail to schedule a {@link Task} when
+   * {@link LocalitySchedulingConstraint} should fail to schedule a {@link Task} when
    * there are no executors in appropriate location(s).
    */
   @Test
   public void testSourceLocationAwareSchedulingNotAvailable() throws InjectionException {
     final SchedulingConstraint schedulingConstraint = injector
-        .getInstance(SourceLocationAwareSchedulingConstraint.class);
+        .getInstance(LocalitySchedulingConstraint.class);
 
     // Prepare test scenario
     final Task task = CreateTask.withReadablesWithSourceLocations(
@@ -81,13 +81,13 @@ public final class SourceLocationAwareSchedulingConstraintTest {
   }
 
   /**
-   * {@link SourceLocationAwareSchedulingConstraint} should properly schedule {@link Task}s
+   * {@link LocalitySchedulingConstraint} should properly schedule {@link Task}s
    * with multiple source locations.
    */
   @Test
   public void testSourceLocationAwareSchedulingWithMultiSource() throws InjectionException {
     final SchedulingConstraint schedulingConstraint = injector
-        .getInstance(SourceLocationAwareSchedulingConstraint.class);
+        .getInstance(LocalitySchedulingConstraint.class);
     // Prepare test scenario
     final Task task0 = CreateTask.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_1)));
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
index 64abefe..eabcac4 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
@@ -46,7 +46,7 @@ public final class SchedulingConstraintnRegistryTest {
     assertEquals(FreeSlotSchedulingConstraint.class, getConstraintOf(ResourceSlotProperty.class, registry));
     assertEquals(ContainerTypeAwareSchedulingConstraint.class,
         getConstraintOf(ResourcePriorityProperty.class, registry));
-    assertEquals(SourceLocationAwareSchedulingConstraint.class,
+    assertEquals(LocalitySchedulingConstraint.class,
         getConstraintOf(ResourceLocalityProperty.class, registry));
   }
 
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
index c2bbd12..af3fe81 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -14,7 +14,6 @@
  * limitations under the License.
  */
 package edu.snu.nemo.runtime.master.scheduler;
-
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdManager;
@@ -24,6 +23,7 @@ import edu.snu.nemo.runtime.common.message.MessageSender;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.common.state.PlanState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
@@ -72,6 +72,7 @@ public final class TaskRetryTest {
   private Scheduler scheduler;
   private ExecutorRegistry executorRegistry;
   private PlanStateManager planStateManager;
+  private BlockManagerMaster blockManagerMaster;
 
   private static final int MAX_SCHEDULE_ATTEMPT = Integer.MAX_VALUE;
 
@@ -96,13 +97,21 @@ public final class TaskRetryTest {
   public void testExecutorRemoved() throws Exception {
     // Until the plan finishes, events happen
     while (!planStateManager.isPlanDone()) {
-      // 50% chance remove, 50% chance add, 80% chance task completed
-      executorRemoved(0.5);
-      executorAdded(0.5);
-      taskCompleted(0.8);
+      // 30% chance executor added, 30% chance executor removed
+      executorAdded(0.3);
+      executorRemoved(0.3);
+
+      // random - trigger speculative execution.
+      if (random.nextBoolean()) {
+        Thread.sleep(10);
+      } else {
+        Thread.sleep(20);
+      }
+
+      // 30% chance task completed,
+      taskCompleted(0.3);
 
-      // 10ms sleep
-      Thread.sleep(10);
+      scheduler.onSpeculativeExecutionCheck();
     }
 
     // Plan should COMPLETE
@@ -120,12 +129,17 @@ public final class TaskRetryTest {
     // Until the plan finishes, events happen
     while (!planStateManager.isPlanDone()) {
       // 50% chance task completed
-      // 50% chance task output write failed
+      // 70% chance task output write failed
       taskCompleted(0.5);
-      taskOutputWriteFailed(0.5);
+      taskOutputWriteFailed(0.7);
 
-      // 10ms sleep
-      Thread.sleep(10);
+      // random - trigger speculative execution.
+      if (random.nextBoolean()) {
+        Thread.sleep(10);
+      } else {
+        Thread.sleep(20);
+      }
+      scheduler.onSpeculativeExecutionCheck();
     }
 
     // Plan should COMPLETE
@@ -178,8 +192,17 @@ public final class TaskRetryTest {
     if (!executingTasks.isEmpty()) {
       final int randomIndex = random.nextInt(executingTasks.size());
       final String selectedTask = executingTasks.get(randomIndex);
-      SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry, selectedTask,
+
+
+      final Optional<ExecutorRepresenter> executor = executorRegistry.findExecutorForTask(selectedTask);
+      if (executor.isPresent()) {
+        SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry, selectedTask,
           TaskState.State.COMPLETE, RuntimeIdManager.getAttemptFromTaskId(selectedTask));
+        getOutputBlockIds(selectedTask).forEach(blockId ->
+          blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.AVAILABLE, executor.get().getExecutorId()));
+      } else {
+        throw new RuntimeException(selectedTask);
+      }
     }
   }
 
@@ -221,7 +244,16 @@ public final class TaskRetryTest {
     injector.bindVolatileInstance(SchedulingConstraintRegistry.class, mock(SchedulingConstraintRegistry.class));
     planStateManager = injector.getInstance(PlanStateManager.class);
     scheduler = injector.getInstance(Scheduler.class);
+    blockManagerMaster = injector.getInstance(BlockManagerMaster.class);
 
     scheduler.schedulePlan(plan, MAX_SCHEDULE_ATTEMPT);
   }
+
+  private Set<String> getOutputBlockIds(final String taskId) {
+    return planStateManager.getPhysicalPlan().getStageDAG()
+      .getOutgoingEdgesOf(RuntimeIdManager.getStageIdFromTaskId(taskId))
+      .stream()
+      .map(stageEdge -> RuntimeIdManager.generateBlockId(stageEdge.getId(), taskId))
+      .collect(Collectors.toSet()); // ids of blocks this task will produce
+  }
 }