You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by sa...@apache.org on 2018/08/22 03:40:37 UTC
[incubator-nemo] branch master updated: [NEMO-179] Delayed Task Cloning (#112)
This is an automated email from the ASF dual-hosted git repository.
sanha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 25bea60 [NEMO-179] Delayed Task Cloning (#112)
25bea60 is described below
commit 25bea60cbfcce37d20ccc9d669b0004ef4ef6c12
Author: John Yang <jo...@gmail.com>
AuthorDate: Wed Aug 22 12:40:34 2018 +0900
[NEMO-179] Delayed Task Cloning (#112)
JIRA: [NEMO-179: Delayed Task Cloning](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-179)
**Major changes:**
- Two options for ClonedSchedulingProperty: speculative execution (fraction, medianTimeMultiplier) and upfront cloning
- ScheduledExecutorService in RuntimeMaster: Check for speculative execution every 100ms
- Data structures to identify stragglers and clone tasks in PlanStateManager
- BlockState: No longer transitions from NOT_AVAILABLE to other states (similar to TaskState), assuming that a new Block attempt takes over the failed one
**Minor changes to note:**
- Fix a bug in SourceLocationAwareSchedulingConstraint, as its getIntermediateDataLocation is no longer deterministic (randomly fetches one of the clone outputs)
- Log when the speculative execution is triggered
- Indentations
**Tests for the changes:**
- Unit test (TaskRetryTest): Tests combinations of task retry + task speculation (cloning), both of which lead to new task attempts
- Integration test (WordCountITCase#testSpeculativeExecution): In addition to the existing test for upfront cloning, this tests conditional (very aggressive) speculative cloning.
**Other comments:**
- N/A
Closes #112
---
.../ClonedSchedulingProperty.java | 88 +++++++-
....java => AggressiveSpeculativeCloningPass.java} | 19 +-
...SchedulingPass.java => UpfrontCloningPass.java} | 24 ++-
.../compiler/optimizer/policy/BasicPullPolicy.java | 3 +
.../compiler/optimizer/policy/BasicPushPolicy.java | 9 +-
.../snu/nemo/examples/beam/WordCountITCase.java | 12 +-
...veSpeculativeCloningPolicyParallelismFive.java} | 11 +-
...=> UpfrontSchedulingPolicyParallelismFive.java} | 10 +-
.../snu/nemo/runtime/common/state/BlockState.java | 10 +-
.../snu/nemo/runtime/common/state/StageState.java | 2 +
.../nemo/runtime/master/BlockManagerMaster.java | 76 ++++---
.../edu/snu/nemo/runtime/master/BlockMetadata.java | 15 +-
.../snu/nemo/runtime/master/PlanStateManager.java | 204 ++++++++++++------
.../edu/snu/nemo/runtime/master/RuntimeMaster.java | 18 +-
.../runtime/master/scheduler/BatchScheduler.java | 228 ++++++++++++++-------
...aint.java => LocalitySchedulingConstraint.java} | 71 ++++---
.../nemo/runtime/master/scheduler/Scheduler.java | 5 +
.../scheduler/SchedulingConstraintRegistry.java | 4 +-
.../runtime/master/BlockManagerMasterTest.java | 45 ++--
....java => LocalitySchedulingConstraintTest.java} | 14 +-
.../SchedulingConstraintnRegistryTest.java | 2 +-
.../runtime/master/scheduler/TaskRetryTest.java | 56 +++--
22 files changed, 636 insertions(+), 290 deletions(-)
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java
index cd0f312..6a24c6d 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java
@@ -17,30 +17,104 @@ package edu.snu.nemo.common.ir.vertex.executionproperty;
import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import java.io.Serializable;
+
/**
* Specifies cloned execution of a vertex.
*
* A major limitations of the current implementation:
* *ALL* of the clones are always scheduled immediately
*/
-public final class ClonedSchedulingProperty extends VertexExecutionProperty<Integer> {
+public final class ClonedSchedulingProperty extends VertexExecutionProperty<ClonedSchedulingProperty.CloneConf> {
/**
* Constructor.
* @param value value of the execution property.
*/
- private ClonedSchedulingProperty(final Integer value) {
+ private ClonedSchedulingProperty(final CloneConf value) {
super(value);
}
/**
* Static method exposing the constructor.
- * @param value value of the new execution property.
+ * @param conf value of the new execution property.
* @return the newly created execution property.
*/
- public static ClonedSchedulingProperty of(final Integer value) {
- if (value <= 0) {
- throw new IllegalStateException(String.valueOf(value));
+ public static ClonedSchedulingProperty of(final CloneConf conf) {
+ return new ClonedSchedulingProperty(conf);
+ }
+
+ /**
+ * Configurations for cloning.
+ * TODO #199: Slot-aware cloning
+ */
+ public static final class CloneConf implements Serializable {
+ // Always clone, upfront.
+ private final boolean upFrontCloning;
+
+ // Fraction of tasks to wait for completion, before trying to clone.
+ // If this value is 0, then we always clone.
+ private final double fractionToWaitFor;
+
+ // How many times slower is a task than the median, in order to be cloned.
+ private final double medianTimeMultiplier;
+
+ /**
+ * Always clone, upfront.
+ */
+ public CloneConf() {
+ this.upFrontCloning = true;
+ this.fractionToWaitFor = 0.0;
+ this.medianTimeMultiplier = 0.0;
+ }
+
+ /**
+ * Clone stragglers judiciously.
+ * @param fractionToWaitFor before trying to clone.
+ * @param medianTimeMultiplier to identify stragglers.
+ */
+ public CloneConf(final double fractionToWaitFor, final double medianTimeMultiplier) {
+ if (fractionToWaitFor >= 1.0 || fractionToWaitFor <= 0) {
+ throw new IllegalArgumentException(String.valueOf(fractionToWaitFor));
+ }
+ if (medianTimeMultiplier < 1.0) {
+ throw new IllegalArgumentException(String.valueOf(medianTimeMultiplier));
+ }
+ this.upFrontCloning = false;
+ this.fractionToWaitFor = fractionToWaitFor;
+ this.medianTimeMultiplier = medianTimeMultiplier;
+ }
+
+ /**
+ * @return fractionToWaitFor.
+ */
+ public double getFractionToWaitFor() {
+ return fractionToWaitFor;
+ }
+
+ /**
+ * @return medianTimeMultiplier.
+ */
+ public double getMedianTimeMultiplier() {
+ return medianTimeMultiplier;
+ }
+
+ /**
+ * @return true if it is upfront cloning.
+ */
+ public boolean isUpFrontCloning() {
+ return upFrontCloning;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("upfront: ");
+ sb.append(upFrontCloning);
+ sb.append(" / fraction: ");
+ sb.append(fractionToWaitFor);
+ sb.append(" / multiplier: ");
+ sb.append(medianTimeMultiplier);
+ return sb.toString();
}
- return new ClonedSchedulingProperty(value);
}
}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ClonedSchedulingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/AggressiveSpeculativeCloningPass.java
similarity index 61%
copy from compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ClonedSchedulingPass.java
copy to compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/AggressiveSpeculativeCloningPass.java
index 32797eb..7897183 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ClonedSchedulingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/AggressiveSpeculativeCloningPass.java
@@ -21,22 +21,27 @@ import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
/**
- * Set the ClonedScheduling property of source vertices.
+ * Speculative execution. (very aggressive, for unit tests)
+ * TODO #200: Maintain Test Passes and Policies Separately
*/
@Annotates(ClonedSchedulingProperty.class)
-public final class ClonedSchedulingPass extends AnnotatingPass {
+public final class AggressiveSpeculativeCloningPass extends AnnotatingPass {
/**
* Default constructor.
*/
- public ClonedSchedulingPass() {
- super(ClonedSchedulingPass.class);
+ public AggressiveSpeculativeCloningPass() {
+ super(AggressiveSpeculativeCloningPass.class);
}
@Override
public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
- dag.getVertices().stream()
- .filter(vertex -> dag.getIncomingEdgesOf(vertex.getId()).isEmpty())
- .forEach(vertex -> vertex.setProperty(ClonedSchedulingProperty.of(2)));
+ // Speculative execution policy.
+ final double fractionToWaitFor = 0.00000001; // Aggressive
+ final double medianTimeMultiplier = 1.00000001; // Aggressive
+
+ // Apply the policy to ALL vertices
+ dag.getVertices().forEach(vertex -> vertex.setProperty(ClonedSchedulingProperty.of(
+ new ClonedSchedulingProperty.CloneConf(fractionToWaitFor, medianTimeMultiplier))));
return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ClonedSchedulingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
similarity index 54%
rename from compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ClonedSchedulingPass.java
rename to compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
index 32797eb..7c4920f 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ClonedSchedulingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
@@ -17,26 +17,38 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.Requires;
/**
- * Set the ClonedScheduling property of source vertices.
+ * Set the ClonedScheduling property of source vertices, in an upfront manner.
*/
@Annotates(ClonedSchedulingProperty.class)
-public final class ClonedSchedulingPass extends AnnotatingPass {
+@Requires(CommunicationPatternProperty.class)
+public final class UpfrontCloningPass extends AnnotatingPass {
/**
* Default constructor.
*/
- public ClonedSchedulingPass() {
- super(ClonedSchedulingPass.class);
+ public UpfrontCloningPass() {
+ super(UpfrontCloningPass.class);
}
@Override
public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
dag.getVertices().stream()
- .filter(vertex -> dag.getIncomingEdgesOf(vertex.getId()).isEmpty())
- .forEach(vertex -> vertex.setProperty(ClonedSchedulingProperty.of(2)));
+ .filter(vertex -> dag.getIncomingEdgesOf(vertex.getId())
+ .stream()
+ // TODO #198: Handle Un-cloneable Beam Sink Operators
+ // only shuffle receivers (for now... as particular Beam sink operators fail when cloned)
+ .anyMatch(edge ->
+ edge.getPropertyValue(CommunicationPatternProperty.class)
+ .orElseThrow(() -> new IllegalStateException())
+ .equals(CommunicationPatternProperty.Value.Shuffle))
+ )
+ .forEach(vertex -> vertex.setProperty(
+ ClonedSchedulingProperty.of(new ClonedSchedulingProperty.CloneConf()))); // clone upfront, always
return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
index 192ba26..b4e17d1 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
@@ -20,14 +20,17 @@ import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AggressiveSpeculativeCloningPass;
import org.apache.reef.tang.Injector;
/**
* Basic pull policy.
+ * TODO #200: Maintain Test Passes and Policies Separately
*/
public final class BasicPullPolicy implements Policy {
public static final PolicyBuilder BUILDER =
new PolicyBuilder()
+ .registerCompileTimePass(new AggressiveSpeculativeCloningPass())
.registerCompileTimePass(new DefaultScheduleGroupPass());
private final Policy policy;
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
index c0c52ff..8ef91cf 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
@@ -21,16 +21,19 @@ import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ShuffleEdgePushPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AggressiveSpeculativeCloningPass;
import org.apache.reef.tang.Injector;
/**
* Basic push policy.
+ * TODO #200: Maintain Test Passes and Policies Separately
*/
public final class BasicPushPolicy implements Policy {
public static final PolicyBuilder BUILDER =
- new PolicyBuilder()
- .registerCompileTimePass(new ShuffleEdgePushPass())
- .registerCompileTimePass(new DefaultScheduleGroupPass());
+ new PolicyBuilder()
+ .registerCompileTimePass(new AggressiveSpeculativeCloningPass())
+ .registerCompileTimePass(new ShuffleEdgePushPass())
+ .registerCompileTimePass(new DefaultScheduleGroupPass());
private final Policy policy;
/**
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
index 1211370..48d598f 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
@@ -112,7 +112,17 @@ public final class WordCountITCase {
.addResourceJson(executorResourceFileName)
.addJobId(WordCountITCase.class.getSimpleName() + "_clonedscheduling")
.addMaxTaskAttempt(Integer.MAX_VALUE)
- .addOptimizationPolicy(ClonedSchedulingPolicyParallelismFive.class.getCanonicalName())
+ .addOptimizationPolicy(UpfrontSchedulingPolicyParallelismFive.class.getCanonicalName())
.build());
}
+
+ @Test (timeout = TIMEOUT)
+ public void testSpeculativeExecution() throws Exception {
+ JobLauncher.main(builder
+ .addResourceJson(executorResourceFileName)
+ .addJobId(WordCountITCase.class.getSimpleName() + "_speculative")
+ .addMaxTaskAttempt(Integer.MAX_VALUE)
+ .addOptimizationPolicy(AggressiveSpeculativeCloningPolicyParallelismFive.class.getCanonicalName())
+ .build());
+ }
}
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/AggressiveSpeculativeCloningPolicyParallelismFive.java
similarity index 85%
copy from examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java
copy to examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/AggressiveSpeculativeCloningPolicyParallelismFive.java
index b0e6348..31bd14b 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/AggressiveSpeculativeCloningPolicyParallelismFive.java
@@ -20,21 +20,22 @@ import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ClonedSchedulingPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AggressiveSpeculativeCloningPass;
import edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy;
import edu.snu.nemo.compiler.optimizer.policy.Policy;
import edu.snu.nemo.compiler.optimizer.policy.PolicyImpl;
import org.apache.reef.tang.Injector;
+
import java.util.List;
/**
- * A default policy with cloning for tests.
+ * A default policy with (aggressive) speculative execution.
*/
-public final class ClonedSchedulingPolicyParallelismFive implements Policy {
+public final class AggressiveSpeculativeCloningPolicyParallelismFive implements Policy {
private final Policy policy;
- public ClonedSchedulingPolicyParallelismFive() {
+ public AggressiveSpeculativeCloningPolicyParallelismFive() {
final List<CompileTimePass> overwritingPasses = DefaultPolicy.BUILDER.getCompileTimePasses();
- overwritingPasses.add(new ClonedSchedulingPass()); // CLONING!
+ overwritingPasses.add(new AggressiveSpeculativeCloningPass()); // CLONING!
this.policy = new PolicyImpl(
PolicyTestUtil.overwriteParallelism(5, overwritingPasses),
DefaultPolicy.BUILDER.getRuntimePasses());
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/UpfrontSchedulingPolicyParallelismFive.java
similarity index 87%
rename from examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java
rename to examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/UpfrontSchedulingPolicyParallelismFive.java
index b0e6348..a756037 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/UpfrontSchedulingPolicyParallelismFive.java
@@ -20,7 +20,7 @@ import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ClonedSchedulingPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.UpfrontCloningPass;
import edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy;
import edu.snu.nemo.compiler.optimizer.policy.Policy;
import edu.snu.nemo.compiler.optimizer.policy.PolicyImpl;
@@ -28,13 +28,13 @@ import org.apache.reef.tang.Injector;
import java.util.List;
/**
- * A default policy with cloning for tests.
+ * A default policy with upfront cloning.
*/
-public final class ClonedSchedulingPolicyParallelismFive implements Policy {
+public final class UpfrontSchedulingPolicyParallelismFive implements Policy {
private final Policy policy;
- public ClonedSchedulingPolicyParallelismFive() {
+ public UpfrontSchedulingPolicyParallelismFive() {
final List<CompileTimePass> overwritingPasses = DefaultPolicy.BUILDER.getCompileTimePasses();
- overwritingPasses.add(new ClonedSchedulingPass()); // CLONING!
+ overwritingPasses.add(new UpfrontCloningPass()); // CLONING!
this.policy = new PolicyImpl(
PolicyTestUtil.overwriteParallelism(5, overwritingPasses),
DefaultPolicy.BUILDER.getRuntimePasses());
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
index b2329ee..b21751c 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
@@ -31,9 +31,9 @@ public final class BlockState {
final StateMachine.Builder stateMachineBuilder = StateMachine.newBuilder();
// Add states
- stateMachineBuilder.addState(State.NOT_AVAILABLE, "The block is not available.");
stateMachineBuilder.addState(State.IN_PROGRESS, "The block is in the progress of being created.");
stateMachineBuilder.addState(State.AVAILABLE, "The block is available.");
+ stateMachineBuilder.addState(State.NOT_AVAILABLE, "The block is not available.");
// From IN_PROGRESS
stateMachineBuilder.addTransition(State.IN_PROGRESS, State.AVAILABLE, "The block is successfully created");
@@ -41,13 +41,7 @@ public final class BlockState {
"The block is lost before being created");
// From AVAILABLE
- stateMachineBuilder.addTransition(State.AVAILABLE, State.NOT_AVAILABLE, "The block is lost");
-
- // From NOT_AVAILABLE
- stateMachineBuilder.addTransition(State.NOT_AVAILABLE, State.IN_PROGRESS,
- "The task that produces the block is scheduled.");
- stateMachineBuilder.addTransition(State.NOT_AVAILABLE, State.NOT_AVAILABLE,
- "A block can be reported lost from multiple sources");
+ stateMachineBuilder.addTransition(State.AVAILABLE, State.NOT_AVAILABLE, "The block is not available");
stateMachineBuilder.setInitialState(State.IN_PROGRESS);
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
index e7edbad..1a63f0e 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
@@ -44,6 +44,8 @@ public final class StageState {
stateMachineBuilder.addTransition(State.INCOMPLETE, State.COMPLETE, "All tasks complete");
stateMachineBuilder.addTransition(State.COMPLETE, State.INCOMPLETE,
"Completed before, but a task in this stage should be retried");
+ stateMachineBuilder.addTransition(State.COMPLETE, State.COMPLETE,
+ "Completed before, but probably a cloned task has completed again");
stateMachineBuilder.setInitialState(State.INCOMPLETE);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index def0888..05b7467 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -31,6 +31,7 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import java.util.*;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -85,6 +86,8 @@ public final class BlockManagerMaster {
/**
* Initializes the states of a block which will be produced by a producer task.
+ * This method is idempotent thanks to the 'Set' data structures.
+ * See BatchScheduler#doSchedule for details on scheduling same task attempts multiple times.
*
* @param blockId the id of the block to initialize.
* @param producerTaskId the id of the producer task.
@@ -134,31 +137,22 @@ public final class BlockManagerMaster {
}
/**
- * Returns a handler of block location requests.
- *
- * @param blockIdOrWildcard id of the specified block.
- * @return the handler of block location requests, which completes exceptionally when the block
- * is not {@code IN_PROGRESS} or {@code AVAILABLE}.
+ * Get handlers of blocks that are in a particular state.
+ * @param blockIdOrWildcard to query
+ * @param state of the block
+ * @return the handlers, empty if none matches.
*/
- public BlockRequestHandler getBlockLocationHandler(final String blockIdOrWildcard) {
+ public List<BlockRequestHandler> getBlockHandlers(final String blockIdOrWildcard,
+ final BlockState.State state) {
final Lock readLock = lock.readLock();
readLock.lock();
try {
final Set<BlockMetadata> metadataSet =
getBlockWildcardStateSet(RuntimeIdManager.getWildCardFromBlockId(blockIdOrWildcard));
- final List<BlockMetadata> candidates = metadataSet.stream()
- .filter(metadata -> metadata.getBlockState().equals(BlockState.State.IN_PROGRESS)
- || metadata.getBlockState().equals(BlockState.State.AVAILABLE))
+ return metadataSet.stream()
+ .filter(metadata -> metadata.getBlockState().equals(state))
+ .map(BlockMetadata::getLocationHandler)
.collect(Collectors.toList());
- if (!candidates.isEmpty()) {
- // Randomly pick one of the candidate handlers.
- return candidates.get(random.nextInt(candidates.size())).getLocationHandler();
- } else {
- // No candidate exists
- final BlockRequestHandler handler = new BlockRequestHandler(blockIdOrWildcard);
- handler.completeExceptionally(new AbsentBlockException(blockIdOrWildcard, BlockState.State.NOT_AVAILABLE));
- return handler;
- }
} finally {
readLock.unlock();
}
@@ -170,7 +164,7 @@ public final class BlockManagerMaster {
* @param blockId the id of the block.
* @return the ids of the producer tasks.
*/
- private Set<String> getProducerTaskIds(final String blockId) {
+ public Set<String> getProducerTaskIds(final String blockId) {
final Lock readLock = lock.readLock();
readLock.lock();
try {
@@ -241,7 +235,9 @@ public final class BlockManagerMaster {
if (location.get().equals(executorId)) {
blockIds.add(blockMetadata.getBlockId());
}
- } catch (final InterruptedException | ExecutionException e) {
+ } catch (final CancellationException | ExecutionException e) {
+ // Don't add (NOT_AVAILABLE)
+ } catch (final InterruptedException e) {
// Cannot reach here because we check the completion of the future already.
LOG.error("Exception while getting the location of a block!", e);
Thread.currentThread().interrupt();
@@ -302,21 +298,37 @@ public final class BlockManagerMaster {
}
/**
- * Deals with a request for the location of a block.
- *
* @param message the request message.
* @param messageContext the message context which will be used for response.
*/
- void onRequestBlockLocation(final ControlMessage.Message message,
- final MessageContext messageContext) {
+ private void registerLocationRequest(final ControlMessage.Message message, final MessageContext messageContext) {
assert (message.getType() == ControlMessage.MessageType.RequestBlockLocation);
final String blockIdWildcard = message.getRequestBlockLocationMsg().getBlockIdWildcard();
final long requestId = message.getId();
final Lock readLock = lock.readLock();
readLock.lock();
try {
- final BlockRequestHandler locationFuture = getBlockLocationHandler(blockIdWildcard);
- locationFuture.registerRequest(requestId, messageContext);
+ // (CASE 1) Check AVAILABLE blocks.
+ final List<BlockRequestHandler> availableBlocks = getBlockHandlers(blockIdWildcard, BlockState.State.AVAILABLE);
+ if (!availableBlocks.isEmpty()) {
+ // random pick
+ // TODO #201: Let Executors Try Multiple Input Block Clones
+ availableBlocks.get(random.nextInt(availableBlocks.size())).registerRequest(requestId, messageContext);
+ return;
+ }
+
+ // (CASE 2) Check IN_PROGRESS blocks.
+ final List<BlockRequestHandler> progressBlocks = getBlockHandlers(blockIdWildcard, BlockState.State.IN_PROGRESS);
+ if (!progressBlocks.isEmpty()) {
+ // random pick
+ progressBlocks.get(random.nextInt(progressBlocks.size())).registerRequest(requestId, messageContext);
+ return;
+ }
+
+ // (CASE 3) Unfortunately, there is no good block to use.
+ final BlockRequestHandler absent = new BlockRequestHandler(blockIdWildcard);
+ absent.completeExceptionally(new AbsentBlockException(blockIdWildcard, BlockState.State.NOT_AVAILABLE));
+ absent.registerRequest(requestId, messageContext);
} finally {
readLock.unlock();
}
@@ -352,7 +364,7 @@ public final class BlockManagerMaster {
public void onMessageWithContext(final ControlMessage.Message message, final MessageContext messageContext) {
switch (message.getType()) {
case RequestBlockLocation:
- onRequestBlockLocation(message, messageContext);
+ registerLocationRequest(message, messageContext);
break;
default:
throw new IllegalMessageException(
@@ -366,16 +378,16 @@ public final class BlockManagerMaster {
* The handler of block location requests.
*/
public static final class BlockRequestHandler {
- private final String blockId;
+ private final String blockIdOrWildcard;
private final CompletableFuture<String> locationFuture;
/**
* Constructor.
*
- * @param blockId the ID of the block.
+ * @param blockIdOrWildcard the ID of the block.
*/
- BlockRequestHandler(final String blockId) {
- this.blockId = blockId;
+ BlockRequestHandler(final String blockIdOrWildcard) {
+ this.blockIdOrWildcard = blockIdOrWildcard;
this.locationFuture = new CompletableFuture<>();
}
@@ -411,7 +423,7 @@ public final class BlockManagerMaster {
final ControlMessage.BlockLocationInfoMsg.Builder infoMsgBuilder =
ControlMessage.BlockLocationInfoMsg.newBuilder()
.setRequestId(requestId)
- .setBlockId(blockId);
+ .setBlockId(blockIdOrWildcard);
locationFuture.whenComplete((location, throwable) -> {
if (throwable == null) {
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
index e1f2af7..a5eb8c3 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
@@ -64,9 +64,7 @@ final class BlockMetadata {
case IN_PROGRESS:
break;
case NOT_AVAILABLE:
- // Reset the block location and committer information.
locationHandler.completeExceptionally(new AbsentBlockException(blockId, newState));
- locationHandler = new BlockManagerMaster.BlockRequestHandler(blockId);
break;
case AVAILABLE:
if (location == null) {
@@ -115,4 +113,17 @@ final class BlockMetadata {
sb.append(")");
return sb.toString();
}
+
+ @Override
+ public boolean equals(final Object that) {
+ if (!(that instanceof BlockMetadata)) {
+ return false;
+ }
+ return this.blockId.equals(((BlockMetadata) that).getBlockId());
+ }
+
+ @Override
+ public int hashCode() {
+ return blockId.hashCode();
+ }
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
index f798ac1..5017311 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
@@ -75,6 +75,13 @@ public final class PlanStateManager {
private final Map<String, List<List<TaskState>>> stageIdToTaskAttemptStates; // sorted by task idx, and then attempt
/**
+ * Used for speculative cloning. (in the unit of milliseconds - ms)
+ */
+ private final Map<String, Long> taskIdToStartTimeMs = new HashMap<>();
+ private final Map<String, List<Long>> stageIdToCompletedTaskTimeMsList = new HashMap<>();
+ private final Map<String, Map<Integer, Integer>> stageIdToTaskIndexToNumOfClones = new HashMap<>();
+
+ /**
* Represents the plan to manage.
*/
private PhysicalPlan physicalPlan;
@@ -122,6 +129,8 @@ public final class PlanStateManager {
if (!initialized) {
// First scheduling.
this.initialized = true;
+ } else {
+ LOG.info("Update Plan from {} to {}", physicalPlan.getPlanId(), physicalPlanToUpdate.getPlanId());
}
this.planState = new PlanState();
this.metricStore.getOrCreateMetric(JobMetric.class, planId).setStageDAG(physicalPlanToUpdate.getStageDAG());
@@ -129,22 +138,22 @@ public final class PlanStateManager {
this.physicalPlan = physicalPlanToUpdate;
this.planId = physicalPlanToUpdate.getPlanId();
this.maxScheduleAttempt = maxScheduleAttemptToSet;
- initializeComputationStates();
+ initializeStates();
}
/**
* Initializes the states for the plan/stages/tasks for this plan.
* TODO #182: Consider reshaping in run-time optimization. At now, we only consider plan appending.
*/
- private void initializeComputationStates() {
+ private void initializeStates() {
onPlanStateChanged(PlanState.State.EXECUTING);
physicalPlan.getStageDAG().topologicalDo(stage -> {
if (!stageIdToState.containsKey(stage.getId())) {
stageIdToState.put(stage.getId(), new StageState());
stageIdToTaskAttemptStates.put(stage.getId(), new ArrayList<>(stage.getParallelism()));
+
// for each task idx of this stage
for (int taskIndex = 0; taskIndex < stage.getParallelism(); taskIndex++) {
- // for each task idx of this stage
stageIdToTaskAttemptStates.get(stage.getId()).add(new ArrayList<>());
// task states will be initialized lazily in getTaskAttemptsToSchedule()
}
@@ -152,6 +161,9 @@ public final class PlanStateManager {
});
}
+ /////////////////////////////////////////////////////////////////////////////////
+ //////////////////////////////////////// Core scheduling methods
+
/**
* Get task attempts that are "READY".
*
@@ -169,7 +181,7 @@ public final class PlanStateManager {
final Stage stage = physicalPlan.getStageDAG().getVertexById(stageId);
for (int taskIndex = 0; taskIndex < stage.getParallelism(); taskIndex++) {
final List<TaskState> attemptStatesForThisTaskIndex =
- stageIdToTaskAttemptStates.get(stage.getId()).get(taskIndex);
+ stageIdToTaskAttemptStates.get(stageId).get(taskIndex);
// If one of the attempts is COMPLETE, do not schedule
if (attemptStatesForThisTaskIndex
@@ -177,10 +189,17 @@ public final class PlanStateManager {
.noneMatch(state -> state.getStateMachine().getCurrentState().equals(TaskState.State.COMPLETE))) {
// (Step 1) Create new READY attempts, as many as
- // # of clones - # of 'not-done' attempts)
- final int numOfClones = stage.getPropertyValue(ClonedSchedulingProperty.class).orElse(1);
+ // # of numOfConcurrentAttempts(including clones) - # of 'not-done' attempts
+ stageIdToTaskIndexToNumOfClones.putIfAbsent(stageId, new HashMap<>());
+ final Optional<ClonedSchedulingProperty.CloneConf> cloneConf =
+ stage.getPropertyValue(ClonedSchedulingProperty.class);
+ final int numOfConcurrentAttempts = cloneConf.isPresent() && cloneConf.get().isUpFrontCloning()
+ // For now we support up to 1 clone (2 concurrent = 1 original + 1 clone)
+ ? 2
+ // Otherwise use the per-task-index value from setNumOfClones() (default 1 = no clone); keyed by taskIndex
+ : stageIdToTaskIndexToNumOfClones.get(stageId).getOrDefault(taskIndex, 1);
final long numOfNotDoneAttempts = attemptStatesForThisTaskIndex.stream().filter(this::isTaskNotDone).count();
- for (int i = 0; i < numOfClones - numOfNotDoneAttempts; i++) {
+ for (int i = 0; i < numOfConcurrentAttempts - numOfNotDoneAttempts; i++) {
attemptStatesForThisTaskIndex.add(new TaskState());
}
@@ -204,23 +223,66 @@ public final class PlanStateManager {
return taskAttemptsToSchedule;
}
- private boolean isTaskNotDone(final TaskState taskState) {
- final TaskState.State state = (TaskState.State) taskState.getStateMachine().getCurrentState();
- return state.equals(TaskState.State.READY)
- || state.equals(TaskState.State.EXECUTING)
- || state.equals(TaskState.State.ON_HOLD);
- }
-
/**
- * Gets the attempt numbers of all tasks in a stage.
- *
- * @param stageId the stage to investigate.
- * @return the attempt numbers of all tasks in a stage.
+ * @param stageId to query.
+ * @return all task attempt ids of the stage.
*/
public synchronized Set<String> getAllTaskAttemptsOfStage(final String stageId) {
return getTaskAttemptIdsToItsState(stageId).keySet();
}
+ /////////////////////////////////////////////////////////////////////////////////
+ //////////////////////////////////////// Speculative execution
+
+ /**
+ * @param stageId to query.
+ * @return a map from an EXECUTING task to its running time so far.
+ */
+ public synchronized Map<String, Long> getExecutingTaskToRunningTimeMs(final String stageId) {
+ final long curTime = System.currentTimeMillis();
+ final Map<String, Long> result = new HashMap<>();
+
+ final List<List<TaskState>> taskStates = stageIdToTaskAttemptStates.get(stageId);
+ for (int taskIndex = 0; taskIndex < taskStates.size(); taskIndex++) {
+ final List<TaskState> attemptStates = taskStates.get(taskIndex);
+ for (int attempt = 0; attempt < attemptStates.size(); attempt++) {
+ if (TaskState.State.EXECUTING.equals(attemptStates.get(attempt).getStateMachine().getCurrentState())) {
+ final String taskId = RuntimeIdManager.generateTaskId(stageId, taskIndex, attempt);
+ result.put(taskId, curTime - taskIdToStartTimeMs.get(taskId));
+ }
+ }
+ }
+
+ return result;
+ }
+
+ /**
+ * List of task times so far for this stage.
+ * @param stageId of the stage.
+ * @return a copy of the list, empty if none completed.
+ */
+ public synchronized List<Long> getCompletedTaskTimeListMs(final String stageId) {
+ // Return a copy
+ return new ArrayList<>(stageIdToCompletedTaskTimeMsList.getOrDefault(stageId, new ArrayList<>(0)));
+ }
+
+ /**
+ * @param stageId of the clone.
+ * @param taskIndex of the clone.
+ * @param numOfClones of the clone.
+ * @return true if the numOfClones has been modified, false otherwise
+ */
+ public synchronized boolean setNumOfClones(final String stageId, final int taskIndex, final int numOfClones) {
+ stageIdToTaskIndexToNumOfClones.putIfAbsent(stageId, new HashMap<>());
+ // overwrite the previous value.
+ final Integer previousNumOfClones = stageIdToTaskIndexToNumOfClones.get(stageId).put(taskIndex, numOfClones);
+ return (previousNumOfClones == null) || (previousNumOfClones != numOfClones);
+ }
+
+
+ /////////////////////////////////////////////////////////////////////////////////
+ //////////////////////////////////////// State transitions
+
/**
* Updates the state of a task.
* Task state changes can occur both in master and executor.
@@ -252,18 +314,28 @@ public final class PlanStateManager {
final String stageId = RuntimeIdManager.getStageIdFromTaskId(taskId);
final List<List<TaskState>> taskStatesOfThisStage = stageIdToTaskAttemptStates.get(stageId);
final long numOfCompletedTaskIndicesInThisStage = taskStatesOfThisStage.stream()
- .map(attempts -> attempts.stream()
- .map(state -> state.getStateMachine().getCurrentState())
- .allMatch(curState -> curState.equals(TaskState.State.COMPLETE)
- || curState.equals(TaskState.State.SHOULD_RETRY)
- || curState.equals(TaskState.State.ON_HOLD)))
- .filter(bool -> bool.equals(true))
+ .filter(attempts -> {
+ final List<TaskState.State> states = attempts
+ .stream()
+ .map(state -> (TaskState.State) state.getStateMachine().getCurrentState())
+ .collect(Collectors.toList());
+ return states.stream().anyMatch(curState -> curState.equals(TaskState.State.ON_HOLD)) // one of them is ON_HOLD
+ || states.stream().anyMatch(curState -> curState.equals(TaskState.State.COMPLETE)); // one of them is COMPLETE
+ })
.count();
if (newTaskState.equals(TaskState.State.COMPLETE)) {
LOG.info("{} completed: {} Task(s) out of {} are remaining in this stage",
taskId, taskStatesOfThisStage.size() - numOfCompletedTaskIndicesInThisStage, taskStatesOfThisStage.size());
}
+ // Maintain info for speculative execution
+ if (newTaskState.equals(TaskState.State.EXECUTING)) {
+ taskIdToStartTimeMs.put(taskId, System.currentTimeMillis());
+ } else if (newTaskState.equals(TaskState.State.COMPLETE)) {
+ stageIdToCompletedTaskTimeMsList.putIfAbsent(stageId, new ArrayList<>());
+ stageIdToCompletedTaskTimeMsList.get(stageId).add(System.currentTimeMillis() - taskIdToStartTimeMs.get(taskId));
+ }
+
// Change stage state, if needed
switch (newTaskState) {
// INCOMPLETE stage
@@ -295,20 +367,6 @@ public final class PlanStateManager {
}
}
- private List<TaskState.State> getPeerAttemptsforTheSameTaskIndex(final String taskId) {
- final String stageId = RuntimeIdManager.getStageIdFromTaskId(taskId);
- final int taskIndex = RuntimeIdManager.getIndexFromTaskId(taskId);
- final int attempt = RuntimeIdManager.getAttemptFromTaskId(taskId);
-
- final List<TaskState> otherAttemptsforTheSameTaskIndex =
- new ArrayList<>(stageIdToTaskAttemptStates.get(stageId).get(taskIndex));
- otherAttemptsforTheSameTaskIndex.remove(attempt);
-
- return otherAttemptsforTheSameTaskIndex.stream()
- .map(state -> (TaskState.State) state.getStateMachine().getCurrentState())
- .collect(Collectors.toList());
- }
-
/**
* (PRIVATE METHOD)
* Updates the state of a stage.
@@ -376,6 +434,8 @@ public final class PlanStateManager {
}
}
+ /////////////////////////////////////////////////////////////////////////////////
+ //////////////////////////////////////// Helper Methods
/**
* Wait for this plan to be finished and return the final state.
@@ -423,6 +483,18 @@ public final class PlanStateManager {
}
/**
+ * @return a map from task attempt id to its current state.
+ */
+ @VisibleForTesting
+ public synchronized Map<String, TaskState.State> getAllTaskAttemptIdsToItsState() {
+ return physicalPlan.getStageDAG().getVertices()
+ .stream()
+ .map(Stage::getId)
+ .flatMap(stageId -> getTaskAttemptIdsToItsState(stageId).entrySet().stream())
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ /**
* @return whether the execution for the plan is done or not.
*/
public synchronized boolean isPlanDone() {
@@ -459,6 +531,19 @@ public final class PlanStateManager {
return (TaskState.State) getTaskStateHelper(taskId).getStateMachine().getCurrentState();
}
+ private Map<String, TaskState.State> getTaskAttemptIdsToItsState(final String stageId) {
+ final Map<String, TaskState.State> result = new HashMap<>();
+ final List<List<TaskState>> taskStates = stageIdToTaskAttemptStates.get(stageId);
+ for (int taskIndex = 0; taskIndex < taskStates.size(); taskIndex++) {
+ final List<TaskState> attemptStates = taskStates.get(taskIndex);
+ for (int attempt = 0; attempt < attemptStates.size(); attempt++) {
+ result.put(RuntimeIdManager.generateTaskId(stageId, taskIndex, attempt),
+ (TaskState.State) attemptStates.get(attempt).getStateMachine().getCurrentState());
+ }
+ }
+ return result;
+ }
+
private TaskState getTaskStateHelper(final String taskId) {
return stageIdToTaskAttemptStates
.get(RuntimeIdManager.getStageIdFromTaskId(taskId))
@@ -466,6 +551,27 @@ public final class PlanStateManager {
.get(RuntimeIdManager.getAttemptFromTaskId(taskId));
}
+ private boolean isTaskNotDone(final TaskState taskState) {
+ final TaskState.State state = (TaskState.State) taskState.getStateMachine().getCurrentState();
+ return state.equals(TaskState.State.READY)
+ || state.equals(TaskState.State.EXECUTING)
+ || state.equals(TaskState.State.ON_HOLD);
+ }
+
+ private List<TaskState.State> getPeerAttemptsforTheSameTaskIndex(final String taskId) {
+ final String stageId = RuntimeIdManager.getStageIdFromTaskId(taskId);
+ final int taskIndex = RuntimeIdManager.getIndexFromTaskId(taskId);
+ final int attempt = RuntimeIdManager.getAttemptFromTaskId(taskId);
+
+ final List<TaskState> otherAttemptsforTheSameTaskIndex =
+ new ArrayList<>(stageIdToTaskAttemptStates.get(stageId).get(taskIndex));
+ otherAttemptsforTheSameTaskIndex.remove(attempt);
+
+ return otherAttemptsforTheSameTaskIndex.stream()
+ .map(state -> (TaskState.State) state.getStateMachine().getCurrentState())
+ .collect(Collectors.toList());
+ }
+
/**
* @return the physical plan.
*/
@@ -548,26 +654,4 @@ public final class PlanStateManager {
sb.append("]}");
return sb.toString();
}
-
- @VisibleForTesting
- public synchronized Map<String, TaskState.State> getAllTaskAttemptIdsToItsState() {
- return physicalPlan.getStageDAG().getVertices()
- .stream()
- .map(Stage::getId)
- .flatMap(stageId -> getTaskAttemptIdsToItsState(stageId).entrySet().stream())
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- }
-
- private Map<String, TaskState.State> getTaskAttemptIdsToItsState(final String stageId) {
- final Map<String, TaskState.State> result = new HashMap<>();
- final List<List<TaskState>> taskStates = stageIdToTaskAttemptStates.get(stageId);
- for (int taskIndex = 0; taskIndex < taskStates.size(); taskIndex++) {
- final List<TaskState> attemptStates = taskStates.get(taskIndex);
- for (int attempt = 0; attempt < attemptStates.size(); attempt++) {
- result.put(RuntimeIdManager.generateTaskId(stageId, taskIndex, attempt),
- (TaskState.State) attemptStates.get(attempt).getStateMachine().getCurrentState());
- }
- }
- return result;
- }
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 4285469..00eac87 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -72,8 +72,11 @@ public final class RuntimeMaster {
private static final int DAG_LOGGING_PERIOD = 3000;
private static final int METRIC_ARRIVE_TIMEOUT = 10000;
private static final int REST_SERVER_PORT = 10101;
+ private static final int SPECULATION_CHECKING_PERIOD_MS = 100;
private final ExecutorService runtimeMasterThread;
+ private final ScheduledExecutorService speculativeTaskCloningThread;
+
private final Scheduler scheduler;
private final ContainerManager containerManager;
private final MetricMessageHandler metricMessageHandler;
@@ -103,6 +106,16 @@ public final class RuntimeMaster {
// and keeping it single threaded removes the complexity of multi-thread synchronization.
this.runtimeMasterThread =
Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "RuntimeMaster thread"));
+
+ // Check for speculative execution every SPECULATION_CHECKING_PERIOD_MS (100 ms).
+ this.speculativeTaskCloningThread = Executors
+ .newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, "SpeculativeTaskCloning thread"));
+ this.speculativeTaskCloningThread.scheduleAtFixedRate(
+ () -> this.runtimeMasterThread.submit(scheduler::onSpeculativeExecutionCheck),
+ SPECULATION_CHECKING_PERIOD_MS,
+ SPECULATION_CHECKING_PERIOD_MS,
+ TimeUnit.MILLISECONDS);
+
this.scheduler = scheduler;
this.containerManager = containerManager;
this.metricMessageHandler = metricMessageHandler;
@@ -169,6 +182,9 @@ public final class RuntimeMaster {
* Terminates the RuntimeMaster.
*/
public void terminate() {
+ // No need to speculate anymore
+ speculativeTaskCloningThread.shutdown();
+
// send metric flush request to all executors
metricManagerMaster.sendMetricFlushRequest();
try {
@@ -181,6 +197,7 @@ public final class RuntimeMaster {
// clean up state...
Thread.currentThread().interrupt();
}
+
runtimeMasterThread.execute(() -> {
scheduler.terminate();
try {
@@ -406,7 +423,6 @@ public final class RuntimeMaster {
private ScheduledExecutorService scheduleDagLogging() {
final ScheduledExecutorService dagLoggingExecutor = Executors.newSingleThreadScheduledExecutor();
dagLoggingExecutor.scheduleAtFixedRate(new Runnable() {
-
public void run() {
planStateManager.storeJSON("periodic");
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
index ac2db86..5ccc2fc 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -17,13 +17,16 @@ package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.collect.Sets;
import edu.snu.nemo.common.Pair;
+import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
import edu.snu.nemo.common.ir.Readable;
import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.IgnoreSchedulingTempDataReceiverProperty;
import edu.snu.nemo.runtime.common.RuntimeIdManager;
import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
import edu.snu.nemo.runtime.common.plan.*;
+import edu.snu.nemo.runtime.common.state.BlockState;
import edu.snu.nemo.runtime.common.state.TaskState;
import edu.snu.nemo.runtime.master.PlanAppender;
import edu.snu.nemo.runtime.master.DataSkewDynOptDataHandler;
@@ -34,6 +37,7 @@ import edu.snu.nemo.runtime.common.state.StageState;
import edu.snu.nemo.runtime.master.BlockManagerMaster;
import edu.snu.nemo.runtime.master.PlanStateManager;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.reef.annotations.audience.DriverSide;
import org.slf4j.LoggerFactory;
@@ -41,7 +45,7 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import javax.inject.Inject;
import java.util.*;
-import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@@ -92,7 +96,7 @@ public final class BatchScheduler implements Scheduler {
updatePhysicalPlanEventHandler.setScheduler(this);
if (pubSubEventHandlerWrapper.getPubSubEventHandler() != null) {
pubSubEventHandlerWrapper.getPubSubEventHandler()
- .subscribe(updatePhysicalPlanEventHandler.getEventClass(), updatePhysicalPlanEventHandler);
+ .subscribe(updatePhysicalPlanEventHandler.getEventClass(), updatePhysicalPlanEventHandler);
}
this.executorRegistry = executorRegistry;
this.planStateManager = planStateManager;
@@ -121,7 +125,7 @@ public final class BatchScheduler implements Scheduler {
} else {
// Append the submitted plan to the original plan.
final PhysicalPlan appendedPlan =
- PlanAppender.appendPlan(planStateManager.getPhysicalPlan(), submittedPhysicalPlan);
+ PlanAppender.appendPlan(planStateManager.getPhysicalPlan(), submittedPhysicalPlan);
updatePlan(appendedPlan, maxScheduleAttempt);
planStateManager.storeJSON("appended");
}
@@ -147,11 +151,11 @@ public final class BatchScheduler implements Scheduler {
final int maxScheduleAttempt) {
planStateManager.updatePlan(newPhysicalPlan, maxScheduleAttempt);
this.sortedScheduleGroups = newPhysicalPlan.getStageDAG().getVertices().stream()
- .collect(Collectors.groupingBy(Stage::getScheduleGroup))
- .entrySet().stream()
- .sorted(Map.Entry.comparingByKey())
- .map(Map.Entry::getValue)
- .collect(Collectors.toList());
+ .collect(Collectors.groupingBy(Stage::getScheduleGroup))
+ .entrySet().stream()
+ .sorted(Map.Entry.comparingByKey())
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toList());
}
/**
@@ -186,7 +190,7 @@ public final class BatchScheduler implements Scheduler {
break;
case FAILED:
throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The plan failed on Task #")
- .append(taskId).append(" in Executor ").append(executorId).toString()));
+ .append(taskId).append(" in Executor ").append(executorId).toString()));
case READY:
case EXECUTING:
throw new RuntimeException("The states READY/EXECUTING cannot occur at this point");
@@ -228,6 +232,56 @@ public final class BatchScheduler implements Scheduler {
}
@Override
+ public void onSpeculativeExecutionCheck() {
+ final MutableBoolean isNumOfCloneChanged = new MutableBoolean(false);
+ // The flag above is sticky: once any straggler gets a new clone in this pass, doSchedule() runs below.
+ selectEarliestSchedulableGroup().ifPresent(scheduleGroup -> {
+ scheduleGroup.stream().map(Stage::getId).forEach(stageId -> {
+ final Stage stage = planStateManager.getPhysicalPlan().getStageDAG().getVertexById(stageId);
+
+ // Only if the ClonedSchedulingProperty is set...
+ stage.getPropertyValue(ClonedSchedulingProperty.class).ifPresent(cloneConf -> {
+ if (!cloneConf.isUpFrontCloning()) { // Upfront cloning is already handled.
+ final double fractionToWaitFor = cloneConf.getFractionToWaitFor();
+ final int parallelism = stage.getParallelism();
+ final Object[] completedTaskTimes = planStateManager.getCompletedTaskTimeListMs(stageId).toArray();
+
+ // Only after the configured fraction of the stage's tasks has completed...
+ // Delayed cloning (aggressive)
+ if (completedTaskTimes.length > 0
+ && completedTaskTimes.length >= Math.round(parallelism * fractionToWaitFor)) {
+ Arrays.sort(completedTaskTimes);
+ final long medianTime = (long) completedTaskTimes[completedTaskTimes.length / 2];
+ final double medianTimeMultiplier = cloneConf.getMedianTimeMultiplier();
+ final Map<String, Long> execTaskToTime = planStateManager.getExecutingTaskToRunningTimeMs(stageId);
+ for (final Map.Entry<String, Long> entry : execTaskToTime.entrySet()) {
+
+ // Only if the running task is considered a 'straggler' (running time > median * multiplier)...
+ final long runningTime = entry.getValue();
+ if (runningTime > Math.round(medianTime * medianTimeMultiplier)) {
+ final String taskId = entry.getKey();
+ final boolean isCloned = planStateManager.setNumOfClones(
+ stageId, RuntimeIdManager.getIndexFromTaskId(taskId), 2);
+ if (isCloned) {
+ LOG.info("Cloned {}, because its running time {} (ms) is bigger than {} tasks' "
+ + "(median) {} (ms) * (multiplier) {}", taskId, runningTime, completedTaskTimes.length,
+ medianTime, medianTimeMultiplier);
+ isNumOfCloneChanged.setValue(true); // never reset to false by a later, already-cloned straggler
+ }
+ }
+ }
+ }
+ }
+ });
+ });
+ });
+
+ if (isNumOfCloneChanged.booleanValue()) {
+ doSchedule(); // Do schedule the new clone.
+ }
+ }
+
+ @Override
public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(), executorRepresenter.getNodeName());
executorRegistry.registerExecutor(executorRepresenter);
@@ -246,6 +300,9 @@ public final class BatchScheduler implements Scheduler {
return Pair.of(executor, ExecutorRegistry.ExecutorState.FAILED);
});
+ // Blocks of the interrupted tasks are failed.
+ interruptedTasks.forEach(blockManagerMaster::onProducerTaskFailed);
+
// Retry the interrupted tasks (and required parents)
retryTasksAndRequiredParents(interruptedTasks);
@@ -273,25 +330,21 @@ public final class BatchScheduler implements Scheduler {
final Optional<List<Stage>> earliest = selectEarliestSchedulableGroup();
if (earliest.isPresent()) {
- // Get schedulable tasks.
final List<Task> tasksToSchedule = earliest.get().stream()
- .flatMap(stage -> selectSchedulableTasks(stage).stream())
- .collect(Collectors.toList());
-
- // We prefer (but not guarantee) to schedule the 'receiving' tasks first,
- // assuming that tasks within a ScheduleGroup are connected with 'push' edges.
- Collections.reverse(tasksToSchedule);
-
- LOG.info("Scheduling some tasks in {}, which are in the same ScheduleGroup", tasksToSchedule.stream()
+ .flatMap(stage -> selectSchedulableTasks(stage).stream())
+ .collect(Collectors.toList());
+ if (!tasksToSchedule.isEmpty()) {
+ LOG.info("Scheduling some tasks in {}, which are in the same ScheduleGroup", tasksToSchedule.stream()
.map(Task::getTaskId)
.map(RuntimeIdManager::getStageIdFromTaskId)
.collect(Collectors.toSet()));
- // Set the pointer to the schedulable tasks.
- pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
+ // Set the pointer to the schedulable tasks.
+ pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
- // Notify the dispatcher that a new collection is available.
- taskDispatcher.onNewPendingTaskCollectionAvailable();
+ // Notify the dispatcher that a new collection is available.
+ taskDispatcher.onNewPendingTaskCollectionAvailable();
+ }
} else {
LOG.info("Skipping this round as no ScheduleGroup is schedulable.");
}
@@ -303,11 +356,11 @@ public final class BatchScheduler implements Scheduler {
}
return sortedScheduleGroups.stream()
- .filter(scheduleGroup -> scheduleGroup.stream()
- .map(Stage::getId)
- .map(planStateManager::getStageState)
- .anyMatch(state -> state.equals(StageState.State.INCOMPLETE))) // any incomplete stage in the group
- .findFirst(); // selects the one with the smallest scheduling group index.
+ .filter(scheduleGroup -> scheduleGroup.stream()
+ .map(Stage::getId)
+ .map(planStateManager::getStageState)
+ .anyMatch(state -> state.equals(StageState.State.INCOMPLETE))) // any incomplete stage in the group
+ .findFirst(); // selects the one with the smallest scheduling group index.
}
private List<Task> selectSchedulableTasks(final Stage stageToSchedule) {
@@ -322,9 +375,9 @@ public final class BatchScheduler implements Scheduler {
}
final List<StageEdge> stageIncomingEdges =
- planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
+ planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
final List<StageEdge> stageOutgoingEdges =
- planStateManager.getPhysicalPlan().getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
+ planStateManager.getPhysicalPlan().getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
// Create and return tasks.
final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
@@ -332,21 +385,17 @@ public final class BatchScheduler implements Scheduler {
final List<String> taskIdsToSchedule = planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId());
final List<Task> tasks = new ArrayList<>(taskIdsToSchedule.size());
taskIdsToSchedule.forEach(taskId -> {
- final Set<String> blockIds = planStateManager.getPhysicalPlan().getStageDAG()
- .getOutgoingEdgesOf(RuntimeIdManager.getStageIdFromTaskId(taskId))
- .stream()
- .map(stageEdge -> RuntimeIdManager.generateBlockId(stageEdge.getId(), taskId))
- .collect(Collectors.toSet()); // ids of blocks this task will produce
+ final Set<String> blockIds = getOutputBlockIds(taskId);
blockManagerMaster.onProducerTaskScheduled(taskId, blockIds);
final int taskIdx = RuntimeIdManager.getIndexFromTaskId(taskId);
tasks.add(new Task(
- planStateManager.getPhysicalPlan().getPlanId(),
- taskId,
- stageToSchedule.getExecutionProperties(),
- stageToSchedule.getSerializedIRDAG(),
- stageIncomingEdges,
- stageOutgoingEdges,
- vertexIdToReadables.get(taskIdx)));
+ planStateManager.getPhysicalPlan().getPlanId(),
+ taskId,
+ stageToSchedule.getExecutionProperties(),
+ stageToSchedule.getSerializedIRDAG(),
+ stageIncomingEdges,
+ stageOutgoingEdges,
+ vertexIdToReadables.get(taskIdx)));
});
return tasks;
}
@@ -407,7 +456,7 @@ public final class BatchScheduler implements Scheduler {
final String stageIdForTaskUponCompletion = RuntimeIdManager.getStageIdFromTaskId(taskId);
final boolean stageComplete =
- planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
+ planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
final StageEdge targetEdge = getEdgeToOptimize(taskId);
if (targetEdge == null) {
@@ -416,11 +465,11 @@ public final class BatchScheduler implements Scheduler {
if (stageComplete) {
final DynOptDataHandler dynOptDataHandler = dynOptDataHandlers.stream()
- .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
- .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
+ .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
+ .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
pubSubEventHandlerWrapper.getPubSubEventHandler()
- .onNext(new DynamicOptimizationEvent(planStateManager.getPhysicalPlan(), dynOptDataHandler.getDynOptData(),
- taskId, executorId, targetEdge));
+ .onNext(new DynamicOptimizationEvent(planStateManager.getPhysicalPlan(), dynOptDataHandler.getDynOptData(),
+ taskId, executorId, targetEdge));
}
}
@@ -461,50 +510,77 @@ public final class BatchScheduler implements Scheduler {
final Set<String> tasksToRetry = Sets.union(tasks, requiredParents);
LOG.info("Will be retried: {}", tasksToRetry);
tasksToRetry.forEach(
- taskToReExecute -> planStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
+ taskToReExecute -> planStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
}
private Set<String> recursivelyGetParentTasksForLostBlocks(final Set<String> children) {
if (children.isEmpty()) {
return Collections.emptySet();
}
+ final DAG<Stage, StageEdge> stageDAG = planStateManager.getPhysicalPlan().getStageDAG();
+
+ final Map<String, StageEdge> idToIncomingEdges = children.stream()
+ .map(RuntimeIdManager::getStageIdFromTaskId)
+ .flatMap(stageId -> stageDAG.getIncomingEdgesOf(stageId).stream())
+ // Ignore duplicates with the mergeFunction in toMap(_,_,mergeFunction)
+ .collect(Collectors.toMap(StageEdge::getId, Function.identity(), (l, r) -> l));
final Set<String> parentsWithLostBlocks = children.stream()
- .flatMap(child -> getParentTasks(child).stream())
- .filter(parent -> {
- final CompletableFuture<String> locationFuture =
- blockManagerMaster.getBlockLocationHandler(parent).getLocationFuture();
- return locationFuture.isCompletedExceptionally() || locationFuture.isCancelled();
- })
- .collect(Collectors.toSet());
+ .flatMap(child -> getInputBlockIds(child).stream()) // child task id -> parent block ids
+ .map(RuntimeIdManager::getWildCardFromBlockId) // parent block id -> parent block wildcard
+ .collect(Collectors.toSet()).stream() // remove duplicate wildcards
+ .filter(parentBlockWildcard -> // lost block = no matching AVAILABLE block attempt for the wildcard
+ blockManagerMaster.getBlockHandlers(parentBlockWildcard, BlockState.State.AVAILABLE).isEmpty())
+ .flatMap(lostParentBlockWildcard -> {
+ // COMPLETE task attempts of the lostParentBlockWildcard must become SHOULD_RETRY
+ final String inEdgeId = RuntimeIdManager.getRuntimeEdgeIdFromBlockId(lostParentBlockWildcard);
+ final String parentStageId = idToIncomingEdges.get(inEdgeId).getSrc().getId();
+ final int parentTaskIndex = RuntimeIdManager.getTaskIndexFromBlockId(lostParentBlockWildcard);
+ return planStateManager.getAllTaskAttemptsOfStage(parentStageId)
+ .stream()
+ .filter(taskId -> RuntimeIdManager.getStageIdFromTaskId(taskId).equals(parentStageId)
+ && RuntimeIdManager.getIndexFromTaskId(taskId) == parentTaskIndex)
+ // COMPLETE -> SHOULD_RETRY
+ .filter(taskId -> planStateManager.getTaskState(taskId).equals(TaskState.State.COMPLETE));
+ })
+ .collect(Collectors.toSet());
+
// Recursive call
return Sets.union(parentsWithLostBlocks, recursivelyGetParentTasksForLostBlocks(parentsWithLostBlocks));
}
- private Set<String> getParentTasks(final String childTaskId) {
+ private Set<String> getOutputBlockIds(final String taskId) {
+ return planStateManager.getPhysicalPlan().getStageDAG()
+ .getOutgoingEdgesOf(RuntimeIdManager.getStageIdFromTaskId(taskId))
+ .stream()
+ .map(stageEdge -> RuntimeIdManager.generateBlockId(stageEdge.getId(), taskId))
+ .collect(Collectors.toSet()); // ids of blocks this task will produce
+ }
+ private Set<String> getInputBlockIds(final String childTaskId) {
final String stageIdOfChildTask = RuntimeIdManager.getStageIdFromTaskId(childTaskId);
return planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageIdOfChildTask)
- .stream()
- .flatMap(inStageEdge -> {
- final String parentStageId = inStageEdge.getSrc().getId();
- final Set<String> tasksOfParentStage = planStateManager.getAllTaskAttemptsOfStage(parentStageId);
-
- switch (inStageEdge.getDataCommunicationPattern()) {
- case Shuffle:
- case BroadCast:
- // All of the parent stage's tasks
- return tasksOfParentStage.stream();
- case OneToOne:
- // Same-index tasks of the parent stage
- return tasksOfParentStage.stream().filter(task ->
- RuntimeIdManager.getIndexFromTaskId(task) == RuntimeIdManager.getIndexFromTaskId(childTaskId));
- default:
- throw new IllegalStateException(inStageEdge.toString());
- }
- })
- .collect(Collectors.toSet());
+ .stream()
+ .flatMap(inStageEdge -> {
+ final Set<String> parentTaskIds = planStateManager.getAllTaskAttemptsOfStage(inStageEdge.getSrc().getId());
+ switch (inStageEdge.getDataCommunicationPattern()) {
+ case Shuffle:
+ case BroadCast:
+ // All of the parent stage's tasks
+ return parentTaskIds.stream()
+ .map(parentTaskId -> RuntimeIdManager.generateBlockId(inStageEdge.getId(), parentTaskId));
+ case OneToOne:
+ // Same-index tasks of the parent stage
+ return parentTaskIds.stream()
+ .filter(parentTaskId ->
+ RuntimeIdManager.getIndexFromTaskId(parentTaskId) == RuntimeIdManager.getIndexFromTaskId(childTaskId))
+ .map(parentTaskId -> RuntimeIdManager.generateBlockId(inStageEdge.getId(), parentTaskId));
+ default:
+ throw new IllegalStateException(inStageEdge.toString());
+ }
+ })
+ .collect(Collectors.toSet());
}
/**
@@ -514,8 +590,8 @@ public final class BatchScheduler implements Scheduler {
*/
public void updateDynOptData(final Object dynOptData) {
final DynOptDataHandler dynOptDataHandler = dynOptDataHandlers.stream()
- .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
- .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
+ .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
+ .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
dynOptDataHandler.updateDynOptData(dynOptData);
}
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/LocalitySchedulingConstraint.java
similarity index 63%
rename from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/LocalitySchedulingConstraint.java
index 4e37fde..769719c 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/LocalitySchedulingConstraint.java
@@ -24,6 +24,7 @@ import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceLocalityProperty;
import edu.snu.nemo.runtime.common.RuntimeIdManager;
import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.runtime.common.plan.Task;
+import edu.snu.nemo.runtime.common.state.BlockState;
import edu.snu.nemo.runtime.master.BlockManagerMaster;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
@@ -32,6 +33,7 @@ import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import java.util.*;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
/**
* This policy tries to pick the executors where the corresponding source or intermediate data for a task reside.
@@ -39,50 +41,48 @@ import java.util.concurrent.ExecutionException;
@ThreadSafe
@DriverSide
@AssociatedProperty(ResourceLocalityProperty.class)
-public final class SourceLocationAwareSchedulingConstraint implements SchedulingConstraint {
+public final class LocalitySchedulingConstraint implements SchedulingConstraint {
private final BlockManagerMaster blockManagerMaster;
@Inject
- private SourceLocationAwareSchedulingConstraint(final BlockManagerMaster blockManagerMaster) {
+ private LocalitySchedulingConstraint(final BlockManagerMaster blockManagerMaster) {
this.blockManagerMaster = blockManagerMaster;
}
/**
- * Find the location of the intermediate data for a task.
+ * Find the locations of the intermediate data for a task.
* It is only possible if the task receives only one input edge with One-to-One communication pattern, and
* the location of the input data is known.
*
* @param task the task to schedule.
- * @return the intermediate data location.
+ * @return the intermediate data locations, empty if none exists.
*/
- private Optional<String> getIntermediateDataLocation(final Task task) {
+ private List<String> getIntermediateDataLocations(final Task task) {
if (task.getTaskIncomingEdges().size() == 1) {
final StageEdge physicalStageEdge = task.getTaskIncomingEdges().get(0);
if (CommunicationPatternProperty.Value.OneToOne.equals(
- physicalStageEdge.getPropertyValue(CommunicationPatternProperty.class)
- .orElseThrow(() -> new RuntimeException("No comm pattern!")))) {
+ physicalStageEdge.getPropertyValue(CommunicationPatternProperty.class)
+ .orElseThrow(() -> new RuntimeException("No comm pattern!")))) {
final Optional<DuplicateEdgeGroupPropertyValue> dupProp =
- physicalStageEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
+ physicalStageEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
final String representativeEdgeId = dupProp.isPresent()
- ? dupProp.get().getRepresentativeEdgeId() : physicalStageEdge.getId();
- final String blockIdToRead =
- RuntimeIdManager.generateBlockId(representativeEdgeId, task.getTaskId());
- final BlockManagerMaster.BlockRequestHandler locationHandler =
- blockManagerMaster.getBlockLocationHandler(blockIdToRead);
- if (locationHandler.getLocationFuture().isDone()) { // if the location is known.
- try {
- final String location = locationHandler.getLocationFuture().get();
- return Optional.of(location);
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (final ExecutionException e) {
- throw new RuntimeException(e);
- }
- }
+ ? dupProp.get().getRepresentativeEdgeId()
+ : physicalStageEdge.getId();
+
+ final String blockIdToRead = RuntimeIdManager.generateBlockId(representativeEdgeId, task.getTaskId());
+ return blockManagerMaster.getBlockHandlers(blockIdToRead, BlockState.State.AVAILABLE)
+ .stream()
+ .map(handler -> {
+ try {
+ return handler.getLocationFuture().get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
}
}
- return Optional.empty();
+ return Collections.emptyList();
}
/**
@@ -90,7 +90,7 @@ public final class SourceLocationAwareSchedulingConstraint implements Scheduling
* @return Set of source locations from source tasks in {@code taskDAG}
* @throws Exception for any exception raised during querying source locations for a readable
*/
- private static Set<String> getSourceLocations(final Collection<Readable> readables) throws Exception {
+ private static Set<String> getSourceDataLocations(final Collection<Readable> readables) throws Exception {
final List<String> sourceLocations = new ArrayList<>();
for (final Readable readable : readables) {
sourceLocations.addAll(readable.getLocations());
@@ -100,10 +100,11 @@ public final class SourceLocationAwareSchedulingConstraint implements Scheduling
@Override
public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) {
- if (task.getTaskIncomingEdges().isEmpty()) { // Source task
+ if (task.getTaskIncomingEdges().isEmpty()) {
+ // Source task
final Set<String> sourceLocations;
try {
- sourceLocations = getSourceLocations(task.getIrVertexIdToReadable().values());
+ sourceLocations = getSourceDataLocations(task.getIrVertexIdToReadable().values());
} catch (final UnsupportedOperationException e) {
return true;
} catch (final Exception e) {
@@ -115,13 +116,15 @@ public final class SourceLocationAwareSchedulingConstraint implements Scheduling
}
return sourceLocations.contains(executor.getNodeName());
- } else { // Non-source task.
- final Optional<String> optionalIntermediateLoc = getIntermediateDataLocation(task);
-
- if (getIntermediateDataLocation(task).isPresent()) {
- return optionalIntermediateLoc.get().equals(executor.getExecutorId());
- } else {
+ } else {
+ // Non-source task.
+ final List<String> intermediateLocations = getIntermediateDataLocations(task);
+ if (intermediateLocations.isEmpty()) {
+ // Since there is no known location, we just schedule the task to any executor.
return true;
+ } else {
+ // There is a known location(s), so we schedule to it(them).
+ return intermediateLocations.contains(executor.getExecutorId());
}
}
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index 12abd56..d7f5456 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -81,6 +81,11 @@ public interface Scheduler {
TaskState.RecoverableTaskFailureCause failureCause);
/**
+ * Called to check for speculative execution.
+ */
+ void onSpeculativeExecutionCheck();
+
+ /**
* To be called when a job should be terminated.
* Any clean up code should be implemented in this method.
*/
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
index 50dd8d5..6638a36 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
@@ -39,12 +39,12 @@ public final class SchedulingConstraintRegistry {
private SchedulingConstraintRegistry(
final ContainerTypeAwareSchedulingConstraint containerTypeAwareSchedulingConstraint,
final FreeSlotSchedulingConstraint freeSlotSchedulingConstraint,
- final SourceLocationAwareSchedulingConstraint sourceLocationAwareSchedulingConstraint,
+ final LocalitySchedulingConstraint localitySchedulingConstraint,
final SkewnessAwareSchedulingConstraint skewnessAwareSchedulingConstraint,
final NodeShareSchedulingConstraint nodeShareSchedulingConstraint) {
registerSchedulingConstraint(containerTypeAwareSchedulingConstraint);
registerSchedulingConstraint(freeSlotSchedulingConstraint);
- registerSchedulingConstraint(sourceLocationAwareSchedulingConstraint);
+ registerSchedulingConstraint(localitySchedulingConstraint);
registerSchedulingConstraint(skewnessAwareSchedulingConstraint);
registerSchedulingConstraint(nodeShareSchedulingConstraint);
}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
index 959b3fc..0b0a832 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
@@ -27,6 +27,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
@@ -37,8 +38,8 @@ import static org.junit.Assert.assertTrue;
* Test for {@link BlockManagerMaster}.
*/
public final class BlockManagerMasterTest {
- private static int FIRST_ATTEMPT = 0;
- private static int SECOND_ATTEPMT = 1;
+ private final static int FIRST_ATTEMPT = 0;
+ private final static int SECOND_ATTEMPT = 1;
private BlockManagerMaster blockManagerMaster;
@Before
@@ -48,9 +49,9 @@ public final class BlockManagerMasterTest {
blockManagerMaster = injector.getInstance(BlockManagerMaster.class);
}
- private static void checkBlockAbsentException(final Future<String> future,
- final String expectedPartitionId,
- final BlockState.State expectedState)
+ private static void checkInProgressToNotAvailableException(final Future<String> future,
+ final String expectedPartitionId,
+ final BlockState.State expectedState)
throws IllegalStateException, InterruptedException {
assertTrue(future.isDone());
try {
@@ -88,24 +89,22 @@ public final class BlockManagerMasterTest {
final String executorId = RuntimeIdManager.generateExecutorId();
final String blockId = RuntimeIdManager.generateBlockId(edgeId, taskId);
- // Initially the block state is NOT_AVAILABLE.
- checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), blockId,
- BlockState.State.NOT_AVAILABLE);
+ // Initially the block state does not exist.
+ assertTrue(blockManagerMaster.getBlockHandlers(blockId, BlockState.State.IN_PROGRESS).isEmpty());
// The block is being IN_PROGRESS.
blockManagerMaster.onProducerTaskScheduled(taskId, Collections.singleton(blockId));
- final Future<String> future = blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture();
+ final Future<String> future = getSingleLocationFuture(blockId, BlockState.State.IN_PROGRESS);
checkPendingFuture(future);
// The block is AVAILABLE
blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.AVAILABLE, executorId);
checkBlockLocation(future, executorId); // A future, previously pending on IN_PROGRESS state, is now resolved.
- checkBlockLocation(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), executorId);
+ checkBlockLocation(getSingleLocationFuture(blockId, BlockState.State.AVAILABLE), executorId);
// We lost the block.
blockManagerMaster.removeWorker(executorId);
- checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), blockId,
- BlockState.State.NOT_AVAILABLE);
+ getSingleLocationFuture(blockId, BlockState.State.NOT_AVAILABLE); // this call should succeed with no error.
}
/**
@@ -125,38 +124,42 @@ public final class BlockManagerMasterTest {
// The block is being scheduled.
blockManagerMaster.onProducerTaskScheduled(firstAttemptTaskId, Collections.singleton(firstAttemptBlockId));
- final Future<String> future0 = blockManagerMaster.getBlockLocationHandler(firstAttemptBlockId).getLocationFuture();
+ final Future<String> future0 = getSingleLocationFuture(firstAttemptBlockId, BlockState.State.IN_PROGRESS);
checkPendingFuture(future0);
// Producer task fails.
blockManagerMaster.onProducerTaskFailed(firstAttemptTaskId);
// A future, previously pending on IN_PROGRESS state, is now completed exceptionally.
- checkBlockAbsentException(future0, firstAttemptBlockId, BlockState.State.NOT_AVAILABLE);
- checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(firstAttemptBlockId).getLocationFuture(), firstAttemptBlockId,
- BlockState.State.NOT_AVAILABLE);
+ checkInProgressToNotAvailableException(future0, firstAttemptBlockId, BlockState.State.NOT_AVAILABLE);
+ checkInProgressToNotAvailableException(getSingleLocationFuture(firstAttemptBlockId, BlockState.State.NOT_AVAILABLE), firstAttemptBlockId, BlockState.State.NOT_AVAILABLE);
}
// Second attempt
{
- final String secondAttemptTaskId = RuntimeIdManager.generateTaskId("Stage0", srcTaskIndex, SECOND_ATTEPMT);
+ final String secondAttemptTaskId = RuntimeIdManager.generateTaskId("Stage0", srcTaskIndex, SECOND_ATTEMPT);
final String secondAttemptBlockId = RuntimeIdManager.generateBlockId(edgeId, secondAttemptTaskId);
final String executorId = RuntimeIdManager.generateExecutorId();
// Re-scheduling the task.
blockManagerMaster.onProducerTaskScheduled(secondAttemptTaskId, Collections.singleton(secondAttemptBlockId));
- final Future<String> future1 = blockManagerMaster.getBlockLocationHandler(secondAttemptBlockId).getLocationFuture();
+ final Future<String> future1 = getSingleLocationFuture(secondAttemptBlockId, BlockState.State.IN_PROGRESS);
checkPendingFuture(future1);
// Committed.
blockManagerMaster.onBlockStateChanged(secondAttemptBlockId, BlockState.State.AVAILABLE, executorId);
checkBlockLocation(future1, executorId); // A future, previously pending on IN_PROGRESS state, is now resolved.
- checkBlockLocation(blockManagerMaster.getBlockLocationHandler(secondAttemptBlockId).getLocationFuture(), executorId);
+ checkBlockLocation(getSingleLocationFuture(secondAttemptBlockId, BlockState.State.AVAILABLE), executorId);
// Then removed.
blockManagerMaster.onBlockStateChanged(secondAttemptBlockId, BlockState.State.NOT_AVAILABLE, executorId);
- checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(secondAttemptBlockId).getLocationFuture(), secondAttemptBlockId,
- BlockState.State.NOT_AVAILABLE);
+ assertEquals(2, blockManagerMaster.getBlockHandlers(secondAttemptBlockId, BlockState.State.NOT_AVAILABLE).size());
}
}
+
+ private Future<String> getSingleLocationFuture(final String blockId, final BlockState.State state) {
+ final List<BlockManagerMaster.BlockRequestHandler> handlerList = blockManagerMaster.getBlockHandlers(blockId, state);
+ assertEquals(1, handlerList.size());
+ return handlerList.get(0).getLocationFuture();
+ }
}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/LocalitySchedulingConstraintTest.java
similarity index 93%
rename from runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/LocalitySchedulingConstraintTest.java
index ab1bf0b..cabc218 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/LocalitySchedulingConstraintTest.java
@@ -38,11 +38,11 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;
/**
- * Test cases for {@link SourceLocationAwareSchedulingConstraint}.
+ * Test cases for {@link LocalitySchedulingConstraint}.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ExecutorRepresenter.class, Task.class, Readable.class, BlockManagerMaster.class})
-public final class SourceLocationAwareSchedulingConstraintTest {
+public final class LocalitySchedulingConstraintTest {
private Injector injector;
private static final String SITE_0 = "SEOUL";
private static final String SITE_1 = "JINJU";
@@ -53,7 +53,7 @@ public final class SourceLocationAwareSchedulingConstraintTest {
when(executorRepresenter.getNodeName()).thenReturn(executorId);
return executorRepresenter;
}
-
+
@Before
public void setUp() throws Exception {
injector = Tang.Factory.getTang().newInjector();
@@ -61,13 +61,13 @@ public final class SourceLocationAwareSchedulingConstraintTest {
}
/**
- * {@link SourceLocationAwareSchedulingConstraint} should fail to schedule a {@link Task} when
+ * {@link LocalitySchedulingConstraint} should fail to schedule a {@link Task} when
* there are no executors in appropriate location(s).
*/
@Test
public void testSourceLocationAwareSchedulingNotAvailable() throws InjectionException {
final SchedulingConstraint schedulingConstraint = injector
- .getInstance(SourceLocationAwareSchedulingConstraint.class);
+ .getInstance(LocalitySchedulingConstraint.class);
// Prepare test scenario
final Task task = CreateTask.withReadablesWithSourceLocations(
@@ -81,13 +81,13 @@ public final class SourceLocationAwareSchedulingConstraintTest {
}
/**
- * {@link SourceLocationAwareSchedulingConstraint} should properly schedule {@link Task}s
+ * {@link LocalitySchedulingConstraint} should properly schedule {@link Task}s
* with multiple source locations.
*/
@Test
public void testSourceLocationAwareSchedulingWithMultiSource() throws InjectionException {
final SchedulingConstraint schedulingConstraint = injector
- .getInstance(SourceLocationAwareSchedulingConstraint.class);
+ .getInstance(LocalitySchedulingConstraint.class);
// Prepare test scenario
final Task task0 = CreateTask.withReadablesWithSourceLocations(
Collections.singletonList(Collections.singletonList(SITE_1)));
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
index 64abefe..eabcac4 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
@@ -46,7 +46,7 @@ public final class SchedulingConstraintnRegistryTest {
assertEquals(FreeSlotSchedulingConstraint.class, getConstraintOf(ResourceSlotProperty.class, registry));
assertEquals(ContainerTypeAwareSchedulingConstraint.class,
getConstraintOf(ResourcePriorityProperty.class, registry));
- assertEquals(SourceLocationAwareSchedulingConstraint.class,
+ assertEquals(LocalitySchedulingConstraint.class,
getConstraintOf(ResourceLocalityProperty.class, registry));
}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
index c2bbd12..af3fe81 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -14,7 +14,6 @@
* limitations under the License.
*/
package edu.snu.nemo.runtime.master.scheduler;
-
import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
import edu.snu.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
import edu.snu.nemo.runtime.common.RuntimeIdManager;
@@ -24,6 +23,7 @@ import edu.snu.nemo.runtime.common.message.MessageSender;
import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.state.BlockState;
import edu.snu.nemo.runtime.common.state.PlanState;
import edu.snu.nemo.runtime.common.state.TaskState;
import edu.snu.nemo.runtime.master.BlockManagerMaster;
@@ -72,6 +72,7 @@ public final class TaskRetryTest {
private Scheduler scheduler;
private ExecutorRegistry executorRegistry;
private PlanStateManager planStateManager;
+ private BlockManagerMaster blockManagerMaster;
private static final int MAX_SCHEDULE_ATTEMPT = Integer.MAX_VALUE;
@@ -96,13 +97,21 @@ public final class TaskRetryTest {
public void testExecutorRemoved() throws Exception {
// Until the plan finishes, events happen
while (!planStateManager.isPlanDone()) {
- // 50% chance remove, 50% chance add, 80% chance task completed
- executorRemoved(0.5);
- executorAdded(0.5);
- taskCompleted(0.8);
+ // 30% chance executor added, 30% chance executor removed
+ executorAdded(0.3);
+ executorRemoved(0.3);
+
+ // random - trigger speculative execution.
+ if (random.nextBoolean()) {
+ Thread.sleep(10);
+ } else {
+ Thread.sleep(20);
+ }
+
+ // 30% chance task completed,
+ taskCompleted(0.3);
- // 10ms sleep
- Thread.sleep(10);
+ scheduler.onSpeculativeExecutionCheck();
}
// Plan should COMPLETE
@@ -120,12 +129,17 @@ public final class TaskRetryTest {
// Until the plan finishes, events happen
while (!planStateManager.isPlanDone()) {
// 50% chance task completed
- // 50% chance task output write failed
+ // 70% chance task output write failed
taskCompleted(0.5);
- taskOutputWriteFailed(0.5);
+ taskOutputWriteFailed(0.7);
- // 10ms sleep
- Thread.sleep(10);
+ // random - trigger speculative execution.
+ if (random.nextBoolean()) {
+ Thread.sleep(10);
+ } else {
+ Thread.sleep(20);
+ }
+ scheduler.onSpeculativeExecutionCheck();
}
// Plan should COMPLETE
@@ -178,8 +192,17 @@ public final class TaskRetryTest {
if (!executingTasks.isEmpty()) {
final int randomIndex = random.nextInt(executingTasks.size());
final String selectedTask = executingTasks.get(randomIndex);
- SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry, selectedTask,
+
+
+ final Optional<ExecutorRepresenter> executor = executorRegistry.findExecutorForTask(selectedTask);
+ if (executor.isPresent()) {
+ SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry, selectedTask,
TaskState.State.COMPLETE, RuntimeIdManager.getAttemptFromTaskId(selectedTask));
+ getOutputBlockIds(selectedTask).forEach(blockId ->
+ blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.AVAILABLE, executor.get().getExecutorId()));
+ } else {
+ throw new RuntimeException(selectedTask);
+ }
}
}
@@ -221,7 +244,16 @@ public final class TaskRetryTest {
injector.bindVolatileInstance(SchedulingConstraintRegistry.class, mock(SchedulingConstraintRegistry.class));
planStateManager = injector.getInstance(PlanStateManager.class);
scheduler = injector.getInstance(Scheduler.class);
+ blockManagerMaster = injector.getInstance(BlockManagerMaster.class);
scheduler.schedulePlan(plan, MAX_SCHEDULE_ATTEMPT);
}
+
+ private Set<String> getOutputBlockIds(final String taskId) {
+ return planStateManager.getPhysicalPlan().getStageDAG()
+ .getOutgoingEdgesOf(RuntimeIdManager.getStageIdFromTaskId(taskId))
+ .stream()
+ .map(stageEdge -> RuntimeIdManager.generateBlockId(stageEdge.getId(), taskId))
+ .collect(Collectors.toSet()); // ids of blocks this task will produce
+ }
}