You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2020/03/09 06:14:24 UTC
[incubator-nemo] branch master updated: [NEMO-438] Create a
Simulator for Simulating an Execution of an Execution Plan (#288)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 4dd3ef0 [NEMO-438] Create a Simulator for Simulating an Execution of an Execution Plan (#288)
4dd3ef0 is described below
commit 4dd3ef0a8a4fb4407fdca2c470f4705f24d161f5
Author: Won Wook SONG <ws...@gmail.com>
AuthorDate: Mon Mar 9 15:14:13 2020 +0900
[NEMO-438] Create a Simulator for Simulating an Execution of an Execution Plan (#288)
JIRA: [NEMO-438: Create a Simulator for Simulating an Execution of an Execution Plan](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-438)
**Major changes:**
- Implements a SimulationScheduler that schedules a physical plan
- Implements a SimulatedExecutor that acts like an executor
- Refactors components used by both `BatchScheduler` and `SimulationScheduler` into `BatchSchedulerUtils`
**Minor changes to note:**
- Moves the executor's `ResourceSpecification` into the common module so that it can be accessed in the driver as well
- Enables IRDAG to get the input size of the workload
- Adds the job duration to the job metric.
**Tests for the changes:**
- `SimulationSchedulerTest.java` tests the implementations
**Other comments:**
- None
Closes #288
---
.../src/main/java/org/apache/nemo/common/Util.java | 57 +-
.../nemo/common/exception/SimulationException.java | 45 ++
.../main/java/org/apache/nemo/common/ir/IRDAG.java | 48 ++
.../java/org/apache/nemo/common/ir/IdManager.java | 11 +
.../executionproperty}/ResourceSpecification.java | 6 +-
.../nemo/compiler/optimizer/NemoOptimizer.java | 6 +
.../nemo/runtime/common/RuntimeIdManager.java | 10 -
.../nemo/runtime/common/message/MessageUtils.java | 103 ++++
.../nemo/runtime/common/metric/JobMetric.java | 1 +
.../org/apache/nemo/runtime/common/plan/Task.java | 7 +
.../apache/nemo/driver/UserApplicationRunner.java | 6 +
.../org/apache/nemo/runtime/executor/Executor.java | 5 +-
.../nemo/runtime/executor/TaskStateManager.java | 51 +-
.../nemo/runtime/executor/task/TaskExecutor.java | 2 +
.../nemo/runtime/master/PlanStateManager.java | 33 +-
.../apache/nemo/runtime/master/RuntimeMaster.java | 90 +--
.../nemo/runtime/master/metric/MetricStore.java | 8 +
.../runtime/master/resource/ContainerManager.java | 1 +
.../resource/DefaultExecutorRepresenter.java | 2 +-
.../runtime/master/scheduler/BatchScheduler.java | 341 ++----------
.../master/scheduler/BatchSchedulerUtils.java | 361 ++++++++++++
.../runtime/master/scheduler/ExecutorRegistry.java | 8 +
.../scheduler/PendingTaskCollectionPointer.java | 8 +
.../master/scheduler/SimulatedTaskExecutor.java | 243 ++++++++
.../master/scheduler/SimulationScheduler.java | 611 +++++++++++++++++++++
.../runtime/master/scheduler/TaskDispatcher.java | 19 +
.../nemo/runtime/master/ContainerManagerTest.java | 2 +-
.../master/scheduler/BatchSchedulerTest.java | 2 +-
.../master/scheduler/SimulationSchedulerTest.java | 105 ++++
.../runtime/master/scheduler/TaskRetryTest.java | 2 +-
30 files changed, 1740 insertions(+), 454 deletions(-)
diff --git a/common/src/main/java/org/apache/nemo/common/Util.java b/common/src/main/java/org/apache/nemo/common/Util.java
index 2b9e259..4bf4593 100644
--- a/common/src/main/java/org/apache/nemo/common/Util.java
+++ b/common/src/main/java/org/apache/nemo/common/Util.java
@@ -18,9 +18,14 @@
*/
package org.apache.nemo.common;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.TreeNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nemo.common.exception.JsonParseException;
import org.apache.nemo.common.exception.MetricException;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.*;
+import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.utility.MessageAggregatorVertex;
import org.apache.nemo.common.ir.vertex.utility.TriggerVertex;
@@ -32,8 +37,7 @@ import java.lang.instrument.Instrumentation;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.Collection;
-import java.util.Optional;
+import java.util.*;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -232,6 +236,55 @@ public final class Util {
return "edge" + numericId;
}
+
+ /**
+ * Utility method for parsing the resource specification string.
+ *
+ * @param resourceSpecificationString the input resource specification string.
+ * @return the parsed list of resource specifications. The Integer indicates how many of the specified nodes are
+ * required, followed by the ResourceSpecification that indicates the specifications of the nodes.
+ */
+ public static List<Pair<Integer, ResourceSpecification>> parseResourceSpecificationString(
+ final String resourceSpecificationString) {
+ final List<Pair<Integer, ResourceSpecification>> resourceSpecifications = new ArrayList<>();
+ try {
+ if (resourceSpecificationString.trim().startsWith("[")) {
+ final TreeNode jsonRootNode = new ObjectMapper().readTree(resourceSpecificationString);
+
+ for (int i = 0; i < jsonRootNode.size(); i++) {
+ final TreeNode resourceNode = jsonRootNode.get(i);
+ final int executorNum = resourceNode.path("num").traverse().nextIntValue(1);
+ final String type = resourceNode.get("type").traverse().nextTextValue();
+ final int capacity = resourceNode.get("capacity").traverse().getIntValue();
+ final int memory = resourceNode.get("memory_mb").traverse().getIntValue();
+ final OptionalDouble maxOffheapRatio;
+ final OptionalInt poisonSec;
+
+ if (resourceNode.path("max_offheap_ratio").traverse().nextToken() == JsonToken.VALUE_NUMBER_FLOAT) {
+ maxOffheapRatio = OptionalDouble.of(resourceNode.path("max_offheap_ratio").traverse().getDoubleValue());
+ } else {
+ maxOffheapRatio = OptionalDouble.empty();
+ }
+
+ if (resourceNode.path("poison_sec").traverse().nextToken() == JsonToken.VALUE_NUMBER_INT) {
+ poisonSec = OptionalInt.of(resourceNode.path("poison_sec").traverse().getIntValue());
+ } else {
+ poisonSec = OptionalInt.empty();
+ }
+
+ resourceSpecifications.add(
+ Pair.of(executorNum, new ResourceSpecification(type, capacity, memory, maxOffheapRatio, poisonSec)));
+ }
+ } else {
+ throw new UnsupportedOperationException("Executor Info file should be a JSON file.");
+ }
+
+ return resourceSpecifications;
+ } catch (final Exception e) {
+ throw new JsonParseException(e);
+ }
+ }
+
/**
* Method for the instrumentation: for getting the object size.
*
diff --git a/common/src/main/java/org/apache/nemo/common/exception/SimulationException.java b/common/src/main/java/org/apache/nemo/common/exception/SimulationException.java
new file mode 100644
index 0000000..587d582
--- /dev/null
+++ b/common/src/main/java/org/apache/nemo/common/exception/SimulationException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.common.exception;
+
+/**
+ * SimulationException.
+ * Thrown when any exception occurs while trying to simulate
+ * the execution of a {@code org.apache.nemo.runtime.common.plan.Task} on an executor.
+ */
+public final class SimulationException extends RuntimeException {
+ /**
+ * SimulationException.
+ *
+ * @param exception exception
+ */
+ public SimulationException(final Throwable exception) {
+ super(exception);
+ }
+
+ /**
+ * SimulationException.
+ *
+ * @param exception exception
+ */
+ public SimulationException(final String exception) {
+ super(exception);
+ }
+}
diff --git a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
index bcb76dd..c95bcf0 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
@@ -20,6 +20,7 @@ package org.apache.nemo.common.ir;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Sets;
+import org.apache.nemo.common.Pair;
import org.apache.nemo.common.PairKeyExtractor;
import org.apache.nemo.common.Util;
import org.apache.nemo.common.coder.BytesDecoderFactory;
@@ -28,10 +29,13 @@ import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.dag.DAGBuilder;
import org.apache.nemo.common.dag.DAGInterface;
import org.apache.nemo.common.exception.CompileTimeOptimizationException;
+import org.apache.nemo.common.exception.MetricException;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.*;
+import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.LoopVertex;
+import org.apache.nemo.common.ir.vertex.SourceVertex;
import org.apache.nemo.common.ir.vertex.executionproperty.MessageIdVertexProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.apache.nemo.common.ir.vertex.utility.MessageAggregatorVertex;
@@ -78,6 +82,11 @@ public final class IRDAG implements DAGInterface<IRVertex, IREdge> {
private final Map<IRVertex, Set<IRVertex>> messageVertexToGroup;
/**
+ * To remember the specifications of the executors used to run the IR DAG with.
+ */
+ private final List<Pair<Integer, ResourceSpecification>> executorInfo;
+
+ /**
* @param originalUserApplicationDAG the initial DAG.
*/
public IRDAG(final DAG<IRVertex, IREdge> originalUserApplicationDAG) {
@@ -86,6 +95,7 @@ public final class IRDAG implements DAGInterface<IRVertex, IREdge> {
this.streamVertexToOriginalEdge = new HashMap<>();
this.samplingVertexToGroup = new HashMap<>();
this.messageVertexToGroup = new HashMap<>();
+ this.executorInfo = new ArrayList<>();
}
public IRDAGChecker.CheckerResult checkIntegrity() {
@@ -113,6 +123,12 @@ public final class IRDAG implements DAGInterface<IRVertex, IREdge> {
* @return a IR DAG summary string, consisting of only the vertices generated from the frontend.
*/
public String irDAGSummary() {
+ final Long inputBytes = this.getInputSize();
+ final String inputSizeString = inputBytes < 1024 ? inputBytes + "B"
+ : (inputBytes / 1024 < 1024 ? inputBytes / 1024 + "KB"
+ : (inputBytes / 1048576 < 1024 ? inputBytes / 1048576 + "MB"
+ : (inputBytes / 1073741824L < 1024 ? inputBytes / 1073741824L + "GB"
+ : inputBytes / 1099511627776L + "TB")));
return "rv" + getRootVertices().size()
+ "_v" + getVertices().stream()
.filter(v -> !v.isUtilityVertex()) // Exclude utility vertices
@@ -120,9 +136,41 @@ public final class IRDAG implements DAGInterface<IRVertex, IREdge> {
+ "_e" + getVertices().stream()
.filter(v -> !v.isUtilityVertex()) // Exclude utility vertices
.mapToInt(v -> getIncomingEdgesOf(v).size())
+ .sum() + "_" + inputSizeString;
+ }
+
+ /**
+ * @return the total sum of the input size for the IR DAG.
+ */
+ public Long getInputSize() {
+ return this.getRootVertices().stream()
+ .filter(irVertex -> irVertex instanceof SourceVertex)
+ .mapToLong(srcVertex -> {
+ try {
+ return ((SourceVertex) srcVertex).getEstimatedSizeBytes();
+ } catch (Exception e) {
+ throw new MetricException(e);
+ }
+ })
.sum();
}
+ /**
+ * Setter for the executor specifications information.
+ * @param parsedExecutorInfo executor information parsed for processing.
+ */
+ public void recordExecutorInfo(final List<Pair<Integer, ResourceSpecification>> parsedExecutorInfo) {
+ executorInfo.addAll(parsedExecutorInfo);
+ }
+
+ /**
+ * Getter for the executor specifications information.
+ * @return the executor specifications information.
+ */
+ public List<Pair<Integer, ResourceSpecification>> getExecutorInfo() {
+ return executorInfo;
+ }
+
////////////////////////////////////////////////// Methods for reshaping the DAG topology.
/**
diff --git a/common/src/main/java/org/apache/nemo/common/ir/IdManager.java b/common/src/main/java/org/apache/nemo/common/ir/IdManager.java
index 1aad410..2ff5255 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/IdManager.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/IdManager.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
/**
* ID manager.
@@ -38,6 +39,7 @@ public final class IdManager {
private static AtomicInteger vertexId = new AtomicInteger(1);
private static AtomicInteger edgeId = new AtomicInteger(1);
+ private static AtomicLong resourceSpecIdGenerator = new AtomicLong(0);
private static volatile boolean isDriver = false;
// Vertex ID Map to be used upon cloning in loop vertices.
@@ -89,6 +91,15 @@ public final class IdManager {
}
/**
+ * Generates the ID for a resource specification.
+ *
+ * @return the generated ID
+ */
+ public static String generateResourceSpecId() {
+ return "ResourceSpec" + resourceSpecIdGenerator.getAndIncrement();
+ }
+
+ /**
* Set the realm of the loaded class as REEF driver.
*/
public static void setInDriver() {
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ResourceSpecification.java b/common/src/main/java/org/apache/nemo/common/ir/executionproperty/ResourceSpecification.java
similarity index 94%
rename from runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ResourceSpecification.java
rename to common/src/main/java/org/apache/nemo/common/ir/executionproperty/ResourceSpecification.java
index f3f0e7e..334ffba 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ResourceSpecification.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/executionproperty/ResourceSpecification.java
@@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.nemo.runtime.master.resource;
+package org.apache.nemo.common.ir.executionproperty;
-import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.common.ir.IdManager;
import java.util.OptionalDouble;
import java.util.OptionalInt;
@@ -46,7 +46,7 @@ public final class ResourceSpecification {
final int memory,
final OptionalDouble maxOffheapRatio,
final OptionalInt poisonSec) {
- this.resourceSpecId = RuntimeIdManager.generateResourceSpecId();
+ this.resourceSpecId = IdManager.generateResourceSpecId();
this.containerType = containerType;
this.capacity = capacity;
this.memory = memory;
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
index 8c0da96..0dc9fe9 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
@@ -19,6 +19,7 @@
package org.apache.nemo.compiler.optimizer;
import net.jcip.annotations.NotThreadSafe;
+import org.apache.nemo.common.Util;
import org.apache.nemo.common.dag.DAGBuilder;
import org.apache.nemo.common.exception.CompileTimeOptimizationException;
import org.apache.nemo.common.ir.IRDAG;
@@ -50,6 +51,7 @@ public final class NemoOptimizer implements Optimizer {
private final String dagDirectory;
private final Policy optimizationPolicy;
private final String environmentTypeStr;
+ private final String executorInfoContents;
private final ClientRPC clientRPC;
private final Map<UUID, Integer> cacheIdToParallelism = new HashMap<>();
@@ -60,15 +62,18 @@ public final class NemoOptimizer implements Optimizer {
* @param dagDirectory to store JSON representation of intermediate DAGs.
* @param policyName the name of the optimization policy.
* @param environmentTypeStr the environment type of the workload to optimize the DAG for.
+ * @param executorInfoContents the string of the information of the executors provided.
* @param clientRPC the RPC channel to communicate with the client.
*/
@Inject
private NemoOptimizer(@Parameter(JobConf.DAGDirectory.class) final String dagDirectory,
@Parameter(JobConf.OptimizationPolicy.class) final String policyName,
@Parameter(JobConf.EnvironmentType.class) final String environmentTypeStr,
+ @Parameter(JobConf.ExecutorJSONContents.class) final String executorInfoContents,
final ClientRPC clientRPC) {
this.dagDirectory = dagDirectory;
this.environmentTypeStr = OptimizerUtils.filterEnvironmentTypeString(environmentTypeStr);
+ this.executorInfoContents = executorInfoContents;
this.clientRPC = clientRPC;
try {
@@ -133,6 +138,7 @@ public final class NemoOptimizer implements Optimizer {
* @param policy the optimization policy to optimize the DAG with.
*/
private void beforeCompileTimeOptimization(final IRDAG dag, final Policy policy) {
+ dag.recordExecutorInfo(Util.parseResourceSpecificationString(this.executorInfoContents));
if (policy instanceof XGBoostPolicy) {
clientRPC.send(ControlMessage.DriverToClientMessage.newBuilder()
.setType(ControlMessage.DriverToClientMessageType.LaunchOptimization)
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/RuntimeIdManager.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/RuntimeIdManager.java
index bfb7bda..255cca7 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/RuntimeIdManager.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/RuntimeIdManager.java
@@ -28,7 +28,6 @@ public final class RuntimeIdManager {
private static AtomicInteger physicalPlanIdGenerator = new AtomicInteger(0);
private static AtomicInteger executorIdGenerator = new AtomicInteger(0);
private static AtomicLong messageIdGenerator = new AtomicLong(1L);
- private static AtomicLong resourceSpecIdGenerator = new AtomicLong(0);
private static final String SPLITTER = "-";
/**
@@ -123,15 +122,6 @@ public final class RuntimeIdManager {
return messageIdGenerator.getAndIncrement();
}
- /**
- * Generates the ID for a resource specification.
- *
- * @return the generated ID
- */
- public static String generateResourceSpecId() {
- return "ResourceSpec" + resourceSpecIdGenerator.getAndIncrement();
- }
-
//////////////////////////////////////////////////////////////// Parse IDs
/**
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/MessageUtils.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/MessageUtils.java
new file mode 100644
index 0000000..241705c
--- /dev/null
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/MessageUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.runtime.common.message;
+
+import org.apache.nemo.common.exception.UnknownExecutionStateException;
+import org.apache.nemo.common.exception.UnknownFailureCauseException;
+import org.apache.nemo.runtime.common.comm.ControlMessage;
+import org.apache.nemo.runtime.common.state.TaskState;
+
+import static org.apache.nemo.runtime.common.state.TaskState.State.COMPLETE;
+import static org.apache.nemo.runtime.common.state.TaskState.State.ON_HOLD;
+
+/**
+ * Utility class for messages.
+ */
+public final class MessageUtils {
+ /**
+ * Private constructor for utility class.
+ */
+ private MessageUtils() {
+ }
+
+ public static TaskState.State convertTaskState(final ControlMessage.TaskStateFromExecutor state) {
+ switch (state) {
+ case READY:
+ return TaskState.State.READY;
+ case EXECUTING:
+ return TaskState.State.EXECUTING;
+ case COMPLETE:
+ return COMPLETE;
+ case FAILED_RECOVERABLE:
+ return TaskState.State.SHOULD_RETRY;
+ case FAILED_UNRECOVERABLE:
+ return TaskState.State.FAILED;
+ case ON_HOLD:
+ return ON_HOLD;
+ default:
+ throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + state));
+ }
+ }
+
+ public static ControlMessage.TaskStateFromExecutor convertState(final TaskState.State state) {
+ switch (state) {
+ case READY:
+ return ControlMessage.TaskStateFromExecutor.READY;
+ case EXECUTING:
+ return ControlMessage.TaskStateFromExecutor.EXECUTING;
+ case COMPLETE:
+ return ControlMessage.TaskStateFromExecutor.COMPLETE;
+ case SHOULD_RETRY:
+ return ControlMessage.TaskStateFromExecutor.FAILED_RECOVERABLE;
+ case FAILED:
+ return ControlMessage.TaskStateFromExecutor.FAILED_UNRECOVERABLE;
+ case ON_HOLD:
+ return ControlMessage.TaskStateFromExecutor.ON_HOLD;
+ default:
+ throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + state));
+ }
+ }
+
+ public static TaskState.RecoverableTaskFailureCause convertFailureCause(
+ final ControlMessage.RecoverableFailureCause cause) {
+ switch (cause) {
+ case InputReadFailure:
+ return TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE;
+ case OutputWriteFailure:
+ return TaskState.RecoverableTaskFailureCause.OUTPUT_WRITE_FAILURE;
+ default:
+ throw new UnknownFailureCauseException(
+ new Throwable("The failure cause for the recoverable failure is unknown"));
+ }
+ }
+
+ public static ControlMessage.RecoverableFailureCause convertFailureCause(
+ final TaskState.RecoverableTaskFailureCause cause) {
+ switch (cause) {
+ case INPUT_READ_FAILURE:
+ return ControlMessage.RecoverableFailureCause.InputReadFailure;
+ case OUTPUT_WRITE_FAILURE:
+ return ControlMessage.RecoverableFailureCause.OutputWriteFailure;
+ default:
+ throw new UnknownFailureCauseException(
+ new Throwable("The failure cause for the recoverable failure is unknown"));
+ }
+ }
+}
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/JobMetric.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/JobMetric.java
index 07ff84f..8725e4b 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/JobMetric.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/JobMetric.java
@@ -46,6 +46,7 @@ public final class JobMetric implements StateMetric<PlanState.State> {
private String vertexProperties;
private String edgeProperties;
private JsonNode irDagJson;
+ private volatile DAG<Stage, StageEdge> stageDAG;
private JsonNode stageDagJson;
private Long jobDuration;
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/Task.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/Task.java
index 737b8a5..719075b 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/Task.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/Task.java
@@ -89,6 +89,13 @@ public final class Task implements Serializable {
}
/**
+ * @return the Stage ID of the task.
+ */
+ public String getStageId() {
+ return RuntimeIdManager.getStageIdFromTaskId(this.getTaskId());
+ }
+
+ /**
* @return the incoming edges of the task.
*/
public List<StageEdge> getTaskIncomingEdges() {
diff --git a/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java b/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java
index bf759a4..b3ac05b 100644
--- a/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java
+++ b/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java
@@ -25,10 +25,12 @@ import org.apache.nemo.compiler.backend.Backend;
import org.apache.nemo.compiler.backend.nemo.NemoPlanRewriter;
import org.apache.nemo.compiler.optimizer.Optimizer;
import org.apache.nemo.conf.JobConf;
+import org.apache.nemo.runtime.common.metric.JobMetric;
import org.apache.nemo.runtime.common.plan.PhysicalPlan;
import org.apache.nemo.runtime.common.plan.PlanRewriter;
import org.apache.nemo.runtime.master.PlanStateManager;
import org.apache.nemo.runtime.master.RuntimeMaster;
+import org.apache.nemo.runtime.master.metric.MetricStore;
import org.apache.reef.tang.annotations.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +83,7 @@ public final class UserApplicationRunner {
*/
public synchronized void run(final String dagString) {
try {
+ final long startTime = System.currentTimeMillis();
LOG.info("##### Nemo Compiler Start #####");
final IRDAG dag = SerializationUtils.deserialize(Base64.getDecoder().decode(dagString));
@@ -104,7 +107,10 @@ public final class UserApplicationRunner {
planStateManager.storeJSON("final");
}
+ final long endTime = System.currentTimeMillis();
LOG.info("{} is complete!", physicalPlan.getPlanId());
+ MetricStore.getStore().getOrCreateMetric(JobMetric.class, physicalPlan.getPlanId())
+ .setJobDuration(endTime - startTime);
} catch (final Exception e) {
throw new RuntimeException(e);
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
index 4ef4c98..75c2e43 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
@@ -141,13 +141,12 @@ public final class Executor {
getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
e.getPropertyValue(CompressionProperty.class).orElse(null),
e.getPropertyValue(DecompressionProperty.class).orElse(null)));
- irDag.getVertices().forEach(v -> {
+ irDag.getVertices().forEach(v ->
irDag.getOutgoingEdgesOf(v).forEach(e -> serializerManager.register(e.getId(),
getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
e.getPropertyValue(CompressionProperty.class).orElse(null),
- e.getPropertyValue(DecompressionProperty.class).orElse(null)));
- });
+ e.getPropertyValue(DecompressionProperty.class).orElse(null))));
new TaskExecutor(task, irDag, taskStateManager, intermediateDataIOFactory, broadcastManagerWorker,
metricMessageSender, persistentConnectionToMasterMap).execute();
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TaskStateManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TaskStateManager.java
index 37b48d4..41f5d6e 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TaskStateManager.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TaskStateManager.java
@@ -19,11 +19,10 @@
package org.apache.nemo.runtime.executor;
import org.apache.commons.lang3.SerializationUtils;
-import org.apache.nemo.common.exception.UnknownExecutionStateException;
-import org.apache.nemo.common.exception.UnknownFailureCauseException;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
+import org.apache.nemo.runtime.common.message.MessageUtils;
import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
import org.apache.nemo.runtime.common.metric.StateTransitionEvent;
import org.apache.nemo.runtime.common.plan.Task;
@@ -74,10 +73,8 @@ public final class TaskStateManager {
public synchronized void onTaskStateChanged(final TaskState.State newState,
final Optional<String> vertexPutOnHold,
final Optional<TaskState.RecoverableTaskFailureCause> cause) {
- metricMessageSender.send("TaskMetric", taskId, "stateTransitionEvent",
- SerializationUtils.serialize(new StateTransitionEvent<>(
- System.currentTimeMillis(), null, newState
- )));
+ metricMessageSender.send(METRIC, taskId, "stateTransitionEvent",
+ SerializationUtils.serialize(new StateTransitionEvent<>(System.currentTimeMillis(), null, newState)));
switch (newState) {
case EXECUTING:
@@ -119,13 +116,9 @@ public final class TaskStateManager {
.setExecutorId(executorId)
.setTaskId(taskId)
.setAttemptIdx(attemptIdx)
- .setState(convertState(newState));
- if (vertexPutOnHold.isPresent()) {
- msgBuilder.setVertexPutOnHoldId(vertexPutOnHold.get());
- }
- if (cause.isPresent()) {
- msgBuilder.setFailureCause(convertFailureCause(cause.get()));
- }
+ .setState(MessageUtils.convertState(newState));
+ vertexPutOnHold.ifPresent(msgBuilder::setVertexPutOnHoldId);
+ cause.ifPresent(c -> msgBuilder.setFailureCause(MessageUtils.convertFailureCause(c)));
// Send taskStateChangedMsg to master!
persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
@@ -137,38 +130,6 @@ public final class TaskStateManager {
.build());
}
- private ControlMessage.TaskStateFromExecutor convertState(final TaskState.State state) {
- switch (state) {
- case READY:
- return ControlMessage.TaskStateFromExecutor.READY;
- case EXECUTING:
- return ControlMessage.TaskStateFromExecutor.EXECUTING;
- case COMPLETE:
- return ControlMessage.TaskStateFromExecutor.COMPLETE;
- case SHOULD_RETRY:
- return ControlMessage.TaskStateFromExecutor.FAILED_RECOVERABLE;
- case FAILED:
- return ControlMessage.TaskStateFromExecutor.FAILED_UNRECOVERABLE;
- case ON_HOLD:
- return ControlMessage.TaskStateFromExecutor.ON_HOLD;
- default:
- throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + state));
- }
- }
-
- private ControlMessage.RecoverableFailureCause convertFailureCause(
- final TaskState.RecoverableTaskFailureCause cause) {
- switch (cause) {
- case INPUT_READ_FAILURE:
- return ControlMessage.RecoverableFailureCause.InputReadFailure;
- case OUTPUT_WRITE_FAILURE:
- return ControlMessage.RecoverableFailureCause.OutputWriteFailure;
- default:
- throw new UnknownFailureCauseException(
- new Throwable("The failure cause for the recoverable failure is unknown"));
- }
- }
-
// Tentative
public void getCurrentTaskExecutionState() {
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 4d7890c..6e347d8 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -121,6 +121,7 @@ public final class TaskExecutor {
final Pair<List<DataFetcher>, List<VertexHarness>> pair = prepare(task, irVertexDag, intermediateDataIOFactory);
this.dataFetchers = pair.left();
this.sortedHarnesses = pair.right();
+
this.timeSinceLastExecution = System.currentTimeMillis();
}
@@ -356,6 +357,7 @@ public final class TaskExecutor {
for (final VertexHarness vertexHarness : sortedHarnesses) {
finalizeVertex(vertexHarness);
}
+
metricMessageSender.send(TASK_METRIC_ID, taskId, "taskDuration",
SerializationUtils.serialize(System.currentTimeMillis() - executionStartTime));
this.timeSinceLastExecution = System.currentTimeMillis();
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java
index 1a033b4..5841adb 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java
@@ -118,6 +118,22 @@ public final class PlanStateManager {
}
/**
+ * Static factory method for manual (non-injected) usage.
+ * @param dagDirectory the DAG directory to store the JSON to.
+ * @return a new PlanStateManager instance.
+ */
+ public static PlanStateManager newInstance(final String dagDirectory) {
+ return new PlanStateManager(dagDirectory);
+ }
+
+ /**
+   * @param metricStore the metric store to be used by this plan state manager.
+ */
+ public void setMetricStore(final MetricStore metricStore) {
+ this.metricStore = metricStore;
+ }
+
+ /**
* Update the physical plan and maximum attempt.
*
* @param physicalPlanToUpdate the physical plan to manage.
@@ -147,16 +163,13 @@ public final class PlanStateManager {
private void initializeStates() {
onPlanStateChanged(PlanState.State.EXECUTING);
physicalPlan.getStageDAG().topologicalDo(stage -> {
- if (!stageIdToState.containsKey(stage.getId())) {
- stageIdToState.put(stage.getId(), new StageState());
- stageIdToTaskIdxToAttemptStates.put(stage.getId(), new HashMap<>());
-
- // for each task idx of this stage
- for (final int taskIndex : stage.getTaskIndices()) {
- stageIdToTaskIdxToAttemptStates.get(stage.getId()).put(taskIndex, new ArrayList<>());
- // task states will be initialized lazily in getTaskAttemptsToSchedule()
- }
- }
+ stageIdToState.putIfAbsent(stage.getId(), new StageState());
+ stageIdToTaskIdxToAttemptStates.putIfAbsent(stage.getId(), new HashMap<>());
+
+ // for each task idx of this stage
+ stage.getTaskIndices().forEach(taskIndex ->
+ stageIdToTaskIdxToAttemptStates.get(stage.getId()).putIfAbsent(taskIndex, new ArrayList<>()));
+ // task states will be initialized lazily in getTaskAttemptsToSchedule()
});
}
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java
index 45a9cf0..99ee8e5 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java
@@ -18,31 +18,28 @@
*/
package org.apache.nemo.runtime.master;
-import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.nemo.common.Pair;
-import org.apache.nemo.common.exception.*;
+import org.apache.nemo.common.Util;
+import org.apache.nemo.common.exception.ContainerException;
+import org.apache.nemo.common.exception.IllegalMessageException;
+import org.apache.nemo.common.exception.MetricException;
import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
-import org.apache.nemo.runtime.common.message.ClientRPC;
-import org.apache.nemo.runtime.common.message.MessageContext;
-import org.apache.nemo.runtime.common.message.MessageEnvironment;
-import org.apache.nemo.runtime.common.message.MessageListener;
+import org.apache.nemo.runtime.common.message.*;
import org.apache.nemo.runtime.common.metric.JobMetric;
import org.apache.nemo.runtime.common.plan.PhysicalPlan;
-import org.apache.nemo.runtime.common.state.TaskState;
import org.apache.nemo.runtime.master.metric.MetricManagerMaster;
import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
import org.apache.nemo.runtime.master.metric.MetricStore;
import org.apache.nemo.runtime.master.resource.ContainerManager;
import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
-import org.apache.nemo.runtime.master.resource.ResourceSpecification;
import org.apache.nemo.runtime.master.scheduler.BatchScheduler;
import org.apache.nemo.runtime.master.scheduler.Scheduler;
import org.apache.nemo.runtime.master.servlet.*;
@@ -60,13 +57,13 @@ import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.io.Serializable;
import java.nio.file.Paths;
-import java.util.*;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.nemo.runtime.common.state.TaskState.State.COMPLETE;
-import static org.apache.nemo.runtime.common.state.TaskState.State.ON_HOLD;
-
/**
* (WARNING) Use runtimeMasterThread for all public methods to avoid race conditions.
* See comments in the {@link Scheduler} for avoiding race conditions.
@@ -305,33 +302,14 @@ public final class RuntimeMaster {
public void requestContainer(final String resourceSpecificationString) {
final Future<?> containerRequestEventResult = runtimeMasterThread.submit(() -> {
try {
- final TreeNode jsonRootNode = objectMapper.readTree(resourceSpecificationString);
-
- for (int i = 0; i < jsonRootNode.size(); i++) {
- final TreeNode resourceNode = jsonRootNode.get(i);
- final String type = resourceNode.get("type").traverse().nextTextValue();
- final int memory = resourceNode.get("memory_mb").traverse().getIntValue();
- final OptionalDouble maxOffheapRatio;
- final int capacity = resourceNode.get("capacity").traverse().getIntValue();
- final int executorNum = resourceNode.path("num").traverse().nextIntValue(1);
- final OptionalInt poisonSec;
-
- if (resourceNode.path("max_offheap_ratio").traverse().nextToken() == JsonToken.VALUE_NUMBER_FLOAT) {
- maxOffheapRatio = OptionalDouble.of(resourceNode.path("max_offheap_ratio").traverse().getDoubleValue());
- } else {
- maxOffheapRatio = OptionalDouble.empty();
- }
-
- if (resourceNode.path("poison_sec").traverse().nextToken() == JsonToken.VALUE_NUMBER_INT) {
- poisonSec = OptionalInt.of(resourceNode.path("poison_sec").traverse().getIntValue());
- } else {
- poisonSec = OptionalInt.empty();
- }
+ final List<Pair<Integer, ResourceSpecification>> resourceSpecificationList = // pair of (# of executors, specs)
+ Util.parseResourceSpecificationString(resourceSpecificationString);
- resourceRequestCount.getAndAdd(executorNum);
- containerManager.requestContainer(executorNum, new ResourceSpecification(type, capacity, memory,
- maxOffheapRatio, poisonSec));
+ for (final Pair<Integer, ResourceSpecification> resourceSpecification: resourceSpecificationList) {
+ resourceRequestCount.getAndAdd(resourceSpecification.left());
+ containerManager.requestContainer(resourceSpecification.left(), resourceSpecification.right());
}
+
metricCountDownLatch = new CountDownLatch(resourceRequestCount.get());
} catch (final Exception e) {
throw new ContainerException(e);
@@ -454,9 +432,9 @@ public final class RuntimeMaster {
scheduler.onTaskStateReportFromExecutor(taskStateChangedMsg.getExecutorId(),
taskStateChangedMsg.getTaskId(),
taskStateChangedMsg.getAttemptIdx(),
- convertTaskState(taskStateChangedMsg.getState()),
+ MessageUtils.convertTaskState(taskStateChangedMsg.getState()),
taskStateChangedMsg.getVertexPutOnHoldId(),
- convertFailureCause(taskStateChangedMsg.getFailureCause()));
+ MessageUtils.convertFailureCause(taskStateChangedMsg.getFailureCause()));
break;
case ExecutorFailed:
// Executor failed due to user code.
@@ -493,38 +471,6 @@ public final class RuntimeMaster {
}
}
- private static TaskState.State convertTaskState(final ControlMessage.TaskStateFromExecutor state) {
- switch (state) {
- case READY:
- return TaskState.State.READY;
- case EXECUTING:
- return TaskState.State.EXECUTING;
- case COMPLETE:
- return COMPLETE;
- case FAILED_RECOVERABLE:
- return TaskState.State.SHOULD_RETRY;
- case FAILED_UNRECOVERABLE:
- return TaskState.State.FAILED;
- case ON_HOLD:
- return ON_HOLD;
- default:
- throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + state));
- }
- }
-
- private TaskState.RecoverableTaskFailureCause convertFailureCause(
- final ControlMessage.RecoverableFailureCause cause) {
- switch (cause) {
- case InputReadFailure:
- return TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE;
- case OutputWriteFailure:
- return TaskState.RecoverableTaskFailureCause.OUTPUT_WRITE_FAILURE;
- default:
- throw new UnknownFailureCauseException(
- new Throwable("The failure cause for the recoverable failure is unknown"));
- }
- }
-
/**
* Schedules a periodic DAG logging thread.
* TODO #20: RESTful APIs to Access Job State and Metric.
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
index 3d3db4f..53477bc 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
@@ -79,6 +79,14 @@ public final class MetricStore {
}
/**
+ * Static factory method for creating a new instance.
+ * @return a new MetricStore instance.
+ */
+ public static MetricStore newInstance() {
+ return new MetricStore();
+ }
+
+ /**
* Get the metric class by its name.
*
* @param className the name of the class.
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ContainerManager.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ContainerManager.java
index 40a2210..defd170 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ContainerManager.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ContainerManager.java
@@ -19,6 +19,7 @@
package org.apache.nemo.runtime.master.resource;
import org.apache.nemo.common.exception.ContainerException;
+import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.message.FailedMessageSender;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/DefaultExecutorRepresenter.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/DefaultExecutorRepresenter.java
index 04e9051..16c9a70 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/DefaultExecutorRepresenter.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/DefaultExecutorRepresenter.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.SerializationUtils;
+import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
import org.apache.nemo.common.ir.vertex.executionproperty.ResourceSlotProperty;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
@@ -120,7 +121,6 @@ public final class DefaultExecutorRepresenter implements ExecutorRepresenter {
runningTaskToAttempt.put(task, task.getAttemptIdx());
failedTasks.remove(task);
-
serializationExecutorService.execute(() -> {
final byte[] serialized = SerializationUtils.serialize(task);
sendControlMessage(
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
index 4c3d112..cf8a123 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -18,21 +18,13 @@
*/
package org.apache.nemo.runtime.master.scheduler;
-import com.google.common.collect.Sets;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.nemo.common.Pair;
-import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.exception.UnknownExecutionStateException;
-import org.apache.nemo.common.exception.UnknownFailureCauseException;
import org.apache.nemo.common.exception.UnrecoverableFailureException;
-import org.apache.nemo.common.ir.Readable;
-import org.apache.nemo.common.ir.edge.executionproperty.MessageIdEdgeProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
-import org.apache.nemo.common.ir.vertex.executionproperty.IgnoreSchedulingTempDataReceiverProperty;
-import org.apache.nemo.common.ir.vertex.executionproperty.MessageIdVertexProperty;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.plan.*;
-import org.apache.nemo.runtime.common.state.BlockState;
import org.apache.nemo.runtime.common.state.StageState;
import org.apache.nemo.runtime.common.state.TaskState;
import org.apache.nemo.runtime.master.BlockManagerMaster;
@@ -47,7 +39,6 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import javax.inject.Inject;
import java.util.*;
-import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -55,6 +46,8 @@ import java.util.stream.Collectors;
* (i.e., runtimeMasterThread in RuntimeMaster)
* <p>
* BatchScheduler receives a single {@link PhysicalPlan} to execute and schedules the Tasks.
+ *
+ * Note: When modifying this class, take a look at {@link SimulationScheduler}.
*/
@DriverSide
@NotThreadSafe
@@ -69,20 +62,20 @@ public final class BatchScheduler implements Scheduler {
/**
* Components related to scheduling the given plan.
*/
- private final TaskDispatcher taskDispatcher;
- private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
- private final ExecutorRegistry executorRegistry;
- private final PlanStateManager planStateManager;
+ private final TaskDispatcher taskDispatcher; // Class for dispatching tasks.
+ private final PendingTaskCollectionPointer pendingTaskCollectionPointer; // A 'pointer' to the list of pending tasks.
+ private final ExecutorRegistry executorRegistry; // A registry for executors available for the job.
+ private final PlanStateManager planStateManager; // A component that manages the state of the plan.
/**
* Other necessary components of this {@link org.apache.nemo.runtime.master.RuntimeMaster}.
*/
- private final BlockManagerMaster blockManagerMaster;
+ private final BlockManagerMaster blockManagerMaster; // A component that manages data blocks.
/**
* The below variables depend on the submitted plan to execute.
*/
- private List<List<Stage>> sortedScheduleGroups;
+ private List<List<Stage>> sortedScheduleGroups; // Stages, sorted in the order to be scheduled.
@Inject
private BatchScheduler(final PlanRewriter planRewriter,
@@ -127,55 +120,13 @@ public final class BatchScheduler implements Scheduler {
}
/**
- * @param taskId that generated the message.
- * @param data of the message.
- */
- public void onRunTimePassMessage(final String taskId, final Object data) {
- final Set<StageEdge> targetEdges = getEdgesToOptimize(taskId);
- planRewriter.accumulate(getMessageId(targetEdges), data);
- }
-
- /**
- * Action for after task execution is put on hold.
+ * Process the RuntimePassMessage.
*
- * @param executorId the ID of the executor.
- * @param taskId the ID of the task.
+ * @param taskId that generated the message.
+ * @param data of the message.
*/
- private void onTaskExecutionOnHold(final String executorId,
- final String taskId) {
- LOG.info("{} put on hold in {}", new Object[]{taskId, executorId});
- executorRegistry.updateExecutor(executorId, (executor, state) -> {
- executor.onTaskExecutionComplete(taskId);
- return Pair.of(executor, state);
- });
- final String stageIdForTaskUponCompletion = RuntimeIdManager.getStageIdFromTaskId(taskId);
-
- final boolean stageComplete =
- planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
-
- final Set<StageEdge> targetEdges = getEdgesToOptimize(taskId);
- if (targetEdges.isEmpty()) {
- throw new RuntimeException("No edges specified for data skew optimization");
- }
-
- if (stageComplete) {
- final PhysicalPlan updatedPlan = planRewriter.rewrite(
- planStateManager.getPhysicalPlan(), getMessageId(targetEdges));
- updatePlan(updatedPlan);
- }
- }
-
- private int getMessageId(final Set<StageEdge> stageEdges) {
- // Here we simply use findFirst() for now...
- // TODO #345: Simplify insert
- final Set<Integer> messageIds = stageEdges.stream()
- .map(edge -> edge.getExecutionProperties()
- .get(MessageIdEdgeProperty.class)
- .<IllegalArgumentException>orElseThrow(() -> new IllegalArgumentException(edge.getId())))
- .findFirst().<IllegalArgumentException>orElseThrow(IllegalArgumentException::new);
- // Type casting is needed. See: https://stackoverflow.com/a/40865318
-
- return messageIds.iterator().next();
+ public void onRunTimePassMessage(final String taskId, final Object data) {
+ BatchSchedulerUtils.onRunTimePassMessage(planStateManager, planRewriter, taskId, data);
}
////////////////////////////////////////////////////////////////////// Methods for scheduling.
@@ -231,14 +182,18 @@ public final class BatchScheduler implements Scheduler {
planStateManager.onTaskStateChanged(taskId, newState);
switch (newState) {
case COMPLETE:
- onTaskExecutionComplete(executorId, taskId);
+ BatchSchedulerUtils.onTaskExecutionComplete(executorRegistry, executorId, taskId);
break;
case SHOULD_RETRY:
// SHOULD_RETRY from an executor means that the task ran into a recoverable failure
- onTaskExecutionFailedRecoverable(executorId, taskId, failureCause);
+ BatchSchedulerUtils.onTaskExecutionFailedRecoverable(planStateManager, blockManagerMaster, executorRegistry,
+ executorId, taskId, failureCause);
break;
case ON_HOLD:
- onTaskExecutionOnHold(executorId, taskId);
+ final Optional<PhysicalPlan> optionalPhysicalPlan =
+ BatchSchedulerUtils
+ .onTaskExecutionOnHold(planStateManager, executorRegistry, planRewriter, executorId, taskId);
+ optionalPhysicalPlan.ifPresent(this::updatePlan);
break;
case FAILED:
throw new UnrecoverableFailureException(new Exception(String.format("The plan failed on %s in %s",
@@ -286,18 +241,18 @@ public final class BatchScheduler implements Scheduler {
public void onSpeculativeExecutionCheck() {
MutableBoolean isNewCloneCreated = new MutableBoolean(false);
- selectEarliestSchedulableGroup().ifPresent(scheduleGroup ->
- scheduleGroup.stream().map(Stage::getId).forEach(stageId -> {
- final Stage stage = planStateManager.getPhysicalPlan().getStageDAG().getVertexById(stageId);
+ BatchSchedulerUtils.selectEarliestSchedulableGroup(sortedScheduleGroups, planStateManager)
+ .ifPresent(scheduleGroup ->
+ scheduleGroup.stream().map(Stage::getId).forEach(stageId -> {
+ final Stage stage = planStateManager.getPhysicalPlan().getStageDAG().getVertexById(stageId);
- // Only if the ClonedSchedulingProperty is set...
- stage.getPropertyValue(ClonedSchedulingProperty.class).ifPresent(cloneConf -> {
- if (!cloneConf.isUpFrontCloning()) { // Upfront cloning is already handled.
- isNewCloneCreated.setValue(doSpeculativeExecution(stage, cloneConf));
- }
- });
- })
- );
+ // Only if the ClonedSchedulingProperty is set...
+ stage.getPropertyValue(ClonedSchedulingProperty.class).ifPresent(cloneConf -> {
+ if (!cloneConf.isUpFrontCloning()) { // Upfront cloning is already handled.
+ isNewCloneCreated.setValue(doSpeculativeExecution(stage, cloneConf));
+ }
+ });
+ }));
if (isNewCloneCreated.booleanValue()) {
doSchedule(); // Do schedule the new clone.
@@ -327,7 +282,7 @@ public final class BatchScheduler implements Scheduler {
interruptedTasks.forEach(blockManagerMaster::onProducerTaskFailed);
// Retry the interrupted tasks (and required parents)
- retryTasksAndRequiredParents(interruptedTasks);
+ BatchSchedulerUtils.retryTasksAndRequiredParents(planStateManager, blockManagerMaster, interruptedTasks);
// Trigger the scheduling of SHOULD_RETRY tasks in the earliest scheduleGroup
doSchedule();
@@ -350,11 +305,13 @@ public final class BatchScheduler implements Scheduler {
* - We make {@link TaskDispatcher} dispatch only the tasks that are READY.
*/
private void doSchedule() {
- final Optional<List<Stage>> earliest = selectEarliestSchedulableGroup();
+ final Optional<List<Stage>> earliest =
+ BatchSchedulerUtils.selectEarliestSchedulableGroup(sortedScheduleGroups, planStateManager);
if (earliest.isPresent()) {
final List<Task> tasksToSchedule = earliest.get().stream()
- .flatMap(stage -> selectSchedulableTasks(stage).stream())
+ .flatMap(stage ->
+ BatchSchedulerUtils.selectSchedulableTasks(planStateManager, blockManagerMaster, stage).stream())
.collect(Collectors.toList());
if (!tasksToSchedule.isEmpty()) {
LOG.info("Scheduling some tasks in {}, which are in the same ScheduleGroup", tasksToSchedule.stream()
@@ -373,55 +330,6 @@ public final class BatchScheduler implements Scheduler {
}
}
- private Optional<List<Stage>> selectEarliestSchedulableGroup() {
- if (sortedScheduleGroups == null) {
- return Optional.empty();
- }
-
- return sortedScheduleGroups.stream()
- .filter(scheduleGroup -> scheduleGroup.stream()
- .map(Stage::getId)
- .map(planStateManager::getStageState)
- .anyMatch(state -> state.equals(StageState.State.INCOMPLETE))) // any incomplete stage in the group
- .findFirst(); // selects the one with the smallest scheduling group index.
- }
-
- private List<Task> selectSchedulableTasks(final Stage stageToSchedule) {
- if (stageToSchedule.getPropertyValue(IgnoreSchedulingTempDataReceiverProperty.class).orElse(false)) {
- // Ignore ghost stage.
- for (final String taskId : planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId())) {
- planStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING);
- planStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE);
- }
-
- return Collections.emptyList();
- }
-
- final List<StageEdge> stageIncomingEdges =
- planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
- final List<StageEdge> stageOutgoingEdges =
- planStateManager.getPhysicalPlan().getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
-
- // Create and return tasks.
- final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
-
- final List<String> taskIdsToSchedule = planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId());
- final List<Task> tasks = new ArrayList<>(taskIdsToSchedule.size());
- taskIdsToSchedule.forEach(taskId -> {
- final Set<String> blockIds = getOutputBlockIds(taskId);
- blockManagerMaster.onProducerTaskScheduled(taskId, blockIds);
- final int taskIdx = RuntimeIdManager.getIndexFromTaskId(taskId);
- tasks.add(new Task(
- planStateManager.getPhysicalPlan().getPlanId(),
- taskId,
- stageToSchedule.getExecutionProperties(),
- stageToSchedule.getSerializedIRDAG(),
- stageIncomingEdges,
- stageOutgoingEdges,
- vertexIdToReadables.get(taskIdx)));
- });
- return tasks;
- }
////////////////////////////////////////////////////////////////////// Task cloning methods.
@@ -476,181 +384,4 @@ public final class BatchScheduler implements Scheduler {
return false;
}
-
- ////////////////////////////////////////////////////////////////////// Task state change handlers
-
- /**
- * Action after task execution has been completed.
- * Note this method should not be invoked when the previous state of the task is ON_HOLD.
- *
- * @param executorId id of the executor.
- * @param taskId the ID of the task completed.
- */
- private void onTaskExecutionComplete(final String executorId,
- final String taskId) {
- LOG.debug("{} completed in {}", taskId, executorId);
- executorRegistry.updateExecutor(executorId, (executor, state) -> {
- executor.onTaskExecutionComplete(taskId);
- return Pair.of(executor, state);
- });
- }
-
- /**
- * Get the target edges of dynamic optimization.
- * The edges are annotated with {@link MessageIdEdgeProperty}, which are outgoing edges of
- * parents of the stage put on hold.
- * <p>
- * See {@link org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping.SkewReshapingPass}
- * for setting the target edges of dynamic optimization.
- *
- * @param taskId the task ID that sent stage-level aggregated message for dynamic optimization.
- * @return the edges to optimize.
- */
- private Set<StageEdge> getEdgesToOptimize(final String taskId) {
- final DAG<Stage, StageEdge> stageDag = planStateManager.getPhysicalPlan().getStageDAG();
-
- // Get a stage including the given task
- final Stage stagePutOnHold = stageDag.getVertices().stream()
- .filter(stage -> stage.getId().equals(RuntimeIdManager.getStageIdFromTaskId(taskId)))
- .findFirst()
- .orElseThrow(RuntimeException::new);
-
- // Stage put on hold, i.e. stage with vertex containing MessageAggregatorTransform
- // should have a parent stage whose outgoing edges contain the target edge of dynamic optimization.
- final List<Integer> messageIds = stagePutOnHold.getIRDAG()
- .getVertices()
- .stream()
- .filter(v -> v.getPropertyValue(MessageIdVertexProperty.class).isPresent())
- .map(v -> v.getPropertyValue(MessageIdVertexProperty.class).get())
- .collect(Collectors.toList());
- if (messageIds.size() != 1) {
- throw new IllegalStateException("Must be exactly one vertex with the message id: " + messageIds.toString());
- }
- final int messageId = messageIds.get(0);
- final Set<StageEdge> targetEdges = new HashSet<>();
-
- // Get edges with identical MessageIdEdgeProperty (except the put on hold stage)
- for (final Stage stage : stageDag.getVertices()) {
- final Set<StageEdge> targetEdgesFound = stageDag.getOutgoingEdgesOf(stage).stream()
- .filter(candidateEdge -> {
- final Optional<HashSet<Integer>> candidateMCId =
- candidateEdge.getPropertyValue(MessageIdEdgeProperty.class);
- return candidateMCId.isPresent() && candidateMCId.get().contains(messageId);
- })
- .collect(Collectors.toSet());
- targetEdges.addAll(targetEdgesFound);
- }
-
- return targetEdges;
- }
-
- /**
- * Action for after task execution has failed but it's recoverable.
- *
- * @param executorId the ID of the executor
- * @param taskId the ID of the task
- * @param failureCause the cause of failure
- */
- private void onTaskExecutionFailedRecoverable(final String executorId,
- final String taskId,
- final TaskState.RecoverableTaskFailureCause failureCause) {
- LOG.info("{} failed in {} by {}", taskId, executorId, failureCause);
- executorRegistry.updateExecutor(executorId, (executor, state) -> {
- executor.onTaskExecutionFailed(taskId);
- return Pair.of(executor, state);
- });
-
- switch (failureCause) {
- // Previous task must be re-executed, and incomplete tasks of the belonging stage must be rescheduled.
- case INPUT_READ_FAILURE:
- // TODO #54: Handle remote data fetch failures
- case OUTPUT_WRITE_FAILURE:
- blockManagerMaster.onProducerTaskFailed(taskId);
- break;
- default:
- throw new UnknownFailureCauseException(new Throwable("Unknown cause: " + failureCause));
- }
-
- retryTasksAndRequiredParents(Collections.singleton(taskId));
- }
-
- ////////////////////////////////////////////////////////////////////// Helper methods
-
- private void retryTasksAndRequiredParents(final Set<String> tasks) {
- final Set<String> requiredParents = recursivelyGetParentTasksForLostBlocks(tasks);
- final Set<String> tasksToRetry = Sets.union(tasks, requiredParents);
- LOG.info("Will be retried: {}", tasksToRetry);
- tasksToRetry.forEach(
- taskToReExecute -> planStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
- }
-
- private Set<String> recursivelyGetParentTasksForLostBlocks(final Set<String> children) {
- if (children.isEmpty()) {
- return Collections.emptySet();
- }
- final DAG<Stage, StageEdge> stageDAG = planStateManager.getPhysicalPlan().getStageDAG();
-
- final Map<String, StageEdge> idToIncomingEdges = children.stream()
- .map(RuntimeIdManager::getStageIdFromTaskId)
- .flatMap(stageId -> stageDAG.getIncomingEdgesOf(stageId).stream())
- // Ignore duplicates with the mergeFunction in toMap(_,_,mergeFunction)
- .collect(Collectors.toMap(StageEdge::getId, Function.identity(), (l, r) -> l));
-
- final Set<String> parentsWithLostBlocks = children.stream()
- .flatMap(child -> getInputBlockIds(child).stream()) // child task id -> parent block ids
- .map(RuntimeIdManager::getWildCardFromBlockId) // parent block id -> parent block wildcard
- .collect(Collectors.toSet()).stream() // remove duplicate wildcards
- .filter(parentBlockWildcard -> // lost block = no matching AVAILABLE block attempt for the wildcard
- blockManagerMaster.getBlockHandlers(parentBlockWildcard, BlockState.State.AVAILABLE).isEmpty())
- .flatMap(lostParentBlockWildcard -> {
- // COMPLETE task attempts of the lostParentBlockWildcard must become SHOULD_RETRY
- final String inEdgeId = RuntimeIdManager.getRuntimeEdgeIdFromBlockId(lostParentBlockWildcard);
- final String parentStageId = idToIncomingEdges.get(inEdgeId).getSrc().getId();
- final int parentTaskIndex = RuntimeIdManager.getTaskIndexFromBlockId(lostParentBlockWildcard);
- return planStateManager.getAllTaskAttemptsOfStage(parentStageId)
- .stream()
- .filter(taskId -> RuntimeIdManager.getStageIdFromTaskId(taskId).equals(parentStageId)
- && RuntimeIdManager.getIndexFromTaskId(taskId) == parentTaskIndex)
- // COMPLETE -> SHOULD_RETRY
- .filter(taskId -> planStateManager.getTaskState(taskId).equals(TaskState.State.COMPLETE));
- })
- .collect(Collectors.toSet());
-
-
- // Recursive call
- return Sets.union(parentsWithLostBlocks, recursivelyGetParentTasksForLostBlocks(parentsWithLostBlocks));
- }
-
- private Set<String> getOutputBlockIds(final String taskId) {
- return planStateManager.getPhysicalPlan().getStageDAG()
- .getOutgoingEdgesOf(RuntimeIdManager.getStageIdFromTaskId(taskId))
- .stream()
- .map(stageEdge -> RuntimeIdManager.generateBlockId(stageEdge.getId(), taskId))
- .collect(Collectors.toSet()); // ids of blocks this task will produce
- }
-
- private Set<String> getInputBlockIds(final String childTaskId) {
- final String stageIdOfChildTask = RuntimeIdManager.getStageIdFromTaskId(childTaskId);
- return planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageIdOfChildTask)
- .stream()
- .flatMap(inStageEdge -> {
- final Set<String> parentTaskIds = planStateManager.getAllTaskAttemptsOfStage(inStageEdge.getSrc().getId());
- switch (inStageEdge.getDataCommunicationPattern()) {
- case SHUFFLE:
- case BROADCAST:
- // All of the parent stage's tasks
- return parentTaskIds.stream()
- .map(parentTaskId -> RuntimeIdManager.generateBlockId(inStageEdge.getId(), parentTaskId));
- case ONE_TO_ONE:
- // Same-index tasks of the parent stage
- return parentTaskIds.stream()
- .filter(parentTaskId ->
- RuntimeIdManager.getIndexFromTaskId(parentTaskId) == RuntimeIdManager.getIndexFromTaskId(childTaskId))
- .map(parentTaskId -> RuntimeIdManager.generateBlockId(inStageEdge.getId(), parentTaskId));
- default:
- throw new IllegalStateException(inStageEdge.toString());
- }
- })
- .collect(Collectors.toSet());
- }
}
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerUtils.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerUtils.java
new file mode 100644
index 0000000..f7249cb
--- /dev/null
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerUtils.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.runtime.master.scheduler;
+
+import com.google.common.collect.Sets;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.exception.UnknownFailureCauseException;
+import org.apache.nemo.common.ir.Readable;
+import org.apache.nemo.common.ir.edge.executionproperty.MessageIdEdgeProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.IgnoreSchedulingTempDataReceiverProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.MessageIdVertexProperty;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.plan.*;
+import org.apache.nemo.runtime.common.state.BlockState;
+import org.apache.nemo.runtime.common.state.StageState;
+import org.apache.nemo.runtime.common.state.TaskState;
+import org.apache.nemo.runtime.master.BlockManagerMaster;
+import org.apache.nemo.runtime.master.PlanStateManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Utlity methods regarding schedulers.
+ */
+public final class BatchSchedulerUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(BatchSchedulerUtils.class.getName());
+
+ /**
+ * Private constructor for utility class.
+ */
+ private BatchSchedulerUtils() {
+ }
+
+
+ static Optional<List<Stage>> selectEarliestSchedulableGroup(final List<List<Stage>> sortedScheduleGroups,
+ final PlanStateManager planStateManager) {
+ if (sortedScheduleGroups == null) {
+ return Optional.empty();
+ }
+
+ return sortedScheduleGroups.stream()
+ .filter(scheduleGroup -> scheduleGroup.stream()
+ .map(Stage::getId)
+ .map(planStateManager::getStageState)
+ .anyMatch(state -> state.equals(StageState.State.INCOMPLETE))) // any incomplete stage in the group
+ .findFirst(); // selects the one with the smallest scheduling group index.
+ }
+
+ static List<Task> selectSchedulableTasks(final PlanStateManager planStateManager,
+ final BlockManagerMaster blockManagerMaster,
+ final Stage stageToSchedule) {
+ if (stageToSchedule.getPropertyValue(IgnoreSchedulingTempDataReceiverProperty.class).orElse(false)) {
+ // Ignore ghost stage.
+ for (final String taskId : planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId())) {
+ planStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING);
+ planStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE);
+ }
+
+ return Collections.emptyList();
+ }
+
+ final List<StageEdge> stageIncomingEdges =
+ planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
+ final List<StageEdge> stageOutgoingEdges =
+ planStateManager.getPhysicalPlan().getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
+
+ // Create and return tasks.
+ final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
+
+ final List<String> taskIdsToSchedule = planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId());
+ final List<Task> tasks = new ArrayList<>(taskIdsToSchedule.size());
+ taskIdsToSchedule.forEach(taskId -> {
+ final Set<String> blockIds = BatchSchedulerUtils.getOutputBlockIds(planStateManager, taskId);
+ blockManagerMaster.onProducerTaskScheduled(taskId, blockIds);
+ final int taskIdx = RuntimeIdManager.getIndexFromTaskId(taskId);
+ tasks.add(new Task(
+ planStateManager.getPhysicalPlan().getPlanId(),
+ taskId,
+ stageToSchedule.getExecutionProperties(),
+ stageToSchedule.getSerializedIRDAG(),
+ stageIncomingEdges,
+ stageOutgoingEdges,
+ vertexIdToReadables.get(taskIdx)));
+ });
+ return tasks;
+ }
+
+ ////////////////////////////////////////////////////////////////////// Task state change handlers
+
+ /**
+ * Action after task execution has been completed.
+ * Note this method should not be invoked when the previous state of the task is ON_HOLD.
+ *
+ * @param executorRegistry the registry for available executors.
+ * @param executorId id of the executor.
+ * @param taskId the ID of the task completed.
+ */
+ static void onTaskExecutionComplete(final ExecutorRegistry executorRegistry,
+ final String executorId,
+ final String taskId) {
+ LOG.debug("{} completed in {}", taskId, executorId);
+ executorRegistry.updateExecutor(executorId, (executor, state) -> {
+ executor.onTaskExecutionComplete(taskId);
+ return Pair.of(executor, state);
+ });
+ }
+
+ /**
+ * Get the target edges of dynamic optimization.
+ * The edges are annotated with {@link MessageIdEdgeProperty}, which are outgoing edges of
+ * parents of the stage put on hold.
+ * <p>
+ * See {@link org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping.SkewReshapingPass}
+ * for setting the target edges of dynamic optimization.
+ *
+ * @param taskId the task ID that sent stage-level aggregated message for dynamic optimization.
+ * @return the edges to optimize.
+ */
+ static Set<StageEdge> getEdgesToOptimize(final PlanStateManager planStateManager,
+ final String taskId) {
+ final DAG<Stage, StageEdge> stageDag = planStateManager.getPhysicalPlan().getStageDAG();
+
+ // Get a stage including the given task
+ final Stage stagePutOnHold = stageDag.getVertices().stream()
+ .filter(stage -> stage.getId().equals(RuntimeIdManager.getStageIdFromTaskId(taskId)))
+ .findFirst()
+ .orElseThrow(RuntimeException::new);
+
+ // Stage put on hold, i.e. stage with vertex containing MessageAggregatorTransform
+ // should have a parent stage whose outgoing edges contain the target edge of dynamic optimization.
+ final List<Integer> messageIds = stagePutOnHold.getIRDAG()
+ .getVertices()
+ .stream()
+ .filter(v -> v.getPropertyValue(MessageIdVertexProperty.class).isPresent())
+ .map(v -> v.getPropertyValue(MessageIdVertexProperty.class).get())
+ .collect(Collectors.toList());
+ if (messageIds.size() != 1) {
+ throw new IllegalStateException("Must be exactly one vertex with the message id: " + messageIds.toString());
+ }
+ final int messageId = messageIds.get(0);
+ final Set<StageEdge> targetEdges = new HashSet<>();
+
+ // Get edges with identical MessageIdEdgeProperty (except the put on hold stage)
+ for (final Stage stage : stageDag.getVertices()) {
+ final Set<StageEdge> targetEdgesFound = stageDag.getOutgoingEdgesOf(stage).stream()
+ .filter(candidateEdge -> {
+ final Optional<HashSet<Integer>> candidateMCId =
+ candidateEdge.getPropertyValue(MessageIdEdgeProperty.class);
+ return candidateMCId.isPresent() && candidateMCId.get().contains(messageId);
+ })
+ .collect(Collectors.toSet());
+ targetEdges.addAll(targetEdgesFound);
+ }
+
+ return targetEdges;
+ }
+
+ /**
+ * Action for after task execution has failed but it's recoverable.
+ *
+ * @param executorId the ID of the executor
+ * @param taskId the ID of the task
+ * @param failureCause the cause of failure
+ */
+ static void onTaskExecutionFailedRecoverable(final PlanStateManager planStateManager,
+ final BlockManagerMaster blockManagerMaster,
+ final ExecutorRegistry executorRegistry,
+ final String executorId,
+ final String taskId,
+ final TaskState.RecoverableTaskFailureCause failureCause) {
+ LOG.info("{} failed in {} by {}", taskId, executorId, failureCause);
+ executorRegistry.updateExecutor(executorId, (executor, state) -> {
+ executor.onTaskExecutionFailed(taskId);
+ return Pair.of(executor, state);
+ });
+
+ switch (failureCause) {
+ // Previous task must be re-executed, and incomplete tasks of the belonging stage must be rescheduled.
+ case INPUT_READ_FAILURE:
+ // TODO #54: Handle remote data fetch failures
+ case OUTPUT_WRITE_FAILURE:
+ blockManagerMaster.onProducerTaskFailed(taskId);
+ break;
+ default:
+ throw new UnknownFailureCauseException(new Throwable("Unknown cause: " + failureCause));
+ }
+
+ retryTasksAndRequiredParents(planStateManager, blockManagerMaster, Collections.singleton(taskId));
+ }
+
+ /**
+ * Action for after task execution is put on hold.
+ *
+ * @param executorId the ID of the executor.
+ * @param taskId the ID of the task.
+ */
+ static Optional<PhysicalPlan> onTaskExecutionOnHold(final PlanStateManager planStateManager,
+ final ExecutorRegistry executorRegistry,
+ final PlanRewriter planRewriter,
+ final String executorId,
+ final String taskId) {
+ LOG.info("{} put on hold in {}", new Object[]{taskId, executorId});
+ executorRegistry.updateExecutor(executorId, (executor, state) -> {
+ executor.onTaskExecutionComplete(taskId);
+ return Pair.of(executor, state);
+ });
+ final String stageIdForTaskUponCompletion = RuntimeIdManager.getStageIdFromTaskId(taskId);
+
+ final boolean stageComplete =
+ planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
+
+ final Set<StageEdge> targetEdges = getEdgesToOptimize(planStateManager, taskId);
+ if (targetEdges.isEmpty()) {
+ throw new RuntimeException("No edges specified for data skew optimization");
+ }
+
+ if (stageComplete) {
+ return Optional.of(planRewriter.rewrite(
+ planStateManager.getPhysicalPlan(), getMessageId(targetEdges)));
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Process the RuntimePassMessage.
+ *
+ * @param planStateManager to get the edges for the optimization.
+ * @param planRewriter for rewriting the plan later on.
+ * @param taskId that generated the message.
+ * @param data of the message.
+ */
+ public static void onRunTimePassMessage(final PlanStateManager planStateManager, final PlanRewriter planRewriter,
+ final String taskId, final Object data) {
+ final Set<StageEdge> targetEdges = BatchSchedulerUtils.getEdgesToOptimize(planStateManager, taskId);
+ planRewriter.accumulate(BatchSchedulerUtils.getMessageId(targetEdges), data);
+ }
+
+ static int getMessageId(final Set<StageEdge> stageEdges) {
+ // Here we simply use findFirst() for now...
+ // TODO #345: Simplify insert
+ final Set<Integer> messageIds = stageEdges.stream()
+ .map(edge -> edge.getExecutionProperties()
+ .get(MessageIdEdgeProperty.class)
+ .<IllegalArgumentException>orElseThrow(() -> new IllegalArgumentException(edge.getId())))
+ .findFirst().<IllegalArgumentException>orElseThrow(IllegalArgumentException::new);
+ // Type casting is needed. See: https://stackoverflow.com/a/40865318
+
+ return messageIds.iterator().next();
+ }
+
+ ////////////////////////////////////////////////////////////////////// Helper methods
+
+ static void retryTasksAndRequiredParents(final PlanStateManager planStateManager,
+ final BlockManagerMaster blockManagerMaster,
+ final Set<String> tasks) {
+ final Set<String> requiredParents =
+ recursivelyGetParentTasksForLostBlocks(planStateManager, blockManagerMaster, tasks);
+ final Set<String> tasksToRetry = Sets.union(tasks, requiredParents);
+ LOG.info("Will be retried: {}", tasksToRetry);
+ tasksToRetry.forEach(
+ taskToReExecute -> planStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
+ }
+
+ static Set<String> recursivelyGetParentTasksForLostBlocks(final PlanStateManager planStateManager,
+ final BlockManagerMaster blockManagerMaster,
+ final Set<String> children) {
+ if (children.isEmpty()) {
+ return Collections.emptySet();
+ }
+ final DAG<Stage, StageEdge> stageDAG = planStateManager.getPhysicalPlan().getStageDAG();
+
+ final Map<String, StageEdge> idToIncomingEdges = children.stream()
+ .map(RuntimeIdManager::getStageIdFromTaskId)
+ .flatMap(stageId -> stageDAG.getIncomingEdgesOf(stageId).stream())
+ // Ignore duplicates with the mergeFunction in toMap(_,_,mergeFunction)
+ .collect(Collectors.toMap(StageEdge::getId, Function.identity(), (l, r) -> l));
+
+ final Set<String> parentsWithLostBlocks = children.stream()
+ .flatMap(child -> getInputBlockIds(planStateManager, child).stream()) // child task id -> parent block ids
+ .map(RuntimeIdManager::getWildCardFromBlockId) // parent block id -> parent block wildcard
+ .collect(Collectors.toSet()).stream() // remove duplicate wildcards
+ .filter(parentBlockWildcard -> // lost block = no matching AVAILABLE block attempt for the wildcard
+ blockManagerMaster.getBlockHandlers(parentBlockWildcard, BlockState.State.AVAILABLE).isEmpty())
+ .flatMap(lostParentBlockWildcard -> {
+ // COMPLETE task attempts of the lostParentBlockWildcard must become SHOULD_RETRY
+ final String inEdgeId = RuntimeIdManager.getRuntimeEdgeIdFromBlockId(lostParentBlockWildcard);
+ final String parentStageId = idToIncomingEdges.get(inEdgeId).getSrc().getId();
+ final int parentTaskIndex = RuntimeIdManager.getTaskIndexFromBlockId(lostParentBlockWildcard);
+ return planStateManager.getAllTaskAttemptsOfStage(parentStageId)
+ .stream()
+ .filter(taskId -> RuntimeIdManager.getStageIdFromTaskId(taskId).equals(parentStageId)
+ && RuntimeIdManager.getIndexFromTaskId(taskId) == parentTaskIndex)
+ // COMPLETE -> SHOULD_RETRY
+ .filter(taskId -> planStateManager.getTaskState(taskId).equals(TaskState.State.COMPLETE));
+ })
+ .collect(Collectors.toSet());
+
+
+ // Recursive call
+ return Sets.union(parentsWithLostBlocks,
+ recursivelyGetParentTasksForLostBlocks(planStateManager, blockManagerMaster, parentsWithLostBlocks));
+ }
+
+ static Set<String> getOutputBlockIds(final PlanStateManager planStateManager,
+ final String taskId) {
+ return planStateManager.getPhysicalPlan().getStageDAG()
+ .getOutgoingEdgesOf(RuntimeIdManager.getStageIdFromTaskId(taskId))
+ .stream()
+ .map(stageEdge -> RuntimeIdManager.generateBlockId(stageEdge.getId(), taskId))
+ .collect(Collectors.toSet()); // ids of blocks this task will produce
+ }
+
+ static Set<String> getInputBlockIds(final PlanStateManager planStateManager,
+ final String childTaskId) {
+ final String stageIdOfChildTask = RuntimeIdManager.getStageIdFromTaskId(childTaskId);
+ return planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageIdOfChildTask)
+ .stream()
+ .flatMap(inStageEdge -> {
+ final Set<String> parentTaskIds = planStateManager.getAllTaskAttemptsOfStage(inStageEdge.getSrc().getId());
+ switch (inStageEdge.getDataCommunicationPattern()) {
+ case SHUFFLE:
+ case BROADCAST:
+ // All of the parent stage's tasks
+ return parentTaskIds.stream()
+ .map(parentTaskId -> RuntimeIdManager.generateBlockId(inStageEdge.getId(), parentTaskId));
+ case ONE_TO_ONE:
+ // Same-index tasks of the parent stage
+ return parentTaskIds.stream()
+ .filter(parentTaskId ->
+ RuntimeIdManager.getIndexFromTaskId(parentTaskId) == RuntimeIdManager.getIndexFromTaskId(childTaskId))
+ .map(parentTaskId -> RuntimeIdManager.generateBlockId(inStageEdge.getId(), parentTaskId));
+ default:
+ throw new IllegalStateException(inStageEdge.toString());
+ }
+ })
+ .collect(Collectors.toSet());
+ }
+}
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ExecutorRegistry.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ExecutorRegistry.java
index 8395f01..edf15ab 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ExecutorRegistry.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ExecutorRegistry.java
@@ -57,6 +57,14 @@ public final class ExecutorRegistry {
this.executors = new HashMap<>();
}
+  /**
+   * Create a registry directly, for manual usage outside the regular construction path.
+   *
+   * @return a new {@code ExecutorRegistry} with no executors registered.
+   */
+  public static ExecutorRegistry newInstance() {
+    final ExecutorRegistry registry = new ExecutorRegistry();
+    return registry;
+  }
+
synchronized void registerExecutor(final ExecutorRepresenter executor) {
final String executorId = executor.getExecutorId();
if (executors.containsKey(executorId)) {
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/PendingTaskCollectionPointer.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/PendingTaskCollectionPointer.java
index 575e9a6..d3397c3 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/PendingTaskCollectionPointer.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/PendingTaskCollectionPointer.java
@@ -39,6 +39,14 @@ public final class PendingTaskCollectionPointer {
}
/**
+   * Create a pointer directly, for manual usage outside the regular construction path.
+   *
+   * @return a new {@code PendingTaskCollectionPointer}.
+   */
+  public static PendingTaskCollectionPointer newInstance() {
+    final PendingTaskCollectionPointer pointer = new PendingTaskCollectionPointer();
+    return pointer;
+  }
+
+ /**
* This collection of tasks should take precedence over any previous collection of tasks.
*
* @param newCollection to schedule.
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/SimulatedTaskExecutor.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/SimulatedTaskExecutor.java
new file mode 100644
index 0000000..005a7ff
--- /dev/null
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/SimulatedTaskExecutor.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.runtime.master.scheduler;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Streams;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.comm.ControlMessage;
+import org.apache.nemo.runtime.common.message.MessageEnvironment;
+import org.apache.nemo.runtime.common.message.MessageUtils;
+import org.apache.nemo.runtime.common.metric.JobMetric;
+import org.apache.nemo.runtime.common.metric.StateTransitionEvent;
+import org.apache.nemo.runtime.common.metric.TaskMetric;
+import org.apache.nemo.runtime.common.plan.RuntimeEdge;
+import org.apache.nemo.runtime.common.plan.Task;
+import org.apache.nemo.runtime.common.state.TaskState;
+import org.apache.nemo.runtime.master.metric.MetricStore;
+import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/**
+ * Class for simulated task execution.
+ * <p>
+ * Tasks are not actually run. Each received task advances a virtual clock by its expected duration,
+ * which is estimated from the metrics in the "actual" {@link MetricStore}. The same metric and
+ * TaskStateChanged control messages a real executor would emit are then sent.
+ */
+public final class SimulatedTaskExecutor {
+  private static final Logger LOG = LoggerFactory.getLogger(SimulatedTaskExecutor.class.getName());
+  private static final String TASK_METRIC_ID = "TaskMetric";
+
+  /**
+   * the simulation scheduler that the executor is associated with.
+   */
+  private final SimulationScheduler scheduler;
+  private final ExecutorRepresenter executorRepresenter;
+  /** Wall-clock time at which the first task was received; -1 until then. */
+  private long executorInitializationTime;
+  /** Simulated clock of this executor in milliseconds; -1 until the first task is received. */
+  private final AtomicLong currentTime;
+  /** Wall-clock time of the last checkpoint, used to measure the scheduling overhead of the next task. */
+  private long timeCheckpoint;
+  /** MetricStore of an actual execution, from which expected task durations are derived. */
+  private final MetricStore actualMetricStore;
+  /** Cache of deserialized stage IR DAGs, keyed by stage id. */
+  private final ConcurrentMap<String, DAG<IRVertex, RuntimeEdge<IRVertex>>> stageIDToStageIRDAG;
+
+  /**
+   * Constructor.
+   *
+   * @param scheduler           the simulation scheduler that the executor is associated with.
+   * @param executorRepresenter the representer whose id and control channel this executor uses.
+   * @param actualMetricStore   the metric store from which expected task durations are derived.
+   */
+  SimulatedTaskExecutor(final SimulationScheduler scheduler,
+                        final ExecutorRepresenter executorRepresenter,
+                        final MetricStore actualMetricStore) {
+    this.scheduler = scheduler;
+    this.executorRepresenter = executorRepresenter;
+    this.actualMetricStore = actualMetricStore;
+    this.stageIDToStageIRDAG = new ConcurrentHashMap<>();
+    this.currentTime = new AtomicLong(-1L);
+    this.executorInitializationTime = -1L;
+    this.timeCheckpoint = -1L;
+  }
+
+  /**
+   * Calculate the expected task duration.
+   * This only works if there exists metrics in the actual MetricStore, that contains information about the stage,
+   * as well as with the parallelism the task is associated with.
+   * <p>
+   * The estimate is the average positive task duration over all stages of the (first) recorded JobMetric whose IR DAG
+   * has the same number of vertices and edges, and the same parallelism, as the task's stage.
+   *
+   * @param task the task to calculate the task duration for.
+   * @return the expected task duration in milliseconds, or 0 if no matching metric is available.
+   */
+  private long calculateExpectedTaskDuration(final Task task) {
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> stageIRDAG = stageIDToStageIRDAG.computeIfAbsent(task.getStageId(),
+      i -> SerializationUtils.deserialize(task.getSerializedIRDag()));
+
+    final Map<String, Object> jobMetricMap = this.actualMetricStore.getMetricMap(JobMetric.class);
+    if (jobMetricMap.isEmpty()) {
+      LOG.warn("MetricStore has no JobMetric. Cannot estimate the duration of {}.", task.getTaskId());
+      return 0;
+    }
+    if (jobMetricMap.size() > 1) {
+      LOG.warn("MetricStore has more than one JobMetric. The results could be misleading.");
+    }
+    // Fetch first element.
+    final JobMetric jobMetric = (JobMetric) jobMetricMap.values().iterator().next();
+    final JsonNode stageDAG = jobMetric.getStageDAG();
+    if (stageDAG == null) {
+      LOG.warn("JobMetric has no stage DAG. Cannot estimate the duration of {}.", task.getTaskId());
+      return 0;
+    }
+
+    final int taskParallelism = task.getPropertyValue(ParallelismProperty.class).orElse(0);
+
+    // Gather ID of stages that have the characteristics that the task possesses.
+    // path() yields a MissingNode (size 0, asInt default, empty text) instead of null for absent fields.
+    final Set<String> stageIdsToGatherMetricsFrom = Streams.stream(stageDAG.path("vertices"))
+      .filter(s -> s.path("properties").path("irDag").path("vertices").size()
+        == stageIRDAG.getVertices().size()) // same # of vertices.
+      .filter(s -> s.path("properties").path("irDag").path("edges").size()
+        == stageIRDAG.getEdges().size()) // same # of edges.
+      .filter(s -> s.path("properties").path("executionProperties")
+        .path("org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty").asInt(0)
+        == taskParallelism) // same parallelism.
+      .map(s -> s.path("id").asText())
+      .collect(Collectors.toSet());
+
+    // Derive the average task duration from the stages.
+    final OptionalDouble average = this.actualMetricStore.getMetricMap(TaskMetric.class).entrySet().stream()
+      .filter(e -> stageIdsToGatherMetricsFrom.contains(RuntimeIdManager.getStageIdFromTaskId(e.getKey())))
+      .mapToLong(e -> ((TaskMetric) e.getValue()).getTaskDuration())
+      .filter(duration -> duration > 0)
+      .average();
+
+    // Round to the nearest millisecond; 0 indicates that something went wrong (no matching metric).
+    return Math.round(average.orElse(0));
+  }
+
+  /**
+   * Handle the task and record metrics, as a real Executor#onTaskReceived would.
+   * <p>
+   * Sends the "schedulingOverhead" and "taskDuration" task metrics, advances the simulated clock,
+   * and reports the task as COMPLETE. Simulated tasks are never put on hold.
+   *
+   * @param task the task to execute.
+   */
+  public void onTaskReceived(final Task task) {
+    if (executorInitializationTime < 0) {
+      executorInitializationTime = System.currentTimeMillis();
+      currentTime.set(executorInitializationTime);
+      timeCheckpoint = executorInitializationTime;
+    }
+    final String taskId = task.getTaskId();
+    final int attemptIdx = task.getAttemptIdx();
+
+    // Scheduling overhead: wall-clock time elapsed since the previous checkpoint.
+    final long now = System.currentTimeMillis();
+    final long schedulingOverhead = now - this.timeCheckpoint;
+    this.timeCheckpoint = now;
+    this.sendMetric(TASK_METRIC_ID, taskId, "schedulingOverhead",
+      SerializationUtils.serialize(schedulingOverhead));
+    // NOTE(review): getAndAdd returns the clock value *before* the overhead is added, so the taskDuration
+    // reported below covers schedulingOverhead plus the expected execution time — confirm this is intended.
+    final long executionStartTime = this.currentTime.getAndAdd(schedulingOverhead);
+
+    // A real executor would deserialize the task, connect its incoming / outgoing edges,
+    // read and process external data, then finalize vertices and write outputs.
+    // Here the simulated clock is simply advanced by the expected duration instead.
+    LOG.debug("{} started", taskId);
+    final long expectedTaskDuration = this.calculateExpectedTaskDuration(task);
+    this.currentTime.getAndAdd(expectedTaskDuration);
+
+    this.sendMetric(TASK_METRIC_ID, taskId, "taskDuration",
+      SerializationUtils.serialize(this.currentTime.get() - executionStartTime));
+    this.timeCheckpoint = System.currentTimeMillis();
+
+    this.onTaskStateChanged(taskId, attemptIdx, TaskState.State.COMPLETE,
+      Optional.empty(), Optional.empty());
+    LOG.debug("{} completed", taskId);
+  }
+
+  /**
+   * Get the simulated time elapsed since this executor received its first task.
+   *
+   * @return the elapsed time for the executor, in milliseconds (0 before any task is received).
+   */
+  public Long getElapsedTime() {
+    return currentTime.get() - executorInitializationTime;
+  }
+
+  /**
+   * Updates the state of the task.
+   * Records a state transition metric and sends a TaskStateChanged control message.
+   *
+   * @param taskId          of the task.
+   * @param attemptIdx      of the task.
+   * @param newState        of the task.
+   * @param vertexPutOnHold the vertex put on hold; only used when {@code newState} is ON_HOLD.
+   * @param cause           only provided as non-empty upon recoverable failures.
+   */
+  private void onTaskStateChanged(final String taskId,
+                                  final int attemptIdx,
+                                  final TaskState.State newState,
+                                  final Optional<String> vertexPutOnHold,
+                                  final Optional<TaskState.RecoverableTaskFailureCause> cause) {
+    this.sendMetric(TASK_METRIC_ID, taskId,
+      "stateTransitionEvent", SerializationUtils.serialize(new StateTransitionEvent<>(
+        this.currentTime.get(), null, newState
+      )));
+
+    final ControlMessage.TaskStateChangedMsg.Builder msgBuilder =
+      ControlMessage.TaskStateChangedMsg.newBuilder()
+        .setExecutorId(executorRepresenter.getExecutorId())
+        .setTaskId(taskId)
+        .setAttemptIdx(attemptIdx)
+        .setState(MessageUtils.convertState(newState));
+    if (newState == TaskState.State.ON_HOLD && vertexPutOnHold.isPresent()) {
+      msgBuilder.setVertexPutOnHoldId(vertexPutOnHold.get());
+    }
+    cause.ifPresent(c -> msgBuilder.setFailureCause(MessageUtils.convertFailureCause(c)));
+
+    // Send taskStateChangedMsg to master!
+    this.sendControlMessage(
+      ControlMessage.Message.newBuilder()
+        .setId(RuntimeIdManager.generateMessageId())
+        .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
+        .setType(ControlMessage.MessageType.TaskStateChanged)
+        .setTaskStateChangedMsg(msgBuilder.build())
+        .build());
+  }
+
+  /**
+   * Send the control message to the scheduler, as an executor would. Mainly handles TaskStateChanged messages.
+   *
+   * @param message control message to send.
+   */
+  private void sendControlMessage(final ControlMessage.Message message) {
+    this.executorRepresenter.sendControlMessage(message);
+  }
+
+  /**
+   * Send the metric to the scheduler, as an executor would. See where it is used in MetricMessageSender#send.
+   *
+   * @param metricType  type of metric.
+   * @param metricId    id of metric.
+   * @param metricField field of metric.
+   * @param metricValue value of metric.
+   */
+  public void sendMetric(final String metricType, final String metricId,
+                         final String metricField, final byte[] metricValue) {
+    this.scheduler.handleMetricMessage(metricType, metricId, metricField, metricValue);
+  }
+}
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/SimulationScheduler.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/SimulationScheduler.java
new file mode 100644
index 0000000..42870f6
--- /dev/null
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/SimulationScheduler.java
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.runtime.master.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.Util;
+import org.apache.nemo.common.exception.IllegalMessageException;
+import org.apache.nemo.common.exception.SimulationException;
+import org.apache.nemo.common.exception.UnknownExecutionStateException;
+import org.apache.nemo.common.exception.UnrecoverableFailureException;
+import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
+import org.apache.nemo.conf.JobConf;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.comm.ControlMessage;
+import org.apache.nemo.runtime.common.message.MessageSender;
+import org.apache.nemo.runtime.common.message.MessageUtils;
+import org.apache.nemo.runtime.common.metric.JobMetric;
+import org.apache.nemo.runtime.common.metric.Metric;
+import org.apache.nemo.runtime.common.plan.PhysicalPlan;
+import org.apache.nemo.runtime.common.plan.PlanRewriter;
+import org.apache.nemo.runtime.common.plan.Stage;
+import org.apache.nemo.runtime.common.plan.Task;
+import org.apache.nemo.runtime.common.state.StageState;
+import org.apache.nemo.runtime.common.state.TaskState;
+import org.apache.nemo.runtime.master.BlockManagerMaster;
+import org.apache.nemo.runtime.master.PlanAppender;
+import org.apache.nemo.runtime.master.PlanStateManager;
+import org.apache.nemo.runtime.master.metric.MetricStore;
+import org.apache.nemo.runtime.master.resource.DefaultExecutorRepresenter;
+import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * Scheduler for simulating an execution not controlled by the runtime master. This class follows the structure of
+ * {@link BatchScheduler}, so when a change has to be made on BatchScheduler, it also means that it should be
+ * reflected in this class as well.
+ */
+@DriverSide
+@NotThreadSafe
+public final class SimulationScheduler implements Scheduler {
+ private static final Logger LOG = LoggerFactory.getLogger(SimulationScheduler.class.getName());
+
+ /**
+ * Run-time optimizations.
+ */
+ private final PlanRewriter planRewriter;
+
+ /**
+ * Components related to scheduling the given plan. The role of each class can be found in {@link BatchScheduler}.
+ * These (except the pointer) are re-created on every {@link #reset()}.
+ */
+ private TaskDispatcher taskDispatcher;
+ private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
+ private ExecutorRegistry executorRegistry;
+ private PlanStateManager planStateManager;
+
+ /**
+ * Executor service for scheduling message serialization.
+ */
+ private final ExecutorService serializationExecutorService;
+
+ /**
+ * A component that manages data blocks. We actually don't call for the real data in simulations,
+ * so it's just here to prevent errors.
+ */
+ private final BlockManagerMaster blockManagerMaster;
+
+ /**
+ * The actual metric store, from the actual job that's running, which calls for the simulation to occur.
+ */
+ private final MetricStore actualMetricStore;
+ /**
+ * The metric store for the simulation. This adds up the metrics for the simulated task executions.
+ */
+ private MetricStore metricStore;
+ /**
+ * A latch to confirm that the job has finished and all metrics have been flushed.
+ * It is sized to the number of simulated executors and counted down once per executor.
+ */
+ private CountDownLatch metricCountDownLatch;
+
+ /**
+ * Components that tell how to schedule the given tasks.
+ */
+ private final SchedulingConstraintRegistry schedulingConstraintRegistry;
+ private final SchedulingPolicy schedulingPolicy;
+ /**
+ * String to generate simulated executors from.
+ */
+ private final String resourceSpecificationString;
+ /**
+ * Directory passed to each new {@link PlanStateManager}.
+ */
+ private final String dagDirectory;
+
+ /**
+ * A map from executor ID to the simulated task executor object.
+ */
+ private final Map<String, SimulatedTaskExecutor> simulatedTaskExecutorMap;
+
+ /**
+ * The below variables depend on the submitted plan to execute.
+ */
+ private List<List<Stage>> sortedScheduleGroups;
+
+ @Inject
+ private SimulationScheduler(final PlanRewriter planRewriter,
+ final SchedulingConstraintRegistry schedulingConstraintRegistry,
+ final BlockManagerMaster blockManagerMaster,
+ @Parameter(JobConf.ExecutorJSONContents.class) final String resourceSpecificationString,
+ @Parameter(JobConf.ScheduleSerThread.class) final int scheduleSerThread,
+ @Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
+ this.planRewriter = planRewriter;
+ this.blockManagerMaster = blockManagerMaster;
+ this.pendingTaskCollectionPointer = PendingTaskCollectionPointer.newInstance();
+ this.executorRegistry = ExecutorRegistry.newInstance();
+ this.schedulingConstraintRegistry = schedulingConstraintRegistry;
+ this.schedulingPolicy = new SimulationSchedulingPolicy();
+ this.resourceSpecificationString = resourceSpecificationString;
+ this.dagDirectory = dagDirectory;
+ this.planStateManager = PlanStateManager.newInstance(dagDirectory);
+ this.taskDispatcher = TaskDispatcher.newInstance(schedulingConstraintRegistry, schedulingPolicy,
+ pendingTaskCollectionPointer, executorRegistry, planStateManager);
+ this.serializationExecutorService = Executors.newFixedThreadPool(scheduleSerThread);
+ this.actualMetricStore = MetricStore.getStore();
+ this.metricStore = MetricStore.newInstance();
+ this.planStateManager.setMetricStore(this.metricStore);
+ this.simulatedTaskExecutorMap = new HashMap<>();
+ setUpExecutors();
+ }
+
+ /**
+ * Simulate the launch of executors: one simulated executor per requested instance in the resource specification,
+ * then size the metric latch to the number of executors launched.
+ */
+ private void setUpExecutors() {
+ final List<Pair<Integer, ResourceSpecification>> resourceSpecs =
+ Util.parseResourceSpecificationString(resourceSpecificationString);
+ // Role of ActiveContextHandler + RuntimeMaster.onExecutorLaunched.
+ final AtomicInteger executorIdGenerator = new AtomicInteger(0);
+ resourceSpecs.forEach(p -> {
+ for (int i = 0; i < p.left(); i++) {
+ final ActiveContext ac = new SimulationEvaluatorActiveContext(executorIdGenerator.getAndIncrement());
+ this.onExecutorAdded(new DefaultExecutorRepresenter(ac.getId(), p.right(),
+ new SimulationMessageSender(ac.getId(), this), ac, serializationExecutorService, ac.getId()));
+ }
+ });
+ // The id generator was incremented once per launched executor, so it doubles as the executor count.
+ this.metricCountDownLatch = new CountDownLatch(executorIdGenerator.get());
+ }
+
+ /**
+ * Reset the instance to its initial state, so that another plan can be simulated from scratch.
+ */
+ public void reset() {
+ this.terminate();
+ this.executorRegistry = ExecutorRegistry.newInstance();
+ this.planStateManager = PlanStateManager.newInstance(dagDirectory);
+ this.pendingTaskCollectionPointer.getAndSetNull();
+ this.taskDispatcher = TaskDispatcher.newInstance(schedulingConstraintRegistry, schedulingPolicy,
+ pendingTaskCollectionPointer, executorRegistry, planStateManager);
+ this.metricStore = MetricStore.newInstance();
+ this.planStateManager.setMetricStore(metricStore);
+ this.simulatedTaskExecutorMap.clear();
+ setUpExecutors();
+ }
+
+ /**
+ * @return the plan state manager of the current simulation.
+ */
+ @VisibleForTesting
+ public PlanStateManager getPlanStateManager() {
+ return planStateManager;
+ }
+
+ /**
+ * The entrance point of the simulator. Simulate a plan by submitting a plan through this method.
+ * This call blocks until the plan finishes, then records the simulated job duration
+ * (the largest elapsed time among the simulated executors) in the simulation metric store.
+ *
+ * @param submittedPhysicalPlan the plan to simulate.
+ * @param maxScheduleAttempt the max number of times this plan/sub-part of the plan should be attempted.
+ */
+ @Override
+ public void schedulePlan(final PhysicalPlan submittedPhysicalPlan, final int maxScheduleAttempt) {
+ // Execute the given plan.
+ LOG.info("Plan to schedule: {}", submittedPhysicalPlan.getPlanId());
+
+ if (!planStateManager.isInitialized()) {
+ // First scheduling.
+ taskDispatcher.run();
+ updatePlan(submittedPhysicalPlan, maxScheduleAttempt);
+ planStateManager.storeJSON("submitted");
+ } else {
+ // Append the submitted plan to the original plan.
+ final PhysicalPlan appendedPlan =
+ PlanAppender.appendPlan(planStateManager.getPhysicalPlan(), submittedPhysicalPlan);
+ updatePlan(appendedPlan, maxScheduleAttempt);
+ planStateManager.storeJSON("appended");
+ }
+
+ doSchedule();
+
+ try {
+ planStateManager.waitUntilFinish();
+ } finally {
+ planStateManager.storeJSON("final");
+ }
+
+ final long jobDuration = this.simulatedTaskExecutorMap.values().stream()
+ .mapToLong(SimulatedTaskExecutor::getElapsedTime)
+ .max().orElse(0);
+ LOG.info("Simulation of {} is complete with job duration of {}!", submittedPhysicalPlan.getPlanId(), jobDuration);
+ this.metricStore.getOrCreateMetric(JobMetric.class, submittedPhysicalPlan.getPlanId()).setJobDuration(jobDuration);
+ // Signal collectMetricStore() that every executor is done with this plan.
+ executorRegistry.viewExecutors(executors -> executors.forEach(executor -> metricCountDownLatch.countDown()));
+ }
+
+ /**
+ * The main entry point for task scheduling.
+ * This operation can be invoked at any point during job execution, as it is designed to be free of side-effects.
+ * <p>
+ * These are the reasons why.
+ * - We 'reset' {@link PendingTaskCollectionPointer}, and not 'add' new tasks to it
+ * - We make {@link TaskDispatcher} dispatch only the tasks that are READY.
+ */
+ private void doSchedule() {
+ final java.util.Optional<List<Stage>> earliest =
+ BatchSchedulerUtils.selectEarliestSchedulableGroup(sortedScheduleGroups, planStateManager);
+
+ if (earliest.isPresent()) {
+ final List<Task> tasksToSchedule = earliest.get().stream()
+ .flatMap(stage ->
+ BatchSchedulerUtils.selectSchedulableTasks(planStateManager, blockManagerMaster, stage).stream())
+ .collect(Collectors.toList());
+ if (!tasksToSchedule.isEmpty()) {
+ LOG.info("Scheduling some tasks in {}, which are in the same ScheduleGroup", tasksToSchedule.stream()
+ .map(Task::getTaskId)
+ .map(RuntimeIdManager::getStageIdFromTaskId)
+ .collect(Collectors.toSet()));
+
+ // Set the pointer to the schedulable tasks.
+ pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
+
+ // Notify the dispatcher that a new collection is available.
+ taskDispatcher.onNewPendingTaskCollectionAvailable();
+ }
+ } else {
+ LOG.info("Skipping this round as no ScheduleGroup is schedulable.");
+ }
+ }
+
+ @Override
+ public void updatePlan(final PhysicalPlan newPhysicalPlan) {
+ // update the physical plan in the scheduler.
+ // NOTE: what's already been executed is not modified in the new physical plan.
+ // TODO #182: Consider reshaping in run-time optimization. At now, we only consider plan appending.
+ updatePlan(newPhysicalPlan, planStateManager.getMaxScheduleAttempt());
+ }
+
+ /**
+ * Update the physical plan in the scheduler, and recompute the schedule groups sorted by group index.
+ *
+ * @param newPhysicalPlan the new physical plan to update.
+ * @param maxScheduleAttempt the maximum number of task scheduling attempt.
+ */
+ private void updatePlan(final PhysicalPlan newPhysicalPlan,
+ final int maxScheduleAttempt) {
+ planStateManager.updatePlan(newPhysicalPlan, maxScheduleAttempt);
+ this.sortedScheduleGroups = newPhysicalPlan.getStageDAG().getVertices().stream()
+ .collect(Collectors.groupingBy(Stage::getScheduleGroup))
+ .entrySet().stream()
+ .sorted(Map.Entry.comparingByKey())
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
+ LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(), executorRepresenter.getNodeName());
+ executorRegistry.registerExecutor(executorRepresenter);
+ this.simulatedTaskExecutorMap.put(executorRepresenter.getExecutorId(),
+ new SimulatedTaskExecutor(this, executorRepresenter, actualMetricStore));
+ taskDispatcher.onExecutorSlotAvailable();
+ }
+
+ @Override
+ public void onExecutorRemoved(final String executorId) {
+ // Role of FailedEvaluatorHandler + onExecutorFailed.
+ metricCountDownLatch.countDown();
+ LOG.info("{} removed", executorId);
+
+ // These are tasks that were running at the time of executor removal.
+ final Set<String> interruptedTasks = new HashSet<>();
+ executorRegistry.updateExecutor(executorId, (executor, state) -> {
+ interruptedTasks.addAll(executor.onExecutorFailed());
+ return Pair.of(executor, ExecutorRegistry.ExecutorState.FAILED);
+ });
+
+ // Retry the interrupted tasks (and required parents)
+ BatchSchedulerUtils.retryTasksAndRequiredParents(planStateManager, blockManagerMaster, interruptedTasks);
+
+ // Trigger the scheduling of SHOULD_RETRY tasks in the earliest scheduleGroup
+ doSchedule();
+ }
+
+ /**
+ * Process the RuntimePassMessage.
+ *
+ * @param taskId that generated the message.
+ * @param data of the message.
+ */
+ public void onRunTimePassMessage(final String taskId, final Object data) {
+ // TODO #436: Dynamic task resizing.
+ BatchSchedulerUtils.onRunTimePassMessage(planStateManager, planRewriter, taskId, data);
+ }
+
+ @Override
+ public synchronized void onTaskStateReportFromExecutor(final String executorId,
+ final String taskId,
+ final int attemptIdx,
+ final TaskState.State newState,
+ @Nullable final String taskPutOnHold,
+ final TaskState.RecoverableTaskFailureCause failureCause) {
+ // Role of MasterControlMessageReceiver + handleControlMessage --> onTaskStateChanged.
+ // Do change state, as this notification is for the current task attempt.
+ planStateManager.onTaskStateChanged(taskId, newState);
+ switch (newState) {
+ case COMPLETE:
+ BatchSchedulerUtils.onTaskExecutionComplete(executorRegistry, executorId, taskId);
+ break;
+ case SHOULD_RETRY:
+ // SHOULD_RETRY from an executor means that the task ran into a recoverable failure
+ BatchSchedulerUtils.onTaskExecutionFailedRecoverable(planStateManager, blockManagerMaster, executorRegistry,
+ executorId, taskId, failureCause);
+ break;
+ case ON_HOLD:
+ final java.util.Optional<PhysicalPlan> optionalPhysicalPlan =
+ BatchSchedulerUtils
+ .onTaskExecutionOnHold(planStateManager, executorRegistry, planRewriter, executorId, taskId);
+ optionalPhysicalPlan.ifPresent(this::updatePlan);
+ break;
+ case FAILED:
+ throw new UnrecoverableFailureException(new Exception(String.format("The plan failed on %s in %s",
+ taskId, executorId)));
+ case READY:
+ case EXECUTING:
+ throw new SimulationException("The states READY/EXECUTING cannot occur at this point");
+ default:
+ throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + newState));
+ }
+
+ // Invoke doSchedule()
+ switch (newState) {
+ case COMPLETE:
+ case ON_HOLD:
+ // If the stage has completed
+ final String stageIdForTaskUponCompletion = RuntimeIdManager.getStageIdFromTaskId(taskId);
+ if (planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE)
+ && !planStateManager.isPlanDone()) {
+ doSchedule();
+ }
+ break;
+ case SHOULD_RETRY:
+ // Do retry
+ doSchedule();
+ break;
+ default:
+ break;
+ }
+
+ // Invoke taskDispatcher.onExecutorSlotAvailable()
+ switch (newState) {
+ // These three states mean that a slot is made available.
+ case COMPLETE:
+ case ON_HOLD:
+ case SHOULD_RETRY:
+ taskDispatcher.onExecutorSlotAvailable();
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * Handle metric message, as it would have been sent from org.apache.nemo.runtime.executor.MetricManagerWorker#send.
+ * Failures are logged (with their cause) and otherwise ignored, so a bad metric never aborts the simulation.
+ *
+ * @param metricType type of metric.
+ * @param metricId id of metric.
+ * @param metricField field of metric.
+ * @param metricValue value of metric.
+ */
+ void handleMetricMessage(final String metricType, final String metricId,
+ final String metricField, final byte[] metricValue) {
+ final Class<Metric> metricClass = metricStore.getMetricClassByName(metricType);
+ // process metric message
+ try {
+ if (metricStore.getOrCreateMetric(metricClass, metricId).processMetricMessage(metricField, metricValue)) {
+ metricStore.triggerBroadcast(metricClass, metricId);
+ }
+ } catch (final Exception e) {
+ // Pass the exception as the last argument so SLF4J records its stack trace instead of dropping it.
+ LOG.warn("Error when processing metric message for {}, {}, {}.", metricType, metricId, metricField, e);
+ }
+ }
+
+ /**
+ * The endpoint of the simulator. Collect the metric store, and reset the simulator for the next simulation.
+ * Waits up to 10 seconds for every simulated executor to count down the metric latch.
+ *
+ * @return the metrics of the simulation.
+ */
+ public MetricStore collectMetricStore() {
+ try {
+ // wait for metric flush
+ if (!metricCountDownLatch.await(10000, TimeUnit.MILLISECONDS)) {
+ LOG.warn("Terminating master before all executor terminated messages arrived.");
+ }
+ } catch (final InterruptedException e) {
+ LOG.warn("Waiting executor terminating process interrupted: ", e);
+ // clean up state...
+ Thread.currentThread().interrupt();
+ }
+
+ final MetricStore res = this.metricStore;
+ this.reset();
+ return res;
+ }
+
+ @Override
+ public void onSpeculativeExecutionCheck() {
+ // We don't simulate speculative execution yet.
+ }
+
+ @Override
+ public void terminate() {
+ this.taskDispatcher.terminate();
+ this.executorRegistry.terminate();
+ }
+
+ /**
+ * Evaluator ActiveContext for the Simulation. All operations are no-ops; it only provides an id.
+ */
+ private static final class SimulationEvaluatorActiveContext implements ActiveContext {
+ private final Integer id;
+
+ /**
+ * Default constructor.
+ * @param id Evaluator ID.
+ */
+ SimulationEvaluatorActiveContext(final Integer id) {
+ this.id = id;
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+
+ @Override
+ public void submitTask(final Configuration taskConf) {
+ // do nothing
+ }
+
+ @Override
+ public void submitContext(final Configuration contextConfiguration) {
+ // do nothing
+ }
+
+ @Override
+ public void submitContextAndService(final Configuration contextConfiguration,
+ final Configuration serviceConfiguration) {
+ // do nothing
+ }
+
+ @Override
+ public void sendMessage(final byte[] message) {
+ // do nothing
+ }
+
+ @Override
+ public String getEvaluatorId() {
+ return getId();
+ }
+
+ @Override
+ public Optional<String> getParentId() {
+ // A simulated context has no parent; never return null from an Optional-returning method.
+ return Optional.empty();
+ }
+
+ @Override
+ public EvaluatorDescriptor getEvaluatorDescriptor() {
+ return null;
+ }
+
+ @Override
+ public String getId() {
+ return "Evaluator" + id;
+ }
+ }
+
+ /**
+ * MessageSender for Simulations. Instead of sending over the network, it dispatches each message type
+ * directly to the simulation scheduler (master-bound messages) or the simulated task executor (executor-bound).
+ */
+ private static final class SimulationMessageSender implements MessageSender<ControlMessage.Message> {
+ private final String executorId;
+ private final SimulationScheduler scheduler;
+
+ /**
+ * Constructor for the message sender that simply passes on the messages, instead of sending actual messages.
+ * @param executorId the simulated executor id of where the message sender communicates from.
+ * @param scheduler the simulation scheduler to communicate with.
+ */
+ SimulationMessageSender(final String executorId, final SimulationScheduler scheduler) {
+ this.executorId = executorId;
+ this.scheduler = scheduler;
+ }
+
+ @Override
+ public void send(final ControlMessage.Message message) {
+ switch (message.getType()) {
+ // Messages sent to the master
+ case TaskStateChanged:
+ final ControlMessage.TaskStateChangedMsg taskStateChangedMsg = message.getTaskStateChangedMsg();
+
+ scheduler.onTaskStateReportFromExecutor(taskStateChangedMsg.getExecutorId(),
+ taskStateChangedMsg.getTaskId(),
+ taskStateChangedMsg.getAttemptIdx(),
+ MessageUtils.convertTaskState(taskStateChangedMsg.getState()),
+ taskStateChangedMsg.getVertexPutOnHoldId(),
+ MessageUtils.convertFailureCause(taskStateChangedMsg.getFailureCause()));
+ break;
+ case ExecutorFailed:
+ // Executor failed due to user code.
+ final ControlMessage.ExecutorFailedMsg executorFailedMsg = message.getExecutorFailedMsg();
+ final String failedExecutorId = executorFailedMsg.getExecutorId();
+ final Exception exception = SerializationUtils.deserialize(executorFailedMsg.getException().toByteArray());
+ // The first SLF4J argument is the format string; the trailing Throwable is logged with its stack trace.
+ LOG.error("{} failed, Stack Trace: ", failedExecutorId, exception);
+ throw new SimulationException(exception);
+ case RunTimePassMessage:
+ scheduler.onRunTimePassMessage(
+ // TODO #436: Dynamic task resizing.
+ message.getRunTimePassMessageMsg().getTaskId(),
+ message.getRunTimePassMessageMsg().getEntryList());
+ break;
+ case MetricMessageReceived:
+ final List<ControlMessage.Metric> metricList = message.getMetricMsg().getMetricList();
+ metricList.forEach(metric ->
+ scheduler.handleMetricMessage(
+ metric.getMetricType(), metric.getMetricId(),
+ metric.getMetricField(), metric.getMetricValue().toByteArray()));
+ break;
+ // Messages sent to the executor
+ case ScheduleTask:
+ final ControlMessage.ScheduleTaskMsg scheduleTaskMsg = message.getScheduleTaskMsg();
+ final Task task =
+ SerializationUtils.deserialize(scheduleTaskMsg.getTask().toByteArray());
+ scheduler.simulatedTaskExecutorMap.get(executorId).onTaskReceived(task);
+ break;
+ // No metric messaging in simulation.
+ case MetricFlushed:
+ case RequestMetricFlush:
+ break;
+ default:
+ throw new IllegalMessageException(
+ new Exception("This message should not be received by Master or the Executor :" + message.getType()));
+ }
+ }
+
+ /**
+ * Request/response messaging is not simulated.
+ *
+ * @return always null.
+ */
+ @Override
+ public <U> CompletableFuture<U> request(final ControlMessage.Message message) {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ // do nothing.
+ }
+ }
+
+
+ /**
+ * Scheduling policy for simulations: picks the executor with the smallest simulated elapsed time.
+ */
+ private final class SimulationSchedulingPolicy implements SchedulingPolicy {
+ @Override
+ public ExecutorRepresenter selectExecutor(final Collection<ExecutorRepresenter> executors, final Task task) {
+ // Compare on the full long value; narrowing to int could overflow and mis-order executors.
+ return Collections.min(executors, Comparator.comparingLong((ExecutorRepresenter e) ->
+ simulatedTaskExecutorMap.get(e.getExecutorId()).getElapsedTime()));
+ }
+ }
+}
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java
index 16a6b35..e15d8ea 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java
@@ -76,6 +76,24 @@ final class TaskDispatcher {
}
/**
+ * Static constructor for manual usage.
+ * @param schedulingConstraintRegistry Registry for the scheduling constraints.
+ * @param schedulingPolicy The Scheduling Policy to use.
+ * @param pendingTaskCollectionPointer A pointer to the pending tasks to be executed later on.
+ * @param executorRegistry Registry for the list of executors available.
+ * @param planStateManager Manager for the state of the plan being executed.
+ * @return a new instance of task dispatcher.
+ */
+ public static TaskDispatcher newInstance(final SchedulingConstraintRegistry schedulingConstraintRegistry,
+ final SchedulingPolicy schedulingPolicy,
+ final PendingTaskCollectionPointer pendingTaskCollectionPointer,
+ final ExecutorRegistry executorRegistry,
+ final PlanStateManager planStateManager) {
+ return new TaskDispatcher(schedulingConstraintRegistry, schedulingPolicy, pendingTaskCollectionPointer,
+ executorRegistry, planStateManager);
+ }
+
+ /**
* A separate thread is run to dispatch tasks to executors.
* See comments in the {@link Scheduler} for avoiding race conditions.
*/
@@ -115,6 +133,7 @@ final class TaskDispatcher {
executorRegistry.viewExecutors(executors -> {
final MutableObject<Set<ExecutorRepresenter>> candidateExecutors = new MutableObject<>(executors);
+ // Filter out the candidate executors that do not meet scheduling constraints.
task.getExecutionProperties().forEachProperties(property -> {
final Optional<SchedulingConstraint> constraint = schedulingConstraintRegistry.get(property.getClass());
if (constraint.isPresent() && !candidateExecutors.getValue().isEmpty()) {
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/ContainerManagerTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/ContainerManagerTest.java
index d82ddd5..c55c4fc 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/ContainerManagerTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/ContainerManagerTest.java
@@ -23,7 +23,7 @@ import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
import org.apache.nemo.runtime.master.resource.ContainerManager;
import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
-import org.apache.nemo.runtime.master.resource.ResourceSpecification;
+import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
import org.apache.reef.driver.catalog.NodeDescriptor;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
index 0384932..35bb737 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
@@ -30,7 +30,7 @@ import org.apache.nemo.runtime.master.PlanStateManager;
import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
import org.apache.nemo.runtime.master.resource.DefaultExecutorRepresenter;
import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
-import org.apache.nemo.runtime.master.resource.ResourceSpecification;
+import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/SimulationSchedulerTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/SimulationSchedulerTest.java
new file mode 100644
index 0000000..73d9219
--- /dev/null
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/SimulationSchedulerTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.runtime.master.scheduler;
+
+import org.apache.nemo.conf.JobConf;
+import org.apache.nemo.runtime.common.message.ClientRPC;
+import org.apache.nemo.runtime.common.metric.JobMetric;
+import org.apache.nemo.runtime.common.metric.TaskMetric;
+import org.apache.nemo.runtime.common.plan.PhysicalPlan;
+import org.apache.nemo.runtime.common.plan.PlanRewriter;
+import org.apache.nemo.runtime.common.plan.TestPlanGenerator;
+import org.apache.nemo.runtime.master.BlockManagerMaster;
+import org.apache.nemo.runtime.master.metric.MetricStore;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.stream.LongStream;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests {@link SimulationScheduler}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SchedulingConstraintRegistry.class, BlockManagerMaster.class, ClientRPC.class, BatchScheduler.class})
+public final class SimulationSchedulerTest {
+ private static final Logger LOG = LoggerFactory.getLogger(SimulationSchedulerTest.class.getName());
+ private static final String DEFAULT_EXECUTOR_JSON_CONTENTS =
+ "[{\"type\":\"Transient\",\"memory_mb\":512,\"capacity\":5},"
+ + "{\"type\":\"Reserved\",\"memory_mb\":512,\"capacity\":5}]";
+
+ // Max number of schedule attempts passed to the scheduler; a single attempt is enough as we assume no failures.
+ private static final int MAX_SCHEDULE_ATTEMPT = 1;
+
+ private SimulationScheduler scheduler;
+
+ @Before
+ public void setUp() throws Exception {
+ final Injector injector = Tang.Factory.getTang().newInjector();
+ injector.bindVolatileInstance(PlanRewriter.class, mock(PlanRewriter.class));
+ injector.bindVolatileInstance(SchedulingConstraintRegistry.class, mock(SchedulingConstraintRegistry.class));
+ injector.bindVolatileInstance(BlockManagerMaster.class, mock(BlockManagerMaster.class));
+ injector.bindVolatileInstance(ClientRPC.class, mock(ClientRPC.class));
+ injector.bindVolatileParameter(JobConf.ExecutorJSONContents.class, DEFAULT_EXECUTOR_JSON_CONTENTS);
+ injector.bindVolatileParameter(JobConf.ScheduleSerThread.class, 8);
+ injector.bindVolatileParameter(JobConf.DAGDirectory.class, "");
+
+ // SimulationScheduler builds its own scheduling policy, so no SchedulingPolicy binding is needed.
+ scheduler = injector.getInstance(SimulationScheduler.class);
+ }
+
+ @Test(timeout = 10000)
+ public void testSimulation() throws Exception {
+ final PhysicalPlan physicalPlan =
+ TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
+ // Register the "actual" job's stage DAG in the global metric store that the simulation reads from.
+ MetricStore.getStore().getOrCreateMetric(JobMetric.class, physicalPlan.getPlanId())
+ .setStageDAG(physicalPlan.getStageDAG());
+ final PhysicalPlan physicalPlan2 =
+ TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
+ scheduler.schedulePlan(
+ physicalPlan2,
+ MAX_SCHEDULE_ATTEMPT);
+
+ LOG.info("Wait for plan termination after sending stage completion events");
+ final MetricStore resultingMetricStore = scheduler.collectMetricStore();
+ scheduler.terminate();
+
+ resultingMetricStore.getMetricMap(TaskMetric.class).forEach((id, taskMetric) -> {
+ final long taskDuration = ((TaskMetric) taskMetric).getTaskDuration();
+ assertTrue(0 <= taskDuration);
+ assertTrue(1000 > taskDuration);
+ });
+
+ final LongStream jobDurations = resultingMetricStore.getMetricMap(JobMetric.class).values().stream()
+ .mapToLong(jobMetric -> ((JobMetric) jobMetric).getJobDuration());
+ LOG.info("Simulated duration: {}ms", jobDurations.findFirst().orElse(0));
+ }
+}
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
index 2ee2d2d..fbaae0b 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -37,7 +37,7 @@ import org.apache.nemo.runtime.master.PlanStateManager;
import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
import org.apache.nemo.runtime.master.resource.DefaultExecutorRepresenter;
import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
-import org.apache.nemo.runtime.master.resource.ResourceSpecification;
+import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.tang.Injector;
import org.junit.Before;