Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/08/22 03:40:36 UTC

[GitHub] sanha closed pull request #112: [NEMO-179] Delayed Task Cloning

sanha closed pull request #112: [NEMO-179] Delayed Task Cloning
URL: https://github.com/apache/incubator-nemo/pull/112

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java
index cd0f312ce..6a24c6d95 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java
@@ -17,30 +17,104 @@
 
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
+import java.io.Serializable;
+
 /**
  * Specifies cloned execution of a vertex.
  *
  * A major limitations of the current implementation:
  * *ALL* of the clones are always scheduled immediately
  */
-public final class ClonedSchedulingProperty extends VertexExecutionProperty<Integer> {
+public final class ClonedSchedulingProperty extends VertexExecutionProperty<ClonedSchedulingProperty.CloneConf> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
-  private ClonedSchedulingProperty(final Integer value) {
+  private ClonedSchedulingProperty(final CloneConf value) {
     super(value);
   }
 
   /**
    * Static method exposing the constructor.
-   * @param value value of the new execution property.
+   * @param conf value of the new execution property.
    * @return the newly created execution property.
    */
-  public static ClonedSchedulingProperty of(final Integer value) {
-    if (value <= 0) {
-      throw new IllegalStateException(String.valueOf(value));
+  public static ClonedSchedulingProperty of(final CloneConf conf) {
+    return new ClonedSchedulingProperty(conf);
+  }
+
+  /**
+   * Configurations for cloning.
+   * TODO #199: Slot-aware cloning
+   */
+  public static final class CloneConf implements Serializable {
+    // Always clone, upfront.
+    private final boolean upFrontCloning;
+
+    // Fraction of tasks to wait for completion, before trying to clone.
+    // If this value is 0, then we always clone.
+    private final double fractionToWaitFor;
+
+    // How many times slower is a task than the median, in order to be cloned.
+    private final double medianTimeMultiplier;
+
+    /**
+     * Always clone, upfront.
+     */
+    public CloneConf() {
+      this.upFrontCloning = true;
+      this.fractionToWaitFor = 0.0;
+      this.medianTimeMultiplier = 0.0;
+    }
+
+    /**
+     * Clone stragglers judiciously.
+     * @param fractionToWaitFor before trying to clone.
+     * @param medianTimeMultiplier to identify stragglers.
+     */
+    public CloneConf(final double fractionToWaitFor, final double medianTimeMultiplier) {
+      if (fractionToWaitFor >= 1.0 || fractionToWaitFor <= 0) {
+        throw new IllegalArgumentException(String.valueOf(fractionToWaitFor));
+      }
+      if (medianTimeMultiplier < 1.0) {
+        throw new IllegalArgumentException(String.valueOf(medianTimeMultiplier));
+      }
+      this.upFrontCloning = false;
+      this.fractionToWaitFor = fractionToWaitFor;
+      this.medianTimeMultiplier = medianTimeMultiplier;
+    }
+
+    /**
+     * @return fractionToWaitFor.
+     */
+    public double getFractionToWaitFor() {
+      return fractionToWaitFor;
+    }
+
+    /**
+     * @return medianTimeMultiplier.
+     */
+    public double getMedianTimeMultiplier() {
+      return medianTimeMultiplier;
+    }
+
+    /**
+     * @return true if it is upfront cloning.
+     */
+    public boolean isUpFrontCloning() {
+      return upFrontCloning;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder();
+      sb.append("upfront: ");
+      sb.append(upFrontCloning);
+      sb.append(" / fraction: ");
+      sb.append(fractionToWaitFor);
+      sb.append(" / multiplier: ");
+      sb.append(medianTimeMultiplier);
+      return sb.toString();
     }
-    return new ClonedSchedulingProperty(value);
   }
 }
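
Note for readers of this diff: the new property is attached to an IR vertex via vertex.setProperty(), as the compile-time passes further below do. The following is a minimal illustrative sketch, not part of this PR (the helper class name is hypothetical); it assumes only the CloneConf constructors and ClonedSchedulingProperty.of() shown above.

import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;

// Hypothetical helper, not part of this PR; it only uses the API introduced in the diff above.
final class CloneConfUsageSketch {
  // Upfront cloning: a clone is always scheduled immediately.
  static void annotateUpfront(final IRVertex vertex) {
    vertex.setProperty(ClonedSchedulingProperty.of(new ClonedSchedulingProperty.CloneConf()));
  }

  // Delayed cloning: wait until 90% of the stage's tasks complete, then clone tasks
  // running longer than 1.5x the median completion time of the completed tasks.
  static void annotateDelayed(final IRVertex vertex) {
    vertex.setProperty(ClonedSchedulingProperty.of(new ClonedSchedulingProperty.CloneConf(0.9, 1.5)));
  }
}
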
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ClonedSchedulingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/AggressiveSpeculativeCloningPass.java
similarity index 61%
rename from compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ClonedSchedulingPass.java
rename to compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/AggressiveSpeculativeCloningPass.java
index 32797eb06..789718379 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ClonedSchedulingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/AggressiveSpeculativeCloningPass.java
@@ -21,22 +21,27 @@
 import edu.snu.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
 
 /**
- * Set the ClonedScheduling property of source vertices.
+ * Speculative execution. (very aggressive, for unit tests)
+ * TODO #200: Maintain Test Passes and Policies Separately
  */
 @Annotates(ClonedSchedulingProperty.class)
-public final class ClonedSchedulingPass extends AnnotatingPass {
+public final class AggressiveSpeculativeCloningPass extends AnnotatingPass {
   /**
    * Default constructor.
    */
-  public ClonedSchedulingPass() {
-    super(ClonedSchedulingPass.class);
+  public AggressiveSpeculativeCloningPass() {
+    super(AggressiveSpeculativeCloningPass.class);
   }
 
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
-    dag.getVertices().stream()
-        .filter(vertex -> dag.getIncomingEdgesOf(vertex.getId()).isEmpty())
-        .forEach(vertex -> vertex.setProperty(ClonedSchedulingProperty.of(2)));
+    // Speculative execution policy.
+    final double fractionToWaitFor = 0.00000001; // Aggressive
+    final double medianTimeMultiplier = 1.00000001; // Aggressive
+
+    // Apply the policy to ALL vertices
+    dag.getVertices().forEach(vertex -> vertex.setProperty(ClonedSchedulingProperty.of(
+      new ClonedSchedulingProperty.CloneConf(fractionToWaitFor, medianTimeMultiplier))));
     return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
new file mode 100644
index 000000000..7c4920f05
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.Requires;
+
+/**
+ * Set the ClonedScheduling property of source vertices, in an upfront manner.
+ */
+@Annotates(ClonedSchedulingProperty.class)
+@Requires(CommunicationPatternProperty.class)
+public final class UpfrontCloningPass extends AnnotatingPass {
+  /**
+   * Default constructor.
+   */
+  public UpfrontCloningPass() {
+    super(UpfrontCloningPass.class);
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    dag.getVertices().stream()
+        .filter(vertex -> dag.getIncomingEdgesOf(vertex.getId())
+          .stream()
+          // TODO #198: Handle Un-cloneable Beam Sink Operators
+          // only shuffle receivers (for now... as particular Beam sink operators fail when cloned)
+          .anyMatch(edge ->
+            edge.getPropertyValue(CommunicationPatternProperty.class)
+              .orElseThrow(() -> new IllegalStateException())
+              .equals(CommunicationPatternProperty.Value.Shuffle))
+          )
+        .forEach(vertex -> vertex.setProperty(
+          ClonedSchedulingProperty.of(new ClonedSchedulingProperty.CloneConf()))); // clone upfront, always
+    return dag;
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
index 192ba2651..b4e17d1e1 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
@@ -20,14 +20,17 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AggressiveSpeculativeCloningPass;
 import org.apache.reef.tang.Injector;
 
 /**
  * Basic pull policy.
+ * TODO #200: Maintain Test Passes and Policies Separately
  */
 public final class BasicPullPolicy implements Policy {
   public static final PolicyBuilder BUILDER =
       new PolicyBuilder()
+          .registerCompileTimePass(new AggressiveSpeculativeCloningPass())
           .registerCompileTimePass(new DefaultScheduleGroupPass());
   private final Policy policy;
 
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
index c0c52ffa7..8ef91cffa 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
@@ -21,16 +21,19 @@
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ShuffleEdgePushPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AggressiveSpeculativeCloningPass;
 import org.apache.reef.tang.Injector;
 
 /**
  * Basic push policy.
+ * TODO #200: Maintain Test Passes and Policies Separately
  */
 public final class BasicPushPolicy implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder()
-          .registerCompileTimePass(new ShuffleEdgePushPass())
-          .registerCompileTimePass(new DefaultScheduleGroupPass());
+    new PolicyBuilder()
+      .registerCompileTimePass(new AggressiveSpeculativeCloningPass())
+      .registerCompileTimePass(new ShuffleEdgePushPass())
+      .registerCompileTimePass(new DefaultScheduleGroupPass());
   private final Policy policy;
 
   /**
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
index 121137084..48d598f79 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
@@ -112,7 +112,17 @@ public void testClonedScheduling() throws Exception {
         .addResourceJson(executorResourceFileName)
         .addJobId(WordCountITCase.class.getSimpleName() + "_clonedscheduling")
         .addMaxTaskAttempt(Integer.MAX_VALUE)
-        .addOptimizationPolicy(ClonedSchedulingPolicyParallelismFive.class.getCanonicalName())
+        .addOptimizationPolicy(UpfrontSchedulingPolicyParallelismFive.class.getCanonicalName())
         .build());
   }
+
+  @Test (timeout = TIMEOUT)
+  public void testSpeculativeExecution() throws Exception {
+    JobLauncher.main(builder
+      .addResourceJson(executorResourceFileName)
+      .addJobId(WordCountITCase.class.getSimpleName() + "_speculative")
+      .addMaxTaskAttempt(Integer.MAX_VALUE)
+      .addOptimizationPolicy(AggressiveSpeculativeCloningPolicyParallelismFive.class.getCanonicalName())
+      .build());
+  }
 }
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/AggressiveSpeculativeCloningPolicyParallelismFive.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/AggressiveSpeculativeCloningPolicyParallelismFive.java
new file mode 100644
index 000000000..31bd14bd9
--- /dev/null
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/AggressiveSpeculativeCloningPolicyParallelismFive.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.beam.policy;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AggressiveSpeculativeCloningPass;
+import edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy;
+import edu.snu.nemo.compiler.optimizer.policy.Policy;
+import edu.snu.nemo.compiler.optimizer.policy.PolicyImpl;
+import org.apache.reef.tang.Injector;
+
+import java.util.List;
+
+/**
+ * A default policy with (aggressive) speculative execution.
+ */
+public final class AggressiveSpeculativeCloningPolicyParallelismFive implements Policy {
+  private final Policy policy;
+  public AggressiveSpeculativeCloningPolicyParallelismFive() {
+    final List<CompileTimePass> overwritingPasses = DefaultPolicy.BUILDER.getCompileTimePasses();
+    overwritingPasses.add(new AggressiveSpeculativeCloningPass()); // CLONING!
+    this.policy = new PolicyImpl(
+        PolicyTestUtil.overwriteParallelism(5, overwritingPasses),
+        DefaultPolicy.BUILDER.getRuntimePasses());
+  }
+  @Override
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
+    return this.policy.runCompileTimeOptimization(dag, dagDirectory);
+  }
+  @Override
+  public void registerRunTimeOptimizations(final Injector injector, final PubSubEventHandlerWrapper pubSubWrapper) {
+    this.policy.registerRunTimeOptimizations(injector, pubSubWrapper);
+  }
+}
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/UpfrontSchedulingPolicyParallelismFive.java
similarity index 87%
rename from examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java
rename to examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/UpfrontSchedulingPolicyParallelismFive.java
index b0e634833..a75603777 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/UpfrontSchedulingPolicyParallelismFive.java
@@ -20,7 +20,7 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ClonedSchedulingPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.UpfrontCloningPass;
 import edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy;
 import edu.snu.nemo.compiler.optimizer.policy.Policy;
 import edu.snu.nemo.compiler.optimizer.policy.PolicyImpl;
@@ -28,13 +28,13 @@
 import java.util.List;
 
 /**
- * A default policy with cloning for tests.
+ * A default policy with upfront cloning.
  */
-public final class ClonedSchedulingPolicyParallelismFive implements Policy {
+public final class UpfrontSchedulingPolicyParallelismFive implements Policy {
   private final Policy policy;
-  public ClonedSchedulingPolicyParallelismFive() {
+  public UpfrontSchedulingPolicyParallelismFive() {
     final List<CompileTimePass> overwritingPasses = DefaultPolicy.BUILDER.getCompileTimePasses();
-    overwritingPasses.add(new ClonedSchedulingPass()); // CLONING!
+    overwritingPasses.add(new UpfrontCloningPass()); // CLONING!
     this.policy = new PolicyImpl(
         PolicyTestUtil.overwriteParallelism(5, overwritingPasses),
         DefaultPolicy.BUILDER.getRuntimePasses());
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
index b2329ee9c..b21751ca1 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
@@ -31,9 +31,9 @@ private StateMachine buildBlockStateMachine() {
     final StateMachine.Builder stateMachineBuilder = StateMachine.newBuilder();
 
     // Add states
-    stateMachineBuilder.addState(State.NOT_AVAILABLE, "The block is not available.");
     stateMachineBuilder.addState(State.IN_PROGRESS, "The block is in the progress of being created.");
     stateMachineBuilder.addState(State.AVAILABLE, "The block is available.");
+    stateMachineBuilder.addState(State.NOT_AVAILABLE, "The block is not available.");
 
     // From IN_PROGRESS
     stateMachineBuilder.addTransition(State.IN_PROGRESS, State.AVAILABLE, "The block is successfully created");
@@ -41,13 +41,7 @@ private StateMachine buildBlockStateMachine() {
         "The block is lost before being created");
 
     // From AVAILABLE
-    stateMachineBuilder.addTransition(State.AVAILABLE, State.NOT_AVAILABLE, "The block is lost");
-
-    // From NOT_AVAILABLE
-    stateMachineBuilder.addTransition(State.NOT_AVAILABLE, State.IN_PROGRESS,
-        "The task that produces the block is scheduled.");
-    stateMachineBuilder.addTransition(State.NOT_AVAILABLE, State.NOT_AVAILABLE,
-        "A block can be reported lost from multiple sources");
+    stateMachineBuilder.addTransition(State.AVAILABLE, State.NOT_AVAILABLE, "The block is not available");
 
     stateMachineBuilder.setInitialState(State.IN_PROGRESS);
 
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
index e7edbadeb..1a63f0ea2 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
@@ -44,6 +44,8 @@ private StateMachine buildTaskStateMachine() {
     stateMachineBuilder.addTransition(State.INCOMPLETE, State.COMPLETE, "All tasks complete");
     stateMachineBuilder.addTransition(State.COMPLETE, State.INCOMPLETE,
         "Completed before, but a task in this stage should be retried");
+    stateMachineBuilder.addTransition(State.COMPLETE, State.COMPLETE,
+      "Completed before, but probably a cloned task has completed again");
 
     stateMachineBuilder.setInitialState(State.INCOMPLETE);
 
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index def088871..05b7467f5 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -31,6 +31,7 @@
 import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -85,6 +86,8 @@ private BlockManagerMaster(final MessageEnvironment masterMessageEnvironment) {
 
   /**
    * Initializes the states of a block which will be produced by a producer task.
+   * This method is idempotent thanks to the 'Set' data structures.
+   * See BatchScheduler#doSchedule for details on scheduling same task attempts multiple times.
    *
    * @param blockId        the id of the block to initialize.
    * @param producerTaskId the id of the producer task.
@@ -134,31 +137,22 @@ private void initializeState(final String blockId, final String producerTaskId)
   }
 
   /**
-   * Returns a handler of block location requests.
-   *
-   * @param blockIdOrWildcard id of the specified block.
-   * @return the handler of block location requests, which completes exceptionally when the block
-   * is not {@code IN_PROGRESS} or {@code AVAILABLE}.
+   * Get handlers of blocks that are in a particular state.
+   * @param blockIdOrWildcard to query
+   * @param state of the block
+   * @return the handlers, empty if none matches.
    */
-  public BlockRequestHandler getBlockLocationHandler(final String blockIdOrWildcard) {
+  public List<BlockRequestHandler> getBlockHandlers(final String blockIdOrWildcard,
+                                                    final BlockState.State state) {
     final Lock readLock = lock.readLock();
     readLock.lock();
     try {
       final Set<BlockMetadata> metadataSet =
         getBlockWildcardStateSet(RuntimeIdManager.getWildCardFromBlockId(blockIdOrWildcard));
-      final List<BlockMetadata> candidates = metadataSet.stream()
-        .filter(metadata -> metadata.getBlockState().equals(BlockState.State.IN_PROGRESS)
-          || metadata.getBlockState().equals(BlockState.State.AVAILABLE))
+      return metadataSet.stream()
+        .filter(metadata -> metadata.getBlockState().equals(state))
+        .map(BlockMetadata::getLocationHandler)
         .collect(Collectors.toList());
-      if (!candidates.isEmpty()) {
-        // Randomly pick one of the candidate handlers.
-        return candidates.get(random.nextInt(candidates.size())).getLocationHandler();
-      } else {
-        // No candidate exists
-        final BlockRequestHandler handler = new BlockRequestHandler(blockIdOrWildcard);
-        handler.completeExceptionally(new AbsentBlockException(blockIdOrWildcard, BlockState.State.NOT_AVAILABLE));
-        return handler;
-      }
     } finally {
       readLock.unlock();
     }
@@ -170,7 +164,7 @@ public BlockRequestHandler getBlockLocationHandler(final String blockIdOrWildcar
    * @param blockId the id of the block.
    * @return the ids of the producer tasks.
    */
-  private Set<String> getProducerTaskIds(final String blockId) {
+  public Set<String> getProducerTaskIds(final String blockId) {
     final Lock readLock = lock.readLock();
     readLock.lock();
     try {
@@ -241,7 +235,9 @@ public void onProducerTaskFailed(final String failedTaskId) {
             if (location.get().equals(executorId)) {
               blockIds.add(blockMetadata.getBlockId());
             }
-          } catch (final InterruptedException | ExecutionException e) {
+          } catch (final CancellationException | ExecutionException e) {
+            // Don't add (NOT_AVAILABLE)
+          } catch (final InterruptedException e) {
             // Cannot reach here because we check the completion of the future already.
             LOG.error("Exception while getting the location of a block!", e);
             Thread.currentThread().interrupt();
@@ -302,21 +298,37 @@ private BlockMetadata getBlockMetaData(final String blockId) {
   }
 
   /**
-   * Deals with a request for the location of a block.
-   *
    * @param message        the request message.
    * @param messageContext the message context which will be used for response.
    */
-  void onRequestBlockLocation(final ControlMessage.Message message,
-                              final MessageContext messageContext) {
+  private void registerLocationRequest(final ControlMessage.Message message, final MessageContext messageContext) {
     assert (message.getType() == ControlMessage.MessageType.RequestBlockLocation);
     final String blockIdWildcard = message.getRequestBlockLocationMsg().getBlockIdWildcard();
     final long requestId = message.getId();
     final Lock readLock = lock.readLock();
     readLock.lock();
     try {
-      final BlockRequestHandler locationFuture = getBlockLocationHandler(blockIdWildcard);
-      locationFuture.registerRequest(requestId, messageContext);
+      // (CASE 1) Check AVAILABLE blocks.
+      final List<BlockRequestHandler> availableBlocks = getBlockHandlers(blockIdWildcard, BlockState.State.AVAILABLE);
+      if (!availableBlocks.isEmpty()) {
+        // random pick
+        // TODO #201: Let Executors Try Multiple Input Block Clones
+        availableBlocks.get(random.nextInt(availableBlocks.size())).registerRequest(requestId, messageContext);
+        return;
+      }
+
+      // (CASE 2) Check IN_PROGRESS blocks.
+      final List<BlockRequestHandler> progressBlocks = getBlockHandlers(blockIdWildcard, BlockState.State.IN_PROGRESS);
+      if (!progressBlocks.isEmpty()) {
+        // random pick
+        progressBlocks.get(random.nextInt(progressBlocks.size())).registerRequest(requestId, messageContext);
+        return;
+      }
+
+      // (CASE 3) Unfortunately, there is no good block to use.
+      final BlockRequestHandler absent = new BlockRequestHandler(blockIdWildcard);
+      absent.completeExceptionally(new AbsentBlockException(blockIdWildcard, BlockState.State.NOT_AVAILABLE));
+      absent.registerRequest(requestId, messageContext);
     } finally {
       readLock.unlock();
     }
@@ -352,7 +364,7 @@ public void onMessage(final ControlMessage.Message message) {
     public void onMessageWithContext(final ControlMessage.Message message, final MessageContext messageContext) {
       switch (message.getType()) {
         case RequestBlockLocation:
-          onRequestBlockLocation(message, messageContext);
+          registerLocationRequest(message, messageContext);
           break;
         default:
           throw new IllegalMessageException(
@@ -366,16 +378,16 @@ public void onMessageWithContext(final ControlMessage.Message message, final Mes
    * The handler of block location requests.
    */
   public static final class BlockRequestHandler {
-    private final String blockId;
+    private final String blockIdOrWildcard;
     private final CompletableFuture<String> locationFuture;
 
     /**
      * Constructor.
      *
-     * @param blockId the ID of the block.
+     * @param blockIdOrWildcard the ID of the block.
      */
-    BlockRequestHandler(final String blockId) {
-      this.blockId = blockId;
+    BlockRequestHandler(final String blockIdOrWildcard) {
+      this.blockIdOrWildcard = blockIdOrWildcard;
       this.locationFuture = new CompletableFuture<>();
     }
 
@@ -411,7 +423,7 @@ void registerRequest(final long requestId,
       final ControlMessage.BlockLocationInfoMsg.Builder infoMsgBuilder =
         ControlMessage.BlockLocationInfoMsg.newBuilder()
           .setRequestId(requestId)
-          .setBlockId(blockId);
+          .setBlockId(blockIdOrWildcard);
 
       locationFuture.whenComplete((location, throwable) -> {
         if (throwable == null) {
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
index e1f2af7f7..a5eb8c332 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
@@ -64,9 +64,7 @@ synchronized void onStateChanged(final BlockState.State newState,
       case IN_PROGRESS:
         break;
       case NOT_AVAILABLE:
-        // Reset the block location and committer information.
         locationHandler.completeExceptionally(new AbsentBlockException(blockId, newState));
-        locationHandler = new BlockManagerMaster.BlockRequestHandler(blockId);
         break;
       case AVAILABLE:
         if (location == null) {
@@ -115,4 +113,17 @@ public String toString() {
     sb.append(")");
     return sb.toString();
   }
+
+  @Override
+  public boolean equals(final Object that) {
+    if (!(that instanceof BlockMetadata)) {
+      return false;
+    }
+    return this.blockId.equals(((BlockMetadata) that).getBlockId());
+  }
+
+  @Override
+  public int hashCode() {
+    return blockId.hashCode();
+  }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
index f798ac1cb..501731197 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
@@ -74,6 +74,13 @@
   private final Map<String, StageState> stageIdToState;
   private final Map<String, List<List<TaskState>>> stageIdToTaskAttemptStates; // sorted by task idx, and then attempt
 
+  /**
+   * Used for speculative cloning. (in the unit of milliseconds - ms)
+   */
+  private final Map<String, Long> taskIdToStartTimeMs = new HashMap<>();
+  private final Map<String, List<Long>> stageIdToCompletedTaskTimeMsList = new HashMap<>();
+  private final Map<String, Map<Integer, Integer>> stageIdToTaskIndexToNumOfClones = new HashMap<>();
+
   /**
    * Represents the plan to manage.
    */
@@ -122,6 +129,8 @@ public synchronized void updatePlan(final PhysicalPlan physicalPlanToUpdate,
     if (!initialized) {
       // First scheduling.
       this.initialized = true;
+    } else {
+      LOG.info("Update Plan from {} to {}", physicalPlan.getPlanId(), physicalPlanToUpdate.getPlanId());
     }
     this.planState = new PlanState();
     this.metricStore.getOrCreateMetric(JobMetric.class, planId).setStageDAG(physicalPlanToUpdate.getStageDAG());
@@ -129,22 +138,22 @@ public synchronized void updatePlan(final PhysicalPlan physicalPlanToUpdate,
     this.physicalPlan = physicalPlanToUpdate;
     this.planId = physicalPlanToUpdate.getPlanId();
     this.maxScheduleAttempt = maxScheduleAttemptToSet;
-    initializeComputationStates();
+    initializeStates();
   }
 
   /**
    * Initializes the states for the plan/stages/tasks for this plan.
    * TODO #182: Consider reshaping in run-time optimization. At now, we only consider plan appending.
    */
-  private void initializeComputationStates() {
+  private void initializeStates() {
     onPlanStateChanged(PlanState.State.EXECUTING);
     physicalPlan.getStageDAG().topologicalDo(stage -> {
       if (!stageIdToState.containsKey(stage.getId())) {
         stageIdToState.put(stage.getId(), new StageState());
         stageIdToTaskAttemptStates.put(stage.getId(), new ArrayList<>(stage.getParallelism()));
+
         // for each task idx of this stage
         for (int taskIndex = 0; taskIndex < stage.getParallelism(); taskIndex++) {
-          // for each task idx of this stage
           stageIdToTaskAttemptStates.get(stage.getId()).add(new ArrayList<>());
           // task states will be initialized lazily in getTaskAttemptsToSchedule()
         }
@@ -152,6 +161,9 @@ private void initializeComputationStates() {
     });
   }
 
+  /////////////////////////////////////////////////////////////////////////////////
+  //////////////////////////////////////// Core scheduling methods
+
   /**
    * Get task attempts that are "READY".
    *
@@ -169,7 +181,7 @@ private void initializeComputationStates() {
     final Stage stage = physicalPlan.getStageDAG().getVertexById(stageId);
     for (int taskIndex = 0; taskIndex < stage.getParallelism(); taskIndex++) {
       final List<TaskState> attemptStatesForThisTaskIndex =
-        stageIdToTaskAttemptStates.get(stage.getId()).get(taskIndex);
+        stageIdToTaskAttemptStates.get(stageId).get(taskIndex);
 
       // If one of the attempts is COMPLETE, do not schedule
       if (attemptStatesForThisTaskIndex
@@ -177,10 +189,17 @@ private void initializeComputationStates() {
         .noneMatch(state -> state.getStateMachine().getCurrentState().equals(TaskState.State.COMPLETE))) {
 
         // (Step 1) Create new READY attempts, as many as
-        // # of clones - # of 'not-done' attempts)
-        final int numOfClones = stage.getPropertyValue(ClonedSchedulingProperty.class).orElse(1);
+        // # of numOfConcurrentAttempts(including clones) - # of 'not-done' attempts
+        stageIdToTaskIndexToNumOfClones.putIfAbsent(stageId, new HashMap<>());
+        final Optional<ClonedSchedulingProperty.CloneConf> cloneConf =
+          stage.getPropertyValue(ClonedSchedulingProperty.class);
+        final int numOfConcurrentAttempts = cloneConf.isPresent() && cloneConf.get().isUpFrontCloning()
+          // For now we support up to 1 clone (2 concurrent = 1 original + 1 clone)
+          ? 2
+          // If the property is not set, then we do not clone (= 1 concurrent)
+          : stageIdToTaskIndexToNumOfClones.get(stageId).getOrDefault(stageId, 1);
         final long numOfNotDoneAttempts = attemptStatesForThisTaskIndex.stream().filter(this::isTaskNotDone).count();
-        for (int i = 0; i < numOfClones - numOfNotDoneAttempts; i++) {
+        for (int i = 0; i < numOfConcurrentAttempts - numOfNotDoneAttempts; i++) {
           attemptStatesForThisTaskIndex.add(new TaskState());
         }
 
@@ -204,23 +223,66 @@ private void initializeComputationStates() {
     return taskAttemptsToSchedule;
   }
 
-  private boolean isTaskNotDone(final TaskState taskState) {
-    final TaskState.State state = (TaskState.State) taskState.getStateMachine().getCurrentState();
-    return state.equals(TaskState.State.READY)
-      || state.equals(TaskState.State.EXECUTING)
-      || state.equals(TaskState.State.ON_HOLD);
-  }
-
   /**
-   * Gets the attempt numbers of all tasks in a stage.
-   *
-   * @param stageId the stage to investigate.
-   * @return the attempt numbers of all tasks in a stage.
+   * @param stageId to query.
+   * @return all task attempt ids of the stage.
    */
   public synchronized Set<String> getAllTaskAttemptsOfStage(final String stageId) {
     return getTaskAttemptIdsToItsState(stageId).keySet();
   }
 
+  /////////////////////////////////////////////////////////////////////////////////
+  //////////////////////////////////////// Speculative execution
+
+  /**
+   * @param stageId to query.
+   * @return a map from an EXECUTING task to its running time so far.
+   */
+  public synchronized Map<String, Long> getExecutingTaskToRunningTimeMs(final String stageId) {
+    final long curTime = System.currentTimeMillis();
+    final Map<String, Long> result = new HashMap<>();
+
+    final List<List<TaskState>> taskStates = stageIdToTaskAttemptStates.get(stageId);
+    for (int taskIndex = 0; taskIndex < taskStates.size(); taskIndex++) {
+      final List<TaskState> attemptStates = taskStates.get(taskIndex);
+      for (int attempt = 0; attempt < attemptStates.size(); attempt++) {
+        if (TaskState.State.EXECUTING.equals(attemptStates.get(attempt).getStateMachine().getCurrentState())) {
+          final String taskId = RuntimeIdManager.generateTaskId(stageId, taskIndex, attempt);
+          result.put(taskId, curTime - taskIdToStartTimeMs.get(taskId));
+        }
+      }
+    }
+
+    return result;
+  }
+
+  /**
+   * List of task times so far for this stage.
+   * @param stageId of the stage.
+   * @return a copy of the list, empty if none completed.
+   */
+  public synchronized List<Long> getCompletedTaskTimeListMs(final String stageId) {
+    // Return a copy
+    return new ArrayList<>(stageIdToCompletedTaskTimeMsList.getOrDefault(stageId, new ArrayList<>(0)));
+  }
+
+  /**
+   * @param stageId of the clone.
+   * @param taskIndex of the clone.
+   * @param numOfClones of the clone.
+   * @return true if the numOfClones has been modified, false otherwise
+   */
+  public synchronized boolean setNumOfClones(final String stageId, final int taskIndex, final int numOfClones) {
+    stageIdToTaskIndexToNumOfClones.putIfAbsent(stageId, new HashMap<>());
+    // overwrite the previous value.
+    final Integer previousNumOfClones = stageIdToTaskIndexToNumOfClones.get(stageId).put(taskIndex, numOfClones);
+    return (previousNumOfClones == null) || (previousNumOfClones != numOfClones);
+  }
+
+
+  /////////////////////////////////////////////////////////////////////////////////
+  //////////////////////////////////////// State transitions
+
   /**
    * Updates the state of a task.
    * Task state changes can occur both in master and executor.
@@ -252,18 +314,28 @@ public synchronized void onTaskStateChanged(final String taskId, final TaskState
     final String stageId = RuntimeIdManager.getStageIdFromTaskId(taskId);
     final List<List<TaskState>> taskStatesOfThisStage = stageIdToTaskAttemptStates.get(stageId);
     final long numOfCompletedTaskIndicesInThisStage = taskStatesOfThisStage.stream()
-      .map(attempts -> attempts.stream()
-        .map(state -> state.getStateMachine().getCurrentState())
-        .allMatch(curState -> curState.equals(TaskState.State.COMPLETE)
-          || curState.equals(TaskState.State.SHOULD_RETRY)
-          || curState.equals(TaskState.State.ON_HOLD)))
-      .filter(bool -> bool.equals(true))
+      .filter(attempts -> {
+        final List<TaskState.State> states = attempts
+          .stream()
+          .map(state -> (TaskState.State) state.getStateMachine().getCurrentState())
+          .collect(Collectors.toList());
+        return states.stream().anyMatch(curState -> curState.equals(TaskState.State.ON_HOLD)) // one of them is ON_HOLD
+          || states.stream().anyMatch(curState -> curState.equals(TaskState.State.COMPLETE)); // one of them is COMPLETE
+      })
       .count();
     if (newTaskState.equals(TaskState.State.COMPLETE)) {
       LOG.info("{} completed: {} Task(s) out of {} are remaining in this stage",
         taskId, taskStatesOfThisStage.size() - numOfCompletedTaskIndicesInThisStage, taskStatesOfThisStage.size());
     }
 
+    // Maintain info for speculative execution
+    if (newTaskState.equals(TaskState.State.EXECUTING)) {
+      taskIdToStartTimeMs.put(taskId, System.currentTimeMillis());
+    } else if (newTaskState.equals(TaskState.State.COMPLETE)) {
+      stageIdToCompletedTaskTimeMsList.putIfAbsent(stageId, new ArrayList<>());
+      stageIdToCompletedTaskTimeMsList.get(stageId).add(System.currentTimeMillis() - taskIdToStartTimeMs.get(taskId));
+    }
+
     // Change stage state, if needed
     switch (newTaskState) {
       // INCOMPLETE stage
@@ -295,20 +367,6 @@ public synchronized void onTaskStateChanged(final String taskId, final TaskState
     }
   }
 
-  private List<TaskState.State> getPeerAttemptsforTheSameTaskIndex(final String taskId) {
-    final String stageId = RuntimeIdManager.getStageIdFromTaskId(taskId);
-    final int taskIndex = RuntimeIdManager.getIndexFromTaskId(taskId);
-    final int attempt = RuntimeIdManager.getAttemptFromTaskId(taskId);
-
-    final List<TaskState> otherAttemptsforTheSameTaskIndex =
-      new ArrayList<>(stageIdToTaskAttemptStates.get(stageId).get(taskIndex));
-    otherAttemptsforTheSameTaskIndex.remove(attempt);
-
-    return otherAttemptsforTheSameTaskIndex.stream()
-      .map(state -> (TaskState.State) state.getStateMachine().getCurrentState())
-      .collect(Collectors.toList());
-  }
-
   /**
    * (PRIVATE METHOD)
    * Updates the state of a stage.
@@ -376,6 +434,8 @@ private void onPlanStateChanged(final PlanState.State newState) {
     }
   }
 
+  /////////////////////////////////////////////////////////////////////////////////
+  //////////////////////////////////////// Helper Methods
 
   /**
    * Wait for this plan to be finished and return the final state.
@@ -422,6 +482,18 @@ private void onPlanStateChanged(final PlanState.State newState) {
     return getPlanState();
   }
 
+  /**
+   * @return a map from task attempt id to its current state.
+   */
+  @VisibleForTesting
+  public synchronized Map<String, TaskState.State> getAllTaskAttemptIdsToItsState() {
+    return physicalPlan.getStageDAG().getVertices()
+      .stream()
+      .map(Stage::getId)
+      .flatMap(stageId -> getTaskAttemptIdsToItsState(stageId).entrySet().stream())
+      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+
   /**
    * @return whether the execution for the plan is done or not.
    */
@@ -459,6 +531,19 @@ public synchronized String getPlanId() {
     return (TaskState.State) getTaskStateHelper(taskId).getStateMachine().getCurrentState();
   }
 
+  private Map<String, TaskState.State> getTaskAttemptIdsToItsState(final String stageId) {
+    final Map<String, TaskState.State> result = new HashMap<>();
+    final List<List<TaskState>> taskStates = stageIdToTaskAttemptStates.get(stageId);
+    for (int taskIndex = 0; taskIndex < taskStates.size(); taskIndex++) {
+      final List<TaskState> attemptStates = taskStates.get(taskIndex);
+      for (int attempt = 0; attempt < attemptStates.size(); attempt++) {
+        result.put(RuntimeIdManager.generateTaskId(stageId, taskIndex, attempt),
+          (TaskState.State) attemptStates.get(attempt).getStateMachine().getCurrentState());
+      }
+    }
+    return result;
+  }
+
   private TaskState getTaskStateHelper(final String taskId) {
     return stageIdToTaskAttemptStates
       .get(RuntimeIdManager.getStageIdFromTaskId(taskId))
@@ -466,6 +551,27 @@ private TaskState getTaskStateHelper(final String taskId) {
       .get(RuntimeIdManager.getAttemptFromTaskId(taskId));
   }
 
+  private boolean isTaskNotDone(final TaskState taskState) {
+    final TaskState.State state = (TaskState.State) taskState.getStateMachine().getCurrentState();
+    return state.equals(TaskState.State.READY)
+      || state.equals(TaskState.State.EXECUTING)
+      || state.equals(TaskState.State.ON_HOLD);
+  }
+
+  private List<TaskState.State> getPeerAttemptsforTheSameTaskIndex(final String taskId) {
+    final String stageId = RuntimeIdManager.getStageIdFromTaskId(taskId);
+    final int taskIndex = RuntimeIdManager.getIndexFromTaskId(taskId);
+    final int attempt = RuntimeIdManager.getAttemptFromTaskId(taskId);
+
+    final List<TaskState> otherAttemptsforTheSameTaskIndex =
+      new ArrayList<>(stageIdToTaskAttemptStates.get(stageId).get(taskIndex));
+    otherAttemptsforTheSameTaskIndex.remove(attempt);
+
+    return otherAttemptsforTheSameTaskIndex.stream()
+      .map(state -> (TaskState.State) state.getStateMachine().getCurrentState())
+      .collect(Collectors.toList());
+  }
+
   /**
    * @return the physical plan.
    */
@@ -548,26 +654,4 @@ public synchronized String toString() {
     sb.append("]}");
     return sb.toString();
   }
-
-  @VisibleForTesting
-  public synchronized Map<String, TaskState.State> getAllTaskAttemptIdsToItsState() {
-    return physicalPlan.getStageDAG().getVertices()
-      .stream()
-      .map(Stage::getId)
-      .flatMap(stageId -> getTaskAttemptIdsToItsState(stageId).entrySet().stream())
-      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-  }
-
-  private Map<String, TaskState.State> getTaskAttemptIdsToItsState(final String stageId) {
-    final Map<String, TaskState.State> result = new HashMap<>();
-    final List<List<TaskState>> taskStates = stageIdToTaskAttemptStates.get(stageId);
-    for (int taskIndex = 0; taskIndex < taskStates.size(); taskIndex++) {
-      final List<TaskState> attemptStates = taskStates.get(taskIndex);
-      for (int attempt = 0; attempt < attemptStates.size(); attempt++) {
-        result.put(RuntimeIdManager.generateTaskId(stageId, taskIndex, attempt),
-          (TaskState.State) attemptStates.get(attempt).getStateMachine().getCurrentState());
-      }
-    }
-    return result;
-  }
 }
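
The bookkeeping added above (getCompletedTaskTimeListMs, getExecutingTaskToRunningTimeMs, setNumOfClones) feeds the delayed-cloning check in BatchScheduler further below. The following is a minimal standalone sketch of that straggler criterion; the CloneDecisionSketch helper and its plain-list inputs are illustrative stand-ins for PlanStateManager state, not part of this PR.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of this PR; it restates the delayed-cloning criterion
// that BatchScheduler#onSpeculativeExecutionCheck applies to PlanStateManager's data.
final class CloneDecisionSketch {
  static boolean shouldClone(final List<Long> completedTaskTimesMs, // getCompletedTaskTimeListMs(stageId)
                             final long runningTimeMs,              // from getExecutingTaskToRunningTimeMs(stageId)
                             final int parallelism,
                             final double fractionToWaitFor,
                             final double medianTimeMultiplier) {
    // Wait until enough tasks of the stage have completed to estimate a median.
    if (completedTaskTimesMs.isEmpty()
        || completedTaskTimesMs.size() < Math.round(parallelism * fractionToWaitFor)) {
      return false;
    }
    final List<Long> sorted = new ArrayList<>(completedTaskTimesMs);
    Collections.sort(sorted);
    final long medianMs = sorted.get(sorted.size() / 2);
    // A running task is a straggler if it already exceeds median * multiplier.
    return runningTimeMs > Math.round(medianMs * medianTimeMultiplier);
  }
}

For example, with fractionToWaitFor = 0.25 and medianTimeMultiplier = 1.5, a task that has been running for 3000 ms is flagged for cloning once a quarter of the stage's tasks have completed with a median time of 1500 ms (3000 > 1500 * 1.5).
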
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 42854698f..00eac8791 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -72,8 +72,11 @@
   private static final int DAG_LOGGING_PERIOD = 3000;
   private static final int METRIC_ARRIVE_TIMEOUT = 10000;
   private static final int REST_SERVER_PORT = 10101;
+  private static final int SPECULATION_CHECKING_PERIOD_MS = 100;
 
   private final ExecutorService runtimeMasterThread;
+  private final ScheduledExecutorService speculativeTaskCloningThread;
+
   private final Scheduler scheduler;
   private final ContainerManager containerManager;
   private final MetricMessageHandler metricMessageHandler;
@@ -103,6 +106,16 @@ private RuntimeMaster(final Scheduler scheduler,
     // and keeping it single threaded removes the complexity of multi-thread synchronization.
     this.runtimeMasterThread =
         Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "RuntimeMaster thread"));
+
+    // Check for speculative execution every second.
+    this.speculativeTaskCloningThread = Executors
+      .newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, "SpeculativeTaskCloning thread"));
+    this.speculativeTaskCloningThread.scheduleAtFixedRate(
+      () -> this.runtimeMasterThread.submit(scheduler::onSpeculativeExecutionCheck),
+      SPECULATION_CHECKING_PERIOD_MS,
+      SPECULATION_CHECKING_PERIOD_MS,
+      TimeUnit.MILLISECONDS);
+
     this.scheduler = scheduler;
     this.containerManager = containerManager;
     this.metricMessageHandler = metricMessageHandler;
@@ -169,6 +182,9 @@ private Server startRestMetricServer() {
    * Terminates the RuntimeMaster.
    */
   public void terminate() {
+    // No need to speculate anymore
+    speculativeTaskCloningThread.shutdown();
+
     // send metric flush request to all executors
     metricManagerMaster.sendMetricFlushRequest();
     try {
@@ -181,6 +197,7 @@ public void terminate() {
       // clean up state...
       Thread.currentThread().interrupt();
     }
+
     runtimeMasterThread.execute(() -> {
       scheduler.terminate();
       try {
@@ -406,7 +423,6 @@ private void handleControlMessage(final ControlMessage.Message message) {
   private ScheduledExecutorService scheduleDagLogging() {
     final ScheduledExecutorService dagLoggingExecutor = Executors.newSingleThreadScheduledExecutor();
     dagLoggingExecutor.scheduleAtFixedRate(new Runnable() {
-
       public void run() {
         planStateManager.storeJSON("periodic");
       }
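
The ScheduledExecutorService wiring above keeps speculation checks off the master's critical path: the timer thread only submits the check onto the single-threaded RuntimeMaster executor, so scheduler state is still mutated by one thread. A minimal sketch of that pattern with hypothetical names (PeriodicCheckSketch, startSpeculationTimer); the period is passed in by the caller.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of this PR; it mirrors the wiring in the RuntimeMaster change above.
final class PeriodicCheckSketch {
  static ScheduledExecutorService startSpeculationTimer(final ExecutorService masterThread,
                                                        final Runnable speculationCheck,
                                                        final long periodMs) {
    final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(
        runnable -> new Thread(runnable, "SpeculativeTaskCloning thread"));
    // The timer never touches scheduler state itself; it only hands the check to the master thread.
    timer.scheduleAtFixedRate(() -> masterThread.submit(speculationCheck),
        periodMs, periodMs, TimeUnit.MILLISECONDS);
    return timer; // The caller shuts this down on termination.
  }
}
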
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
index ac2db8626..5ccc2fc96 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -17,13 +17,16 @@
 
 import com.google.common.collect.Sets;
 import edu.snu.nemo.common.Pair;
+import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.IgnoreSchedulingTempDataReceiverProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
 import edu.snu.nemo.runtime.common.plan.*;
+import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.PlanAppender;
 import edu.snu.nemo.runtime.master.DataSkewDynOptDataHandler;
@@ -34,6 +37,7 @@
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +45,7 @@
 import javax.annotation.concurrent.NotThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
-import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
@@ -92,7 +96,7 @@ private BatchScheduler(final TaskDispatcher taskDispatcher,
     updatePhysicalPlanEventHandler.setScheduler(this);
     if (pubSubEventHandlerWrapper.getPubSubEventHandler() != null) {
       pubSubEventHandlerWrapper.getPubSubEventHandler()
-          .subscribe(updatePhysicalPlanEventHandler.getEventClass(), updatePhysicalPlanEventHandler);
+        .subscribe(updatePhysicalPlanEventHandler.getEventClass(), updatePhysicalPlanEventHandler);
     }
     this.executorRegistry = executorRegistry;
     this.planStateManager = planStateManager;
@@ -121,7 +125,7 @@ public void schedulePlan(final PhysicalPlan submittedPhysicalPlan,
     } else {
       // Append the submitted plan to the original plan.
       final PhysicalPlan appendedPlan =
-          PlanAppender.appendPlan(planStateManager.getPhysicalPlan(), submittedPhysicalPlan);
+        PlanAppender.appendPlan(planStateManager.getPhysicalPlan(), submittedPhysicalPlan);
       updatePlan(appendedPlan, maxScheduleAttempt);
       planStateManager.storeJSON("appended");
     }
@@ -147,11 +151,11 @@ private void updatePlan(final PhysicalPlan newPhysicalPlan,
                           final int maxScheduleAttempt) {
     planStateManager.updatePlan(newPhysicalPlan, maxScheduleAttempt);
     this.sortedScheduleGroups = newPhysicalPlan.getStageDAG().getVertices().stream()
-        .collect(Collectors.groupingBy(Stage::getScheduleGroup))
-        .entrySet().stream()
-        .sorted(Map.Entry.comparingByKey())
-        .map(Map.Entry::getValue)
-        .collect(Collectors.toList());
+      .collect(Collectors.groupingBy(Stage::getScheduleGroup))
+      .entrySet().stream()
+      .sorted(Map.Entry.comparingByKey())
+      .map(Map.Entry::getValue)
+      .collect(Collectors.toList());
   }
 
   /**
@@ -186,7 +190,7 @@ public void onTaskStateReportFromExecutor(final String executorId,
         break;
       case FAILED:
         throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The plan failed on Task #")
-            .append(taskId).append(" in Executor ").append(executorId).toString()));
+          .append(taskId).append(" in Executor ").append(executorId).toString()));
       case READY:
       case EXECUTING:
         throw new RuntimeException("The states READY/EXECUTING cannot occur at this point");
@@ -227,6 +231,56 @@ public void onTaskStateReportFromExecutor(final String executorId,
     }
   }
 
+  @Override
+  public void onSpeculativeExecutionCheck() {
+    MutableBoolean isNumOfCloneChanged = new MutableBoolean(false);
+
+    selectEarliestSchedulableGroup().ifPresent(scheduleGroup -> {
+      scheduleGroup.stream().map(Stage::getId).forEach(stageId -> {
+        final Stage stage = planStateManager.getPhysicalPlan().getStageDAG().getVertexById(stageId);
+
+        // Only if the ClonedSchedulingProperty is set...
+        stage.getPropertyValue(ClonedSchedulingProperty.class).ifPresent(cloneConf -> {
+          if (!cloneConf.isUpFrontCloning()) { // Upfront cloning is already handled.
+            final double fractionToWaitFor = cloneConf.getFractionToWaitFor();
+            final int parallelism = stage.getParallelism();
+            final Object[] completedTaskTimes = planStateManager.getCompletedTaskTimeListMs(stageId).toArray();
+
+            // Only after the fraction of the tasks are done...
+            // Delayed cloning (aggressive)
+            if (completedTaskTimes.length > 0
+              && completedTaskTimes.length >= Math.round(parallelism * fractionToWaitFor)) {
+              Arrays.sort(completedTaskTimes);
+              final long medianTime = (long) completedTaskTimes[completedTaskTimes.length / 2];
+              final double medianTimeMultiplier = cloneConf.getMedianTimeMultiplier();
+              final Map<String, Long> execTaskToTime = planStateManager.getExecutingTaskToRunningTimeMs(stageId);
+              for (final Map.Entry<String, Long> entry : execTaskToTime.entrySet()) {
+
+                // Only if the running task is considered a 'straggler'....
+                final long runningTime = entry.getValue();
+                if (runningTime > Math.round(medianTime * medianTimeMultiplier)) {
+                  final String taskId = entry.getKey();
+                  final boolean isCloned = planStateManager.setNumOfClones(
+                    stageId, RuntimeIdManager.getIndexFromTaskId(taskId), 2);
+                  if (isCloned) {
+                    LOG.info("Cloned {}, because its running time {} (ms) is bigger than {} tasks' "
+                        + "(median) {} (ms) * (multiplier) {}", taskId, runningTime, completedTaskTimes.length,
+                      medianTime, medianTimeMultiplier);
+                  }
+                  isNumOfCloneChanged.setValue(isCloned);
+                }
+              }
+            }
+          }
+        });
+      });
+    });
+
+    if (isNumOfCloneChanged.booleanValue()) {
+      doSchedule(); // Do schedule the new clone.
+    }
+  }
+
   @Override
   public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
     LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(), executorRepresenter.getNodeName());
@@ -246,6 +300,9 @@ public void onExecutorRemoved(final String executorId) {
       return Pair.of(executor, ExecutorRegistry.ExecutorState.FAILED);
     });
 
+    // Blocks of the interrupted tasks are failed.
+    interruptedTasks.forEach(blockManagerMaster::onProducerTaskFailed);
+
     // Retry the interrupted tasks (and required parents)
     retryTasksAndRequiredParents(interruptedTasks);
 
@@ -273,25 +330,21 @@ private void doSchedule() {
     final Optional<List<Stage>> earliest = selectEarliestSchedulableGroup();
 
     if (earliest.isPresent()) {
-      // Get schedulable tasks.
       final List<Task> tasksToSchedule = earliest.get().stream()
-          .flatMap(stage -> selectSchedulableTasks(stage).stream())
-          .collect(Collectors.toList());
-
-      // We prefer (but not guarantee) to schedule the 'receiving' tasks first,
-      // assuming that tasks within a ScheduleGroup are connected with 'push' edges.
-      Collections.reverse(tasksToSchedule);
-
-      LOG.info("Scheduling some tasks in {}, which are in the same ScheduleGroup", tasksToSchedule.stream()
+        .flatMap(stage -> selectSchedulableTasks(stage).stream())
+        .collect(Collectors.toList());
+      if (!tasksToSchedule.isEmpty()) {
+        LOG.info("Scheduling some tasks in {}, which are in the same ScheduleGroup", tasksToSchedule.stream()
           .map(Task::getTaskId)
           .map(RuntimeIdManager::getStageIdFromTaskId)
           .collect(Collectors.toSet()));
 
-      // Set the pointer to the schedulable tasks.
-      pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
+        // Set the pointer to the schedulable tasks.
+        pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
 
-      // Notify the dispatcher that a new collection is available.
-      taskDispatcher.onNewPendingTaskCollectionAvailable();
+        // Notify the dispatcher that a new collection is available.
+        taskDispatcher.onNewPendingTaskCollectionAvailable();
+      }
     } else {
       LOG.info("Skipping this round as no ScheduleGroup is schedulable.");
     }
@@ -303,11 +356,11 @@ private void doSchedule() {
     }
 
     return sortedScheduleGroups.stream()
-        .filter(scheduleGroup -> scheduleGroup.stream()
-            .map(Stage::getId)
-            .map(planStateManager::getStageState)
-            .anyMatch(state -> state.equals(StageState.State.INCOMPLETE))) // any incomplete stage in the group
-        .findFirst(); // selects the one with the smallest scheduling group index.
+      .filter(scheduleGroup -> scheduleGroup.stream()
+        .map(Stage::getId)
+        .map(planStateManager::getStageState)
+        .anyMatch(state -> state.equals(StageState.State.INCOMPLETE))) // any incomplete stage in the group
+      .findFirst(); // selects the one with the smallest scheduling group index.
   }
 
   private List<Task> selectSchedulableTasks(final Stage stageToSchedule) {
@@ -322,9 +375,9 @@ private void doSchedule() {
     }
 
     final List<StageEdge> stageIncomingEdges =
-        planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
+      planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
     final List<StageEdge> stageOutgoingEdges =
-        planStateManager.getPhysicalPlan().getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
+      planStateManager.getPhysicalPlan().getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
 
     // Create and return tasks.
     final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
@@ -332,21 +385,17 @@ private void doSchedule() {
     final List<String> taskIdsToSchedule = planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId());
     final List<Task> tasks = new ArrayList<>(taskIdsToSchedule.size());
     taskIdsToSchedule.forEach(taskId -> {
-      final Set<String> blockIds = planStateManager.getPhysicalPlan().getStageDAG()
-          .getOutgoingEdgesOf(RuntimeIdManager.getStageIdFromTaskId(taskId))
-          .stream()
-          .map(stageEdge -> RuntimeIdManager.generateBlockId(stageEdge.getId(), taskId))
-          .collect(Collectors.toSet()); // ids of blocks this task will produce
+      final Set<String> blockIds = getOutputBlockIds(taskId);
       blockManagerMaster.onProducerTaskScheduled(taskId, blockIds);
       final int taskIdx = RuntimeIdManager.getIndexFromTaskId(taskId);
       tasks.add(new Task(
-          planStateManager.getPhysicalPlan().getPlanId(),
-          taskId,
-          stageToSchedule.getExecutionProperties(),
-          stageToSchedule.getSerializedIRDAG(),
-          stageIncomingEdges,
-          stageOutgoingEdges,
-          vertexIdToReadables.get(taskIdx)));
+        planStateManager.getPhysicalPlan().getPlanId(),
+        taskId,
+        stageToSchedule.getExecutionProperties(),
+        stageToSchedule.getSerializedIRDAG(),
+        stageIncomingEdges,
+        stageOutgoingEdges,
+        vertexIdToReadables.get(taskIdx)));
     });
     return tasks;
   }
@@ -407,7 +456,7 @@ private void onTaskExecutionOnHold(final String executorId,
     final String stageIdForTaskUponCompletion = RuntimeIdManager.getStageIdFromTaskId(taskId);
 
     final boolean stageComplete =
-        planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
+      planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
 
     final StageEdge targetEdge = getEdgeToOptimize(taskId);
     if (targetEdge == null) {
@@ -416,11 +465,11 @@ private void onTaskExecutionOnHold(final String executorId,
 
     if (stageComplete) {
       final DynOptDataHandler dynOptDataHandler = dynOptDataHandlers.stream()
-          .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
-          .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
+        .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
+        .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
       pubSubEventHandlerWrapper.getPubSubEventHandler()
-          .onNext(new DynamicOptimizationEvent(planStateManager.getPhysicalPlan(), dynOptDataHandler.getDynOptData(),
-              taskId, executorId, targetEdge));
+        .onNext(new DynamicOptimizationEvent(planStateManager.getPhysicalPlan(), dynOptDataHandler.getDynOptData(),
+          taskId, executorId, targetEdge));
     }
   }
 
@@ -461,50 +510,77 @@ private void retryTasksAndRequiredParents(final Set<String> tasks) {
     final Set<String> tasksToRetry = Sets.union(tasks, requiredParents);
     LOG.info("Will be retried: {}", tasksToRetry);
     tasksToRetry.forEach(
-        taskToReExecute -> planStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
+      taskToReExecute -> planStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
   }
 
   private Set<String> recursivelyGetParentTasksForLostBlocks(final Set<String> children) {
     if (children.isEmpty()) {
       return Collections.emptySet();
     }
+    final DAG<Stage, StageEdge> stageDAG = planStateManager.getPhysicalPlan().getStageDAG();
+
+    final Map<String, StageEdge> idToIncomingEdges = children.stream()
+      .map(RuntimeIdManager::getStageIdFromTaskId)
+      .flatMap(stageId -> stageDAG.getIncomingEdgesOf(stageId).stream())
+      // Ignore duplicates with the mergeFunction in toMap(_,_,mergeFunction)
+      .collect(Collectors.toMap(StageEdge::getId, Function.identity(), (l, r) -> l));
 
     final Set<String> parentsWithLostBlocks = children.stream()
-        .flatMap(child -> getParentTasks(child).stream())
-        .filter(parent -> {
-          final CompletableFuture<String> locationFuture =
-            blockManagerMaster.getBlockLocationHandler(parent).getLocationFuture();
-          return locationFuture.isCompletedExceptionally() || locationFuture.isCancelled();
-        })
-        .collect(Collectors.toSet());
+      .flatMap(child -> getInputBlockIds(child).stream()) // child task id -> parent block ids
+      .map(RuntimeIdManager::getWildCardFromBlockId) // parent block id -> parent block wildcard
+      .collect(Collectors.toSet()).stream() // remove duplicate wildcards
+      .filter(parentBlockWildcard -> // lost block = no matching AVAILABLE block attempt for the wildcard
+        blockManagerMaster.getBlockHandlers(parentBlockWildcard, BlockState.State.AVAILABLE).isEmpty())
+      .flatMap(lostParentBlockWildcard -> {
+        // COMPLETE task attempts of the lostParentBlockWildcard must become SHOULD_RETRY
+        final String inEdgeId = RuntimeIdManager.getRuntimeEdgeIdFromBlockId(lostParentBlockWildcard);
+        final String parentStageId = idToIncomingEdges.get(inEdgeId).getSrc().getId();
+        final int parentTaskIndex = RuntimeIdManager.getTaskIndexFromBlockId(lostParentBlockWildcard);
+        return planStateManager.getAllTaskAttemptsOfStage(parentStageId)
+          .stream()
+          .filter(taskId -> RuntimeIdManager.getStageIdFromTaskId(taskId).equals(parentStageId)
+            && RuntimeIdManager.getIndexFromTaskId(taskId) == parentTaskIndex)
+          // COMPLETE -> SHOULD_RETRY
+          .filter(taskId -> planStateManager.getTaskState(taskId).equals(TaskState.State.COMPLETE));
+      })
+      .collect(Collectors.toSet());
 
     // Recursive call
     return Sets.union(parentsWithLostBlocks, recursivelyGetParentTasksForLostBlocks(parentsWithLostBlocks));
   }
 
-  private Set<String> getParentTasks(final String childTaskId) {
+  private Set<String> getOutputBlockIds(final String taskId) {
+    return planStateManager.getPhysicalPlan().getStageDAG()
+      .getOutgoingEdgesOf(RuntimeIdManager.getStageIdFromTaskId(taskId))
+      .stream()
+      .map(stageEdge -> RuntimeIdManager.generateBlockId(stageEdge.getId(), taskId))
+      .collect(Collectors.toSet()); // ids of blocks this task will produce
+  }
 
+  private Set<String> getInputBlockIds(final String childTaskId) {
     final String stageIdOfChildTask = RuntimeIdManager.getStageIdFromTaskId(childTaskId);
     return planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageIdOfChildTask)
-        .stream()
-        .flatMap(inStageEdge -> {
-          final String parentStageId = inStageEdge.getSrc().getId();
-          final Set<String> tasksOfParentStage = planStateManager.getAllTaskAttemptsOfStage(parentStageId);
-
-          switch (inStageEdge.getDataCommunicationPattern()) {
-            case Shuffle:
-            case BroadCast:
-              // All of the parent stage's tasks
-              return tasksOfParentStage.stream();
-            case OneToOne:
-              // Same-index tasks of the parent stage
-              return tasksOfParentStage.stream().filter(task ->
-                  RuntimeIdManager.getIndexFromTaskId(task) == RuntimeIdManager.getIndexFromTaskId(childTaskId));
-            default:
-              throw new IllegalStateException(inStageEdge.toString());
-          }
-        })
-        .collect(Collectors.toSet());
+      .stream()
+      .flatMap(inStageEdge -> {
+        final Set<String> parentTaskIds = planStateManager.getAllTaskAttemptsOfStage(inStageEdge.getSrc().getId());
+        switch (inStageEdge.getDataCommunicationPattern()) {
+          case Shuffle:
+          case BroadCast:
+            // All of the parent stage's tasks
+            return parentTaskIds.stream()
+              .map(parentTaskId -> RuntimeIdManager.generateBlockId(inStageEdge.getId(), parentTaskId));
+          case OneToOne:
+            // Same-index tasks of the parent stage
+            return parentTaskIds.stream()
+              .filter(parentTaskId ->
+                RuntimeIdManager.getIndexFromTaskId(parentTaskId) == RuntimeIdManager.getIndexFromTaskId(childTaskId))
+              .map(parentTaskId -> RuntimeIdManager.generateBlockId(inStageEdge.getId(), parentTaskId));
+          default:
+            throw new IllegalStateException(inStageEdge.toString());
+        }
+      })
+      .collect(Collectors.toSet());
   }
 
   /**
@@ -514,8 +590,8 @@ private void retryTasksAndRequiredParents(final Set<String> tasks) {
    */
   public void updateDynOptData(final Object dynOptData) {
     final DynOptDataHandler dynOptDataHandler = dynOptDataHandlers.stream()
-        .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
-        .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
+      .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
+      .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
     dynOptDataHandler.updateDynOptData(dynOptData);
   }
 }
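
In plain terms, the delayed-cloning check above waits until a configured fraction of a stage's tasks have completed, takes the median of their completion times, and clones any still-running task whose running time exceeds median * multiplier. Below is a minimal, self-contained sketch of that decision; the class and method names are illustrative only and are not part of the Nemo codebase.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Illustrative straggler check mirroring the delayed-cloning logic in the diff above. */
final class StragglerCheckSketch {
  static List<String> tasksToClone(final long[] completedTimesMs,      // completion times of finished tasks
                                   final Map<String, Long> runningMs,  // task id -> running time of executing tasks
                                   final int parallelism,
                                   final double fractionToWaitFor,
                                   final double medianTimeMultiplier) {
    // Do nothing until the configured fraction of the stage's tasks has completed.
    if (completedTimesMs.length == 0
        || completedTimesMs.length < Math.round(parallelism * fractionToWaitFor)) {
      return Collections.emptyList();
    }
    final long[] sorted = completedTimesMs.clone();
    Arrays.sort(sorted);
    final long medianMs = sorted[sorted.length / 2];

    final List<String> toClone = new ArrayList<>();
    for (final Map.Entry<String, Long> entry : runningMs.entrySet()) {
      // A running task is a straggler if it is medianTimeMultiplier times slower than the median.
      if (entry.getValue() > Math.round(medianMs * medianTimeMultiplier)) {
        toClone.add(entry.getKey());
      }
    }
    return toClone;
  }
}

For example, with completed times of 10, 12 and 100 ms, parallelism 10, fractionToWaitFor 0.25 and medianTimeMultiplier 1.5, the median is 12 ms, so any task that has been running longer than 18 ms would be cloned; in the scheduler above, a successful setNumOfClones call then triggers a re-schedule of the new clone.
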
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/LocalitySchedulingConstraint.java
similarity index 63%
rename from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/LocalitySchedulingConstraint.java
index 4e37fde2a..769719cef 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/LocalitySchedulingConstraint.java
@@ -24,6 +24,7 @@
 import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.Task;
+import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
@@ -32,6 +33,7 @@
 import javax.inject.Inject;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 /**
  * This policy tries to pick the executors where the corresponding source or intermediate data for a task reside.
@@ -39,50 +41,48 @@
 @ThreadSafe
 @DriverSide
 @AssociatedProperty(ResourceLocalityProperty.class)
-public final class SourceLocationAwareSchedulingConstraint implements SchedulingConstraint {
+public final class LocalitySchedulingConstraint implements SchedulingConstraint {
   private final BlockManagerMaster blockManagerMaster;
 
   @Inject
-  private SourceLocationAwareSchedulingConstraint(final BlockManagerMaster blockManagerMaster) {
+  private LocalitySchedulingConstraint(final BlockManagerMaster blockManagerMaster) {
     this.blockManagerMaster = blockManagerMaster;
   }
 
   /**
-   * Find the location of the intermediate data for a task.
+   * Find the locations of the intermediate data for a task.
    * It is only possible if the task receives only one input edge with One-to-One communication pattern, and
    * the location of the input data is known.
    *
    * @param task the task to schedule.
-   * @return the intermediate data location.
+   * @return the intermediate data locations, empty if none exists.
    */
-  private Optional<String> getIntermediateDataLocation(final Task task) {
+  private List<String> getIntermediateDataLocations(final Task task) {
     if (task.getTaskIncomingEdges().size() == 1) {
       final StageEdge physicalStageEdge = task.getTaskIncomingEdges().get(0);
       if (CommunicationPatternProperty.Value.OneToOne.equals(
-          physicalStageEdge.getPropertyValue(CommunicationPatternProperty.class)
-              .orElseThrow(() -> new RuntimeException("No comm pattern!")))) {
+        physicalStageEdge.getPropertyValue(CommunicationPatternProperty.class)
+          .orElseThrow(() -> new RuntimeException("No comm pattern!")))) {
         final Optional<DuplicateEdgeGroupPropertyValue> dupProp =
-            physicalStageEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
+          physicalStageEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
         final String representativeEdgeId = dupProp.isPresent()
-            ? dupProp.get().getRepresentativeEdgeId() : physicalStageEdge.getId();
-        final String blockIdToRead =
-            RuntimeIdManager.generateBlockId(representativeEdgeId, task.getTaskId());
-        final BlockManagerMaster.BlockRequestHandler locationHandler =
-            blockManagerMaster.getBlockLocationHandler(blockIdToRead);
-        if (locationHandler.getLocationFuture().isDone()) { // if the location is known.
-          try {
-            final String location = locationHandler.getLocationFuture().get();
-            return Optional.of(location);
-          } catch (final InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(e);
-          } catch (final ExecutionException e) {
-            throw new RuntimeException(e);
-          }
-        }
+          ? dupProp.get().getRepresentativeEdgeId()
+          : physicalStageEdge.getId();
+
+        final String blockIdToRead = RuntimeIdManager.generateBlockId(representativeEdgeId, task.getTaskId());
+        return blockManagerMaster.getBlockHandlers(blockIdToRead, BlockState.State.AVAILABLE)
+          .stream()
+          .map(handler -> {
+            try {
+              return handler.getLocationFuture().get();
+            } catch (InterruptedException | ExecutionException e) {
+              throw new RuntimeException(e);
+            }
+          })
+          .collect(Collectors.toList());
       }
     }
-    return Optional.empty();
+    return Collections.emptyList();
   }
 
   /**
@@ -90,7 +90,7 @@ private SourceLocationAwareSchedulingConstraint(final BlockManagerMaster blockMa
    * @return Set of source locations from source tasks in {@code taskDAG}
    * @throws Exception for any exception raised during querying source locations for a readable
    */
-  private static Set<String> getSourceLocations(final Collection<Readable> readables) throws Exception {
+  private static Set<String> getSourceDataLocations(final Collection<Readable> readables) throws Exception {
     final List<String> sourceLocations = new ArrayList<>();
     for (final Readable readable : readables) {
       sourceLocations.addAll(readable.getLocations());
@@ -100,10 +100,11 @@ private SourceLocationAwareSchedulingConstraint(final BlockManagerMaster blockMa
 
   @Override
   public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) {
-    if (task.getTaskIncomingEdges().isEmpty()) { // Source task
+    if (task.getTaskIncomingEdges().isEmpty()) {
+      // Source task
       final Set<String> sourceLocations;
       try {
-        sourceLocations = getSourceLocations(task.getIrVertexIdToReadable().values());
+        sourceLocations = getSourceDataLocations(task.getIrVertexIdToReadable().values());
       } catch (final UnsupportedOperationException e) {
         return true;
       } catch (final Exception e) {
@@ -115,13 +116,15 @@ public boolean testSchedulability(final ExecutorRepresenter executor, final Task
       }
 
       return sourceLocations.contains(executor.getNodeName());
-    } else { // Non-source task.
-      final Optional<String> optionalIntermediateLoc = getIntermediateDataLocation(task);
-
-      if (getIntermediateDataLocation(task).isPresent()) {
-        return optionalIntermediateLoc.get().equals(executor.getExecutorId());
-      } else {
+    } else {
+      // Non-source task.
+      final List<String> intermediateLocations = getIntermediateDataLocations(task);
+      if (intermediateLocations.isEmpty()) {
+        // Since there is no known location, we just schedule the task to any executor.
         return true;
+      } else {
+        // One or more locations are known, so we schedule the task to one of them.
+        return intermediateLocations.contains(executor.getExecutorId());
       }
     }
   }
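
Taken together, the predicate above has two branches: a source task is schedulable on an executor whose node name appears among the readables' source locations (and falls back to schedulable when the locations cannot be queried), while a non-source task is schedulable on an executor that holds an AVAILABLE copy of its single One-to-One input block, or on any executor when no location is known yet. A hedged sketch of that predicate, with plain parameters standing in for the Nemo Task and ExecutorRepresenter types:

import java.util.List;
import java.util.Set;

/** Illustrative locality predicate; mirrors LocalitySchedulingConstraint#testSchedulability above. */
final class LocalityPredicateSketch {
  static boolean schedulable(final boolean isSourceTask,
                             final Set<String> sourceDataNodeNames,      // node names hosting the source data
                             final List<String> availableBlockExecutors, // executor ids holding AVAILABLE input blocks
                             final String executorNodeName,
                             final String executorId) {
    if (isSourceTask) {
      // Source task: co-locate with a node that hosts the source data.
      return sourceDataNodeNames.contains(executorNodeName);
    }
    // Non-source task: with no known location, any executor is acceptable;
    // otherwise, only an executor that already holds an AVAILABLE copy of the input block.
    return availableBlockExecutors.isEmpty() || availableBlockExecutors.contains(executorId);
  }
}
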
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index 12abd56a7..d7f545601 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -80,6 +80,11 @@ void onTaskStateReportFromExecutor(String executorId,
                                      @Nullable String taskPutOnHold,
                                      TaskState.RecoverableTaskFailureCause failureCause);
 
+  /**
+   * Called to check for speculative execution.
+   */
+  void onSpeculativeExecutionCheck();
+
   /**
    * To be called when a job should be terminated.
    * Any clean up code should be implemented in this method.
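
The interface change above only declares the hook; the tests in this PR call it directly, and in a running master one would expect a periodic trigger to drive it. Purely as an illustration (the driver class, period, and wiring below are assumptions, not code from this PR), such a trigger could look like:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical periodic driver for Scheduler#onSpeculativeExecutionCheck(). */
final class SpeculativeCheckDriverSketch {
  private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

  /** Periodically asks the scheduler to re-evaluate running tasks for cloning. */
  void start(final Runnable onSpeculativeExecutionCheck, final long periodMs) {
    timer.scheduleAtFixedRate(onSpeculativeExecutionCheck, periodMs, periodMs, TimeUnit.MILLISECONDS);
  }

  void stop() {
    timer.shutdownNow();
  }
}

A caller would pass scheduler::onSpeculativeExecutionCheck as the runnable; the actual trigger used by Nemo's master is not shown in this diff.
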
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
index 50dd8d5d8..6638a3644 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
@@ -39,12 +39,12 @@
   private SchedulingConstraintRegistry(
       final ContainerTypeAwareSchedulingConstraint containerTypeAwareSchedulingConstraint,
       final FreeSlotSchedulingConstraint freeSlotSchedulingConstraint,
-      final SourceLocationAwareSchedulingConstraint sourceLocationAwareSchedulingConstraint,
+      final LocalitySchedulingConstraint localitySchedulingConstraint,
       final SkewnessAwareSchedulingConstraint skewnessAwareSchedulingConstraint,
       final NodeShareSchedulingConstraint nodeShareSchedulingConstraint) {
     registerSchedulingConstraint(containerTypeAwareSchedulingConstraint);
     registerSchedulingConstraint(freeSlotSchedulingConstraint);
-    registerSchedulingConstraint(sourceLocationAwareSchedulingConstraint);
+    registerSchedulingConstraint(localitySchedulingConstraint);
     registerSchedulingConstraint(skewnessAwareSchedulingConstraint);
     registerSchedulingConstraint(nodeShareSchedulingConstraint);
   }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
index 959b3fc65..0b0a8326e 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
@@ -27,6 +27,7 @@
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import static org.junit.Assert.assertEquals;
@@ -37,8 +38,8 @@
  * Test for {@link BlockManagerMaster}.
  */
 public final class BlockManagerMasterTest {
-  private static int FIRST_ATTEMPT = 0;
-  private static int SECOND_ATTEPMT = 1;
+  private static final int FIRST_ATTEMPT = 0;
+  private static final int SECOND_ATTEMPT = 1;
   private BlockManagerMaster blockManagerMaster;
 
   @Before
@@ -48,9 +49,9 @@ public void setUp() throws Exception {
     blockManagerMaster = injector.getInstance(BlockManagerMaster.class);
   }
 
-  private static void checkBlockAbsentException(final Future<String> future,
-                                                final String expectedPartitionId,
-                                                final BlockState.State expectedState)
+  private static void checkInProgressToNotAvailableException(final Future<String> future,
+                                                             final String expectedPartitionId,
+                                                             final BlockState.State expectedState)
       throws IllegalStateException, InterruptedException {
     assertTrue(future.isDone());
     try {
@@ -88,24 +89,22 @@ public void testLostAfterCommit() throws Exception {
     final String executorId = RuntimeIdManager.generateExecutorId();
     final String blockId = RuntimeIdManager.generateBlockId(edgeId, taskId);
 
-    // Initially the block state is NOT_AVAILABLE.
-    checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), blockId,
-        BlockState.State.NOT_AVAILABLE);
+    // Initially, no handler exists for the block.
+    assertTrue(blockManagerMaster.getBlockHandlers(blockId, BlockState.State.IN_PROGRESS).isEmpty());
 
     // The block is being IN_PROGRESS.
     blockManagerMaster.onProducerTaskScheduled(taskId, Collections.singleton(blockId));
-    final Future<String> future = blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture();
+    final Future<String> future = getSingleLocationFuture(blockId, BlockState.State.IN_PROGRESS);
     checkPendingFuture(future);
 
     // The block is AVAILABLE
     blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.AVAILABLE, executorId);
     checkBlockLocation(future, executorId); // A future, previously pending on IN_PROGRESS state, is now resolved.
-    checkBlockLocation(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), executorId);
+    checkBlockLocation(getSingleLocationFuture(blockId, BlockState.State.AVAILABLE), executorId);
 
     // We lost the block.
     blockManagerMaster.removeWorker(executorId);
-    checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), blockId,
-        BlockState.State.NOT_AVAILABLE);
+    getSingleLocationFuture(blockId, BlockState.State.NOT_AVAILABLE); // this call should succeed with no error.
   }
 
   /**
@@ -125,38 +124,42 @@ public void testBeforeAfterCommit() throws Exception {
 
       // The block is being scheduled.
       blockManagerMaster.onProducerTaskScheduled(firstAttemptTaskId, Collections.singleton(firstAttemptBlockId));
-      final Future<String> future0 = blockManagerMaster.getBlockLocationHandler(firstAttemptBlockId).getLocationFuture();
+      final Future<String> future0 = getSingleLocationFuture(firstAttemptBlockId, BlockState.State.IN_PROGRESS);
       checkPendingFuture(future0);
 
       // Producer task fails.
       blockManagerMaster.onProducerTaskFailed(firstAttemptTaskId);
 
       // A future, previously pending on IN_PROGRESS state, is now completed exceptionally.
-      checkBlockAbsentException(future0, firstAttemptBlockId, BlockState.State.NOT_AVAILABLE);
-      checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(firstAttemptBlockId).getLocationFuture(), firstAttemptBlockId,
-          BlockState.State.NOT_AVAILABLE);
+      checkInProgressToNotAvailableException(future0, firstAttemptBlockId, BlockState.State.NOT_AVAILABLE);
+      checkInProgressToNotAvailableException(
+        getSingleLocationFuture(firstAttemptBlockId, BlockState.State.NOT_AVAILABLE),
+        firstAttemptBlockId, BlockState.State.NOT_AVAILABLE);
     }
 
     // Second attempt
     {
-      final String secondAttemptTaskId = RuntimeIdManager.generateTaskId("Stage0", srcTaskIndex, SECOND_ATTEPMT);
+      final String secondAttemptTaskId = RuntimeIdManager.generateTaskId("Stage0", srcTaskIndex, SECOND_ATTEMPT);
       final String secondAttemptBlockId = RuntimeIdManager.generateBlockId(edgeId, secondAttemptTaskId);
       final String executorId = RuntimeIdManager.generateExecutorId();
 
       // Re-scheduling the task.
       blockManagerMaster.onProducerTaskScheduled(secondAttemptTaskId, Collections.singleton(secondAttemptBlockId));
-      final Future<String> future1 = blockManagerMaster.getBlockLocationHandler(secondAttemptBlockId).getLocationFuture();
+      final Future<String> future1 = getSingleLocationFuture(secondAttemptBlockId, BlockState.State.IN_PROGRESS);
       checkPendingFuture(future1);
 
       // Committed.
       blockManagerMaster.onBlockStateChanged(secondAttemptBlockId, BlockState.State.AVAILABLE, executorId);
       checkBlockLocation(future1, executorId); // A future, previously pending on IN_PROGRESS state, is now resolved.
-      checkBlockLocation(blockManagerMaster.getBlockLocationHandler(secondAttemptBlockId).getLocationFuture(), executorId);
+      checkBlockLocation(getSingleLocationFuture(secondAttemptBlockId, BlockState.State.AVAILABLE), executorId);
 
       // Then removed.
       blockManagerMaster.onBlockStateChanged(secondAttemptBlockId, BlockState.State.NOT_AVAILABLE, executorId);
-      checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(secondAttemptBlockId).getLocationFuture(), secondAttemptBlockId,
-          BlockState.State.NOT_AVAILABLE);
+      assertEquals(2, blockManagerMaster.getBlockHandlers(secondAttemptBlockId, BlockState.State.NOT_AVAILABLE).size());
     }
   }
+
+  private Future<String> getSingleLocationFuture(final String blockId, final BlockState.State state) {
+    final List<BlockManagerMaster.BlockRequestHandler> handlerList = blockManagerMaster.getBlockHandlers(blockId, state);
+    assertEquals(1, handlerList.size());
+    return handlerList.get(0).getLocationFuture();
+  }
 }
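
The rewritten test exercises BlockManagerMaster#getBlockHandlers(blockIdOrWildcard, state), the query the scheduler above relies on: an empty AVAILABLE result means the block is effectively lost, while each returned handler's location future resolves to the executor holding a copy. A small usage sketch under that assumption (the wrapper class and method name are illustrative):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

import edu.snu.nemo.runtime.common.state.BlockState;
import edu.snu.nemo.runtime.master.BlockManagerMaster;

/** Illustrative lookup of the executors holding AVAILABLE copies of a block. */
final class BlockLookupSketch {
  static List<String> availableLocations(final BlockManagerMaster blockManagerMaster,
                                         final String blockIdOrWildcard) {
    final List<BlockManagerMaster.BlockRequestHandler> handlers =
        blockManagerMaster.getBlockHandlers(blockIdOrWildcard, BlockState.State.AVAILABLE);
    // An empty list means no AVAILABLE attempt exists, i.e. the block is considered lost.
    final List<String> locations = new ArrayList<>();
    for (final BlockManagerMaster.BlockRequestHandler handler : handlers) {
      try {
        // For an AVAILABLE block, the location future resolves to the executor id holding it.
        locations.add(handler.getLocationFuture().get());
      } catch (final InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
      }
    }
    return locations;
  }
}
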
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/LocalitySchedulingConstraintTest.java
similarity index 93%
rename from runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/LocalitySchedulingConstraintTest.java
index ab1bf0bfe..cabc21854 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/LocalitySchedulingConstraintTest.java
@@ -38,11 +38,11 @@
 import static org.mockito.Mockito.*;
 
 /**
- * Test cases for {@link SourceLocationAwareSchedulingConstraint}.
+ * Test cases for {@link LocalitySchedulingConstraint}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ExecutorRepresenter.class, Task.class, Readable.class, BlockManagerMaster.class})
-public final class SourceLocationAwareSchedulingConstraintTest {
+public final class LocalitySchedulingConstraintTest {
   private Injector injector;
   private static final String SITE_0 = "SEOUL";
   private static final String SITE_1 = "JINJU";
@@ -53,7 +53,7 @@ private static ExecutorRepresenter mockExecutorRepresenter(final String executor
     when(executorRepresenter.getNodeName()).thenReturn(executorId);
     return executorRepresenter;
   }
-  
+
   @Before
   public void setUp() throws Exception {
     injector = Tang.Factory.getTang().newInjector();
@@ -61,13 +61,13 @@ public void setUp() throws Exception {
   }
 
   /**
-   * {@link SourceLocationAwareSchedulingConstraint} should fail to schedule a {@link Task} when
+   * {@link LocalitySchedulingConstraint} should fail to schedule a {@link Task} when
    * there are no executors in appropriate location(s).
    */
   @Test
   public void testSourceLocationAwareSchedulingNotAvailable() throws InjectionException {
     final SchedulingConstraint schedulingConstraint = injector
-        .getInstance(SourceLocationAwareSchedulingConstraint.class);
+        .getInstance(LocalitySchedulingConstraint.class);
 
     // Prepare test scenario
     final Task task = CreateTask.withReadablesWithSourceLocations(
@@ -81,13 +81,13 @@ public void testSourceLocationAwareSchedulingNotAvailable() throws InjectionExce
   }
 
   /**
-   * {@link SourceLocationAwareSchedulingConstraint} should properly schedule {@link Task}s
+   * {@link LocalitySchedulingConstraint} should properly schedule {@link Task}s
    * with multiple source locations.
    */
   @Test
   public void testSourceLocationAwareSchedulingWithMultiSource() throws InjectionException {
     final SchedulingConstraint schedulingConstraint = injector
-        .getInstance(SourceLocationAwareSchedulingConstraint.class);
+        .getInstance(LocalitySchedulingConstraint.class);
     // Prepare test scenario
     final Task task0 = CreateTask.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_1)));
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
index 64abefe04..eabcac4d6 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
@@ -46,7 +46,7 @@ public void testSchedulingConstraintRegistry() throws InjectionException {
     assertEquals(FreeSlotSchedulingConstraint.class, getConstraintOf(ResourceSlotProperty.class, registry));
     assertEquals(ContainerTypeAwareSchedulingConstraint.class,
         getConstraintOf(ResourcePriorityProperty.class, registry));
-    assertEquals(SourceLocationAwareSchedulingConstraint.class,
+    assertEquals(LocalitySchedulingConstraint.class,
         getConstraintOf(ResourceLocalityProperty.class, registry));
   }
 
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
index c2bbd12a0..af3fe81d8 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -14,7 +14,6 @@
  * limitations under the License.
  */
 package edu.snu.nemo.runtime.master.scheduler;
-
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdManager;
@@ -24,6 +23,7 @@
 import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.common.state.PlanState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
@@ -72,6 +72,7 @@
   private Scheduler scheduler;
   private ExecutorRegistry executorRegistry;
   private PlanStateManager planStateManager;
+  private BlockManagerMaster blockManagerMaster;
 
   private static final int MAX_SCHEDULE_ATTEMPT = Integer.MAX_VALUE;
 
@@ -96,13 +97,21 @@ public void setUp() throws Exception {
   public void testExecutorRemoved() throws Exception {
     // Until the plan finishes, events happen
     while (!planStateManager.isPlanDone()) {
-      // 50% chance remove, 50% chance add, 80% chance task completed
-      executorRemoved(0.5);
-      executorAdded(0.5);
-      taskCompleted(0.8);
+      // 30% chance executor added, 30% chance executor removed
+      executorAdded(0.3);
+      executorRemoved(0.3);
+
+      // Sleep for a random duration before the speculative execution check.
+      if (random.nextBoolean()) {
+        Thread.sleep(10);
+      } else {
+        Thread.sleep(20);
+      }
+
+      // 30% chance task completed
+      taskCompleted(0.3);
 
-      // 10ms sleep
-      Thread.sleep(10);
+      scheduler.onSpeculativeExecutionCheck();
     }
 
     // Plan should COMPLETE
@@ -120,12 +129,17 @@ public void testTaskOutputWriteFailure() throws Exception {
     // Until the plan finishes, events happen
     while (!planStateManager.isPlanDone()) {
       // 50% chance task completed
-      // 50% chance task output write failed
+      // 70% chance task output write failed
       taskCompleted(0.5);
-      taskOutputWriteFailed(0.5);
+      taskOutputWriteFailed(0.7);
 
-      // 10ms sleep
-      Thread.sleep(10);
+      // Sleep for a random duration before the speculative execution check.
+      if (random.nextBoolean()) {
+        Thread.sleep(10);
+      } else {
+        Thread.sleep(20);
+      }
+      scheduler.onSpeculativeExecutionCheck();
     }
 
     // Plan should COMPLETE
@@ -178,8 +192,17 @@ private void taskCompleted(final double chance) {
     if (!executingTasks.isEmpty()) {
       final int randomIndex = random.nextInt(executingTasks.size());
       final String selectedTask = executingTasks.get(randomIndex);
-      SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry, selectedTask,
+
+      final Optional<ExecutorRepresenter> executor = executorRegistry.findExecutorForTask(selectedTask);
+      if (executor.isPresent()) {
+        SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry, selectedTask,
           TaskState.State.COMPLETE, RuntimeIdManager.getAttemptFromTaskId(selectedTask));
+        getOutputBlockIds(selectedTask).forEach(blockId ->
+          blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.AVAILABLE, executor.get().getExecutorId()));
+      } else {
+        throw new RuntimeException(selectedTask);
+      }
     }
   }
 
@@ -221,7 +244,16 @@ private void runPhysicalPlan(final TestPlanGenerator.PlanType planType,
     injector.bindVolatileInstance(SchedulingConstraintRegistry.class, mock(SchedulingConstraintRegistry.class));
     planStateManager = injector.getInstance(PlanStateManager.class);
     scheduler = injector.getInstance(Scheduler.class);
+    blockManagerMaster = injector.getInstance(BlockManagerMaster.class);
 
     scheduler.schedulePlan(plan, MAX_SCHEDULE_ATTEMPT);
   }
+
+  private Set<String> getOutputBlockIds(final String taskId) {
+    return planStateManager.getPhysicalPlan().getStageDAG()
+      .getOutgoingEdgesOf(RuntimeIdManager.getStageIdFromTaskId(taskId))
+      .stream()
+      .map(stageEdge -> RuntimeIdManager.generateBlockId(stageEdge.getId(), taskId))
+      .collect(Collectors.toSet()); // ids of blocks this task will produce
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services