You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2020/03/09 06:14:24 UTC

[incubator-nemo] branch master updated: [NEMO-438] Create a Simulator for Simulating an Execution of an Execution Plan (#288)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 4dd3ef0  [NEMO-438] Create a Simulator for Simulating an Execution of an Execution Plan (#288)
4dd3ef0 is described below

commit 4dd3ef0a8a4fb4407fdca2c470f4705f24d161f5
Author: Won Wook SONG <ws...@gmail.com>
AuthorDate: Mon Mar 9 15:14:13 2020 +0900

    [NEMO-438] Create a Simulator for Simulating an Execution of an Execution Plan (#288)
    
    JIRA: [NEMO-438: Create a Simulator for Simulating an Execution of an Execution Plan](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-438)
    
    **Major changes:**
    - Implements a `SimulationScheduler` that schedules a physical plan for a simulated execution
    - Implements a `SimulatedTaskExecutor` that acts like an executor
    - Refactors components used by both `BatchScheduler` and `SimulationScheduler` into `BatchSchedulerUtils`
    
    **Minor changes to note:**
    - Moves the executor's `ResourceSpecification` to the common module so that it can be accessed in the driver as well
    - Enables `IRDAG` to compute the input size of the workload
    - Adds the job duration to the `JobMetric`
    
    **Tests for the changes:**
    - `SimulationSchedulerTest.java` tests the new implementations
    
    **Other comments:**
    - None
    
    Closes #288
---
 .../src/main/java/org/apache/nemo/common/Util.java |  57 +-
 .../nemo/common/exception/SimulationException.java |  45 ++
 .../main/java/org/apache/nemo/common/ir/IRDAG.java |  48 ++
 .../java/org/apache/nemo/common/ir/IdManager.java  |  11 +
 .../executionproperty}/ResourceSpecification.java  |   6 +-
 .../nemo/compiler/optimizer/NemoOptimizer.java     |   6 +
 .../nemo/runtime/common/RuntimeIdManager.java      |  10 -
 .../nemo/runtime/common/message/MessageUtils.java  | 103 ++++
 .../nemo/runtime/common/metric/JobMetric.java      |   1 +
 .../org/apache/nemo/runtime/common/plan/Task.java  |   7 +
 .../apache/nemo/driver/UserApplicationRunner.java  |   6 +
 .../org/apache/nemo/runtime/executor/Executor.java |   5 +-
 .../nemo/runtime/executor/TaskStateManager.java    |  51 +-
 .../nemo/runtime/executor/task/TaskExecutor.java   |   2 +
 .../nemo/runtime/master/PlanStateManager.java      |  33 +-
 .../apache/nemo/runtime/master/RuntimeMaster.java  |  90 +--
 .../nemo/runtime/master/metric/MetricStore.java    |   8 +
 .../runtime/master/resource/ContainerManager.java  |   1 +
 .../resource/DefaultExecutorRepresenter.java       |   2 +-
 .../runtime/master/scheduler/BatchScheduler.java   | 341 ++----------
 .../master/scheduler/BatchSchedulerUtils.java      | 361 ++++++++++++
 .../runtime/master/scheduler/ExecutorRegistry.java |   8 +
 .../scheduler/PendingTaskCollectionPointer.java    |   8 +
 .../master/scheduler/SimulatedTaskExecutor.java    | 243 ++++++++
 .../master/scheduler/SimulationScheduler.java      | 611 +++++++++++++++++++++
 .../runtime/master/scheduler/TaskDispatcher.java   |  19 +
 .../nemo/runtime/master/ContainerManagerTest.java  |   2 +-
 .../master/scheduler/BatchSchedulerTest.java       |   2 +-
 .../master/scheduler/SimulationSchedulerTest.java  | 105 ++++
 .../runtime/master/scheduler/TaskRetryTest.java    |   2 +-
 30 files changed, 1740 insertions(+), 454 deletions(-)

diff --git a/common/src/main/java/org/apache/nemo/common/Util.java b/common/src/main/java/org/apache/nemo/common/Util.java
index 2b9e259..4bf4593 100644
--- a/common/src/main/java/org/apache/nemo/common/Util.java
+++ b/common/src/main/java/org/apache/nemo/common/Util.java
@@ -18,9 +18,14 @@
  */
 package org.apache.nemo.common;
 
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.TreeNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nemo.common.exception.JsonParseException;
 import org.apache.nemo.common.exception.MetricException;
 import org.apache.nemo.common.ir.edge.IREdge;
 import org.apache.nemo.common.ir.edge.executionproperty.*;
+import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.utility.MessageAggregatorVertex;
 import org.apache.nemo.common.ir.vertex.utility.TriggerVertex;
@@ -32,8 +37,7 @@ import java.lang.instrument.Instrumentation;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Collection;
-import java.util.Optional;
+import java.util.*;
 import java.util.function.IntPredicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -232,6 +236,55 @@ public final class Util {
     return "edge" + numericId;
   }
 
+
+  /**
+   * Utility method for parsing the resource specification string.
+   *
+   * @param resourceSpecificationString the input resource specification string.
+   * @return the parsed list of resource specifications. The Integer indicates how many of the specified nodes are
+   * required, followed by the ResourceSpecification that indicates the specifications of the nodes.
+   */
+  public static List<Pair<Integer, ResourceSpecification>> parseResourceSpecificationString(
+    final String resourceSpecificationString) {
+    final List<Pair<Integer, ResourceSpecification>> resourceSpecifications = new ArrayList<>();
+    try {
+      if (resourceSpecificationString.trim().startsWith("[")) {
+        final TreeNode jsonRootNode = new ObjectMapper().readTree(resourceSpecificationString);
+
+        for (int i = 0; i < jsonRootNode.size(); i++) {
+          final TreeNode resourceNode = jsonRootNode.get(i);
+          final int executorNum = resourceNode.path("num").traverse().nextIntValue(1);
+          final String type = resourceNode.get("type").traverse().nextTextValue();
+          final int capacity = resourceNode.get("capacity").traverse().getIntValue();
+          final int memory = resourceNode.get("memory_mb").traverse().getIntValue();
+          final OptionalDouble maxOffheapRatio;
+          final OptionalInt poisonSec;
+
+          if (resourceNode.path("max_offheap_ratio").traverse().nextToken() == JsonToken.VALUE_NUMBER_FLOAT) {
+            maxOffheapRatio = OptionalDouble.of(resourceNode.path("max_offheap_ratio").traverse().getDoubleValue());
+          } else {
+            maxOffheapRatio = OptionalDouble.empty();
+          }
+
+          if (resourceNode.path("poison_sec").traverse().nextToken() == JsonToken.VALUE_NUMBER_INT) {
+            poisonSec = OptionalInt.of(resourceNode.path("poison_sec").traverse().getIntValue());
+          } else {
+            poisonSec = OptionalInt.empty();
+          }
+
+          resourceSpecifications.add(
+            Pair.of(executorNum, new ResourceSpecification(type, capacity, memory, maxOffheapRatio, poisonSec)));
+        }
+      } else {
+        throw new UnsupportedOperationException("Executor Info file should be a JSON file.");
+      }
+
+      return resourceSpecifications;
+    } catch (final Exception e) {
+      throw new JsonParseException(e);
+    }
+  }
+
   /**
    * Method for the instrumentation: for getting the object size.
    *
diff --git a/common/src/main/java/org/apache/nemo/common/exception/SimulationException.java b/common/src/main/java/org/apache/nemo/common/exception/SimulationException.java
new file mode 100644
index 0000000..587d582
--- /dev/null
+++ b/common/src/main/java/org/apache/nemo/common/exception/SimulationException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.common.exception;
+
+/**
+ * SimulationException.
+ * Thrown when any exception occurs while simulating the execution of a
+ * {@code org.apache.nemo.runtime.common.plan.Task} on an executor.
+ */
+public final class SimulationException extends RuntimeException {
+  /**
+   * SimulationException.
+   *
+   * @param exception the cause of this exception
+   */
+  public SimulationException(final Throwable exception) {
+    super(exception);
+  }
+
+  /**
+   * SimulationException.
+   *
+   * @param exception exception
+   */
+  public SimulationException(final String exception) {
+    super(exception);
+  }
+}
diff --git a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
index bcb76dd..c95bcf0 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
@@ -20,6 +20,7 @@ package org.apache.nemo.common.ir;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Sets;
+import org.apache.nemo.common.Pair;
 import org.apache.nemo.common.PairKeyExtractor;
 import org.apache.nemo.common.Util;
 import org.apache.nemo.common.coder.BytesDecoderFactory;
@@ -28,10 +29,13 @@ import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.common.dag.DAGBuilder;
 import org.apache.nemo.common.dag.DAGInterface;
 import org.apache.nemo.common.exception.CompileTimeOptimizationException;
+import org.apache.nemo.common.exception.MetricException;
 import org.apache.nemo.common.ir.edge.IREdge;
 import org.apache.nemo.common.ir.edge.executionproperty.*;
+import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.LoopVertex;
+import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.nemo.common.ir.vertex.executionproperty.MessageIdVertexProperty;
 import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import org.apache.nemo.common.ir.vertex.utility.MessageAggregatorVertex;
@@ -78,6 +82,11 @@ public final class IRDAG implements DAGInterface<IRVertex, IREdge> {
   private final Map<IRVertex, Set<IRVertex>> messageVertexToGroup;
 
   /**
+   * Specifications of the executors used to run the IR DAG.
+   */
+  private final List<Pair<Integer, ResourceSpecification>> executorInfo;
+
+  /**
    * @param originalUserApplicationDAG the initial DAG.
    */
   public IRDAG(final DAG<IRVertex, IREdge> originalUserApplicationDAG) {
@@ -86,6 +95,7 @@ public final class IRDAG implements DAGInterface<IRVertex, IREdge> {
     this.streamVertexToOriginalEdge = new HashMap<>();
     this.samplingVertexToGroup = new HashMap<>();
     this.messageVertexToGroup = new HashMap<>();
+    this.executorInfo = new ArrayList<>();
   }
 
   public IRDAGChecker.CheckerResult checkIntegrity() {
@@ -113,6 +123,12 @@ public final class IRDAG implements DAGInterface<IRVertex, IREdge> {
    * @return a IR DAG summary string, consisting of only the vertices generated from the frontend.
    */
   public String irDAGSummary() {
+    final Long inputBytes = this.getInputSize();
+    final String inputSizeString = inputBytes < 1024 ? inputBytes + "B"
+      : (inputBytes / 1024 < 1024 ? inputBytes / 1024 + "KB"
+      : (inputBytes / 1048576 < 1024 ? inputBytes / 1048576 + "MB"
+      : (inputBytes / 1073741824L < 1024 ? inputBytes / 1073741824L + "GB"
+      : inputBytes / 1099511627776L + "TB")));
     return "rv" + getRootVertices().size()
       + "_v" + getVertices().stream()
       .filter(v -> !v.isUtilityVertex())  // Exclude utility vertices
@@ -120,9 +136,41 @@ public final class IRDAG implements DAGInterface<IRVertex, IREdge> {
       + "_e" + getVertices().stream()
       .filter(v -> !v.isUtilityVertex())  // Exclude utility vertices
       .mapToInt(v -> getIncomingEdgesOf(v).size())
+      .sum() + "_" + inputSizeString;
+  }
+
+  /**
+   * @return the sum of the estimated sizes, in bytes, of the root source vertices of the IR DAG.
+   */
+  public Long getInputSize() {
+    return this.getRootVertices().stream()
+      .filter(irVertex -> irVertex instanceof SourceVertex)
+      .mapToLong(srcVertex -> {
+        try {
+          return ((SourceVertex) srcVertex).getEstimatedSizeBytes();
+        } catch (Exception e) {
+          throw new MetricException(e);
+        }
+      })
       .sum();
   }
 
+  /**
+   * Setter for the executor specifications information.
+   * @param parsedExecutorInfo executor information parsed for processing.
+   */
+  public void recordExecutorInfo(final List<Pair<Integer, ResourceSpecification>> parsedExecutorInfo) {
+    executorInfo.addAll(parsedExecutorInfo);
+  }
+
+  /**
+   * Getter for the executor specifications information.
+   * @return the executor specifications information.
+   */
+  public List<Pair<Integer, ResourceSpecification>> getExecutorInfo() {
+    return executorInfo;
+  }
+
   ////////////////////////////////////////////////// Methods for reshaping the DAG topology.
 
   /**
diff --git a/common/src/main/java/org/apache/nemo/common/ir/IdManager.java b/common/src/main/java/org/apache/nemo/common/ir/IdManager.java
index 1aad410..2ff5255 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/IdManager.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/IdManager.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * ID manager.
@@ -38,6 +39,7 @@ public final class IdManager {
 
   private static AtomicInteger vertexId = new AtomicInteger(1);
   private static AtomicInteger edgeId = new AtomicInteger(1);
+  private static AtomicLong resourceSpecIdGenerator = new AtomicLong(0);
   private static volatile boolean isDriver = false;
 
   // Vertex ID Map to be used upon cloning in loop vertices.
@@ -89,6 +91,15 @@ public final class IdManager {
   }
 
   /**
+   * Generates the ID for a resource specification.
+   *
+   * @return the generated ID
+   */
+  public static String generateResourceSpecId() {
+    return "ResourceSpec" + resourceSpecIdGenerator.getAndIncrement();
+  }
+
+  /**
    * Set the realm of the loaded class as REEF driver.
    */
   public static void setInDriver() {
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ResourceSpecification.java b/common/src/main/java/org/apache/nemo/common/ir/executionproperty/ResourceSpecification.java
similarity index 94%
rename from runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ResourceSpecification.java
rename to common/src/main/java/org/apache/nemo/common/ir/executionproperty/ResourceSpecification.java
index f3f0e7e..334ffba 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ResourceSpecification.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/executionproperty/ResourceSpecification.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.runtime.master.resource;
+package org.apache.nemo.common.ir.executionproperty;
 
-import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.common.ir.IdManager;
 
 import java.util.OptionalDouble;
 import java.util.OptionalInt;
@@ -46,7 +46,7 @@ public final class ResourceSpecification {
                                final int memory,
                                final OptionalDouble maxOffheapRatio,
                                final OptionalInt poisonSec) {
-    this.resourceSpecId = RuntimeIdManager.generateResourceSpecId();
+    this.resourceSpecId = IdManager.generateResourceSpecId();
     this.containerType = containerType;
     this.capacity = capacity;
     this.memory = memory;
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
index 8c0da96..0dc9fe9 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
@@ -19,6 +19,7 @@
 package org.apache.nemo.compiler.optimizer;
 
 import net.jcip.annotations.NotThreadSafe;
+import org.apache.nemo.common.Util;
 import org.apache.nemo.common.dag.DAGBuilder;
 import org.apache.nemo.common.exception.CompileTimeOptimizationException;
 import org.apache.nemo.common.ir.IRDAG;
@@ -50,6 +51,7 @@ public final class NemoOptimizer implements Optimizer {
   private final String dagDirectory;
   private final Policy optimizationPolicy;
   private final String environmentTypeStr;
+  private final String executorInfoContents;
   private final ClientRPC clientRPC;
 
   private final Map<UUID, Integer> cacheIdToParallelism = new HashMap<>();
@@ -60,15 +62,18 @@ public final class NemoOptimizer implements Optimizer {
    * @param dagDirectory       to store JSON representation of intermediate DAGs.
    * @param policyName         the name of the optimization policy.
    * @param environmentTypeStr the environment type of the workload to optimize the DAG for.
+   * @param executorInfoContents the JSON string specifying the executors provided for the job.
    * @param clientRPC          the RPC channel to communicate with the client.
    */
   @Inject
   private NemoOptimizer(@Parameter(JobConf.DAGDirectory.class) final String dagDirectory,
                         @Parameter(JobConf.OptimizationPolicy.class) final String policyName,
                         @Parameter(JobConf.EnvironmentType.class) final String environmentTypeStr,
+                        @Parameter(JobConf.ExecutorJSONContents.class) final String executorInfoContents,
                         final ClientRPC clientRPC) {
     this.dagDirectory = dagDirectory;
     this.environmentTypeStr = OptimizerUtils.filterEnvironmentTypeString(environmentTypeStr);
+    this.executorInfoContents = executorInfoContents;
     this.clientRPC = clientRPC;
 
     try {
@@ -133,6 +138,7 @@ public final class NemoOptimizer implements Optimizer {
    * @param policy the optimization policy to optimize the DAG with.
    */
   private void beforeCompileTimeOptimization(final IRDAG dag, final Policy policy) {
+    dag.recordExecutorInfo(Util.parseResourceSpecificationString(this.executorInfoContents));
     if (policy instanceof XGBoostPolicy) {
       clientRPC.send(ControlMessage.DriverToClientMessage.newBuilder()
         .setType(ControlMessage.DriverToClientMessageType.LaunchOptimization)
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/RuntimeIdManager.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/RuntimeIdManager.java
index bfb7bda..255cca7 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/RuntimeIdManager.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/RuntimeIdManager.java
@@ -28,7 +28,6 @@ public final class RuntimeIdManager {
   private static AtomicInteger physicalPlanIdGenerator = new AtomicInteger(0);
   private static AtomicInteger executorIdGenerator = new AtomicInteger(0);
   private static AtomicLong messageIdGenerator = new AtomicLong(1L);
-  private static AtomicLong resourceSpecIdGenerator = new AtomicLong(0);
   private static final String SPLITTER = "-";
 
   /**
@@ -123,15 +122,6 @@ public final class RuntimeIdManager {
     return messageIdGenerator.getAndIncrement();
   }
 
-  /**
-   * Generates the ID for a resource specification.
-   *
-   * @return the generated ID
-   */
-  public static String generateResourceSpecId() {
-    return "ResourceSpec" + resourceSpecIdGenerator.getAndIncrement();
-  }
-
   //////////////////////////////////////////////////////////////// Parse IDs
 
   /**
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/MessageUtils.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/MessageUtils.java
new file mode 100644
index 0000000..241705c
--- /dev/null
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/MessageUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.runtime.common.message;
+
+import org.apache.nemo.common.exception.UnknownExecutionStateException;
+import org.apache.nemo.common.exception.UnknownFailureCauseException;
+import org.apache.nemo.runtime.common.comm.ControlMessage;
+import org.apache.nemo.runtime.common.state.TaskState;
+
+import static org.apache.nemo.runtime.common.state.TaskState.State.COMPLETE;
+import static org.apache.nemo.runtime.common.state.TaskState.State.ON_HOLD;
+
+/**
+ * Utility class for messages.
+ */
+public final class MessageUtils {
+  /**
+   * Private constructor for utility class.
+   */
+  private MessageUtils() {
+  }
+
+  public static TaskState.State convertTaskState(final ControlMessage.TaskStateFromExecutor state) {
+    switch (state) {
+      case READY:
+        return TaskState.State.READY;
+      case EXECUTING:
+        return TaskState.State.EXECUTING;
+      case COMPLETE:
+        return COMPLETE;
+      case FAILED_RECOVERABLE:
+        return TaskState.State.SHOULD_RETRY;
+      case FAILED_UNRECOVERABLE:
+        return TaskState.State.FAILED;
+      case ON_HOLD:
+        return ON_HOLD;
+      default:
+        throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + state));
+    }
+  }
+
+  public static ControlMessage.TaskStateFromExecutor convertState(final TaskState.State state) {
+    switch (state) {
+      case READY:
+        return ControlMessage.TaskStateFromExecutor.READY;
+      case EXECUTING:
+        return ControlMessage.TaskStateFromExecutor.EXECUTING;
+      case COMPLETE:
+        return ControlMessage.TaskStateFromExecutor.COMPLETE;
+      case SHOULD_RETRY:
+        return ControlMessage.TaskStateFromExecutor.FAILED_RECOVERABLE;
+      case FAILED:
+        return ControlMessage.TaskStateFromExecutor.FAILED_UNRECOVERABLE;
+      case ON_HOLD:
+        return ControlMessage.TaskStateFromExecutor.ON_HOLD;
+      default:
+        throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + state));
+    }
+  }
+
+  public static TaskState.RecoverableTaskFailureCause convertFailureCause(
+    final ControlMessage.RecoverableFailureCause cause) {
+    switch (cause) {
+      case InputReadFailure:
+        return TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE;
+      case OutputWriteFailure:
+        return TaskState.RecoverableTaskFailureCause.OUTPUT_WRITE_FAILURE;
+      default:
+        throw new UnknownFailureCauseException(
+          new Throwable("The failure cause for the recoverable failure is unknown"));
+    }
+  }
+
+  public static ControlMessage.RecoverableFailureCause convertFailureCause(
+    final TaskState.RecoverableTaskFailureCause cause) {
+    switch (cause) {
+      case INPUT_READ_FAILURE:
+        return ControlMessage.RecoverableFailureCause.InputReadFailure;
+      case OUTPUT_WRITE_FAILURE:
+        return ControlMessage.RecoverableFailureCause.OutputWriteFailure;
+      default:
+        throw new UnknownFailureCauseException(
+          new Throwable("The failure cause for the recoverable failure is unknown"));
+    }
+  }
+}
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/JobMetric.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/JobMetric.java
index 07ff84f..8725e4b 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/JobMetric.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/JobMetric.java
@@ -46,6 +46,7 @@ public final class JobMetric implements StateMetric<PlanState.State> {
   private String vertexProperties;
   private String edgeProperties;
   private JsonNode irDagJson;
+  private volatile DAG<Stage, StageEdge> stageDAG;
   private JsonNode stageDagJson;
   private Long jobDuration;
 
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/Task.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/Task.java
index 737b8a5..719075b 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/Task.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/Task.java
@@ -89,6 +89,13 @@ public final class Task implements Serializable {
   }
 
   /**
+   * @return the Stage ID of the task.
+   */
+  public String getStageId() {
+    return RuntimeIdManager.getStageIdFromTaskId(this.getTaskId());
+  }
+
+  /**
    * @return the incoming edges of the task.
    */
   public List<StageEdge> getTaskIncomingEdges() {
diff --git a/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java b/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java
index bf759a4..b3ac05b 100644
--- a/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java
+++ b/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java
@@ -25,10 +25,12 @@ import org.apache.nemo.compiler.backend.Backend;
 import org.apache.nemo.compiler.backend.nemo.NemoPlanRewriter;
 import org.apache.nemo.compiler.optimizer.Optimizer;
 import org.apache.nemo.conf.JobConf;
+import org.apache.nemo.runtime.common.metric.JobMetric;
 import org.apache.nemo.runtime.common.plan.PhysicalPlan;
 import org.apache.nemo.runtime.common.plan.PlanRewriter;
 import org.apache.nemo.runtime.master.PlanStateManager;
 import org.apache.nemo.runtime.master.RuntimeMaster;
+import org.apache.nemo.runtime.master.metric.MetricStore;
 import org.apache.reef.tang.annotations.Parameter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,6 +83,7 @@ public final class UserApplicationRunner {
    */
   public synchronized void run(final String dagString) {
     try {
+      final long startTime = System.currentTimeMillis();
       LOG.info("##### Nemo Compiler Start #####");
 
       final IRDAG dag = SerializationUtils.deserialize(Base64.getDecoder().decode(dagString));
@@ -104,7 +107,10 @@ public final class UserApplicationRunner {
         planStateManager.storeJSON("final");
       }
 
+      final long endTime = System.currentTimeMillis();
       LOG.info("{} is complete!", physicalPlan.getPlanId());
+      MetricStore.getStore().getOrCreateMetric(JobMetric.class, physicalPlan.getPlanId())
+        .setJobDuration(endTime - startTime);
     } catch (final Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
index 4ef4c98..75c2e43 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
@@ -141,13 +141,12 @@ public final class Executor {
         getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
         e.getPropertyValue(CompressionProperty.class).orElse(null),
         e.getPropertyValue(DecompressionProperty.class).orElse(null)));
-      irDag.getVertices().forEach(v -> {
+      irDag.getVertices().forEach(v ->
         irDag.getOutgoingEdgesOf(v).forEach(e -> serializerManager.register(e.getId(),
           getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
           getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
           e.getPropertyValue(CompressionProperty.class).orElse(null),
-          e.getPropertyValue(DecompressionProperty.class).orElse(null)));
-      });
+          e.getPropertyValue(DecompressionProperty.class).orElse(null))));
 
       new TaskExecutor(task, irDag, taskStateManager, intermediateDataIOFactory, broadcastManagerWorker,
         metricMessageSender, persistentConnectionToMasterMap).execute();
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TaskStateManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TaskStateManager.java
index 37b48d4..41f5d6e 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TaskStateManager.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TaskStateManager.java
@@ -19,11 +19,10 @@
 package org.apache.nemo.runtime.executor;
 
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.nemo.common.exception.UnknownExecutionStateException;
-import org.apache.nemo.common.exception.UnknownFailureCauseException;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.comm.ControlMessage;
 import org.apache.nemo.runtime.common.message.MessageEnvironment;
+import org.apache.nemo.runtime.common.message.MessageUtils;
 import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
 import org.apache.nemo.runtime.common.metric.StateTransitionEvent;
 import org.apache.nemo.runtime.common.plan.Task;
@@ -74,10 +73,8 @@ public final class TaskStateManager {
   public synchronized void onTaskStateChanged(final TaskState.State newState,
                                               final Optional<String> vertexPutOnHold,
                                               final Optional<TaskState.RecoverableTaskFailureCause> cause) {
-    metricMessageSender.send("TaskMetric", taskId, "stateTransitionEvent",
-      SerializationUtils.serialize(new StateTransitionEvent<>(
-        System.currentTimeMillis(), null, newState
-      )));
+    metricMessageSender.send(METRIC, taskId, "stateTransitionEvent",
+      SerializationUtils.serialize(new StateTransitionEvent<>(System.currentTimeMillis(), null, newState)));
 
     switch (newState) {
       case EXECUTING:
@@ -119,13 +116,9 @@ public final class TaskStateManager {
         .setExecutorId(executorId)
         .setTaskId(taskId)
         .setAttemptIdx(attemptIdx)
-        .setState(convertState(newState));
-    if (vertexPutOnHold.isPresent()) {
-      msgBuilder.setVertexPutOnHoldId(vertexPutOnHold.get());
-    }
-    if (cause.isPresent()) {
-      msgBuilder.setFailureCause(convertFailureCause(cause.get()));
-    }
+        .setState(MessageUtils.convertState(newState));
+    vertexPutOnHold.ifPresent(msgBuilder::setVertexPutOnHoldId);
+    cause.ifPresent(c -> msgBuilder.setFailureCause(MessageUtils.convertFailureCause(c)));
 
     // Send taskStateChangedMsg to master!
     persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
@@ -137,38 +130,6 @@ public final class TaskStateManager {
         .build());
   }
 
-  private ControlMessage.TaskStateFromExecutor convertState(final TaskState.State state) {
-    switch (state) {
-      case READY:
-        return ControlMessage.TaskStateFromExecutor.READY;
-      case EXECUTING:
-        return ControlMessage.TaskStateFromExecutor.EXECUTING;
-      case COMPLETE:
-        return ControlMessage.TaskStateFromExecutor.COMPLETE;
-      case SHOULD_RETRY:
-        return ControlMessage.TaskStateFromExecutor.FAILED_RECOVERABLE;
-      case FAILED:
-        return ControlMessage.TaskStateFromExecutor.FAILED_UNRECOVERABLE;
-      case ON_HOLD:
-        return ControlMessage.TaskStateFromExecutor.ON_HOLD;
-      default:
-        throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + state));
-    }
-  }
-
-  private ControlMessage.RecoverableFailureCause convertFailureCause(
-    final TaskState.RecoverableTaskFailureCause cause) {
-    switch (cause) {
-      case INPUT_READ_FAILURE:
-        return ControlMessage.RecoverableFailureCause.InputReadFailure;
-      case OUTPUT_WRITE_FAILURE:
-        return ControlMessage.RecoverableFailureCause.OutputWriteFailure;
-      default:
-        throw new UnknownFailureCauseException(
-          new Throwable("The failure cause for the recoverable failure is unknown"));
-    }
-  }
-
   // Tentative
   public void getCurrentTaskExecutionState() {
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 4d7890c..6e347d8 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -121,6 +121,7 @@ public final class TaskExecutor {
     final Pair<List<DataFetcher>, List<VertexHarness>> pair = prepare(task, irVertexDag, intermediateDataIOFactory);
     this.dataFetchers = pair.left();
     this.sortedHarnesses = pair.right();
+
     this.timeSinceLastExecution = System.currentTimeMillis();
   }
 
@@ -356,6 +357,7 @@ public final class TaskExecutor {
     for (final VertexHarness vertexHarness : sortedHarnesses) {
       finalizeVertex(vertexHarness);
     }
+
     metricMessageSender.send(TASK_METRIC_ID, taskId, "taskDuration",
       SerializationUtils.serialize(System.currentTimeMillis() - executionStartTime));
     this.timeSinceLastExecution = System.currentTimeMillis();
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java
index 1a033b4..5841adb 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java
@@ -118,6 +118,22 @@ public final class PlanStateManager {
   }
 
   /**
+   * Static factory method for creating a PlanStateManager manually.
+   * @param dagDirectory the DAG directory to store the JSON to.
+   * @return a new PlanStateManager instance.
+   */
+  public static PlanStateManager newInstance(final String dagDirectory) {
+    return new PlanStateManager(dagDirectory);
+  }
+
+  /**
+   * @param metricStore the metric store to set for this plan state manager (replaces the current one).
+   */
+  public void setMetricStore(final MetricStore metricStore) {
+    this.metricStore = metricStore;
+  }
+
+  /**
    * Update the physical plan and maximum attempt.
    *
    * @param physicalPlanToUpdate    the physical plan to manage.
@@ -147,16 +163,13 @@ public final class PlanStateManager {
   private void initializeStates() {
     onPlanStateChanged(PlanState.State.EXECUTING);
     physicalPlan.getStageDAG().topologicalDo(stage -> {
-      if (!stageIdToState.containsKey(stage.getId())) {
-        stageIdToState.put(stage.getId(), new StageState());
-        stageIdToTaskIdxToAttemptStates.put(stage.getId(), new HashMap<>());
-
-        // for each task idx of this stage
-        for (final int taskIndex : stage.getTaskIndices()) {
-          stageIdToTaskIdxToAttemptStates.get(stage.getId()).put(taskIndex, new ArrayList<>());
-          // task states will be initialized lazily in getTaskAttemptsToSchedule()
-        }
-      }
+      stageIdToState.putIfAbsent(stage.getId(), new StageState());
+      stageIdToTaskIdxToAttemptStates.putIfAbsent(stage.getId(), new HashMap<>());
+
+      // for each task idx of this stage, register an empty attempt-state list unless one is already registered;
+      // task states will be initialized lazily in getTaskAttemptsToSchedule()
+      stage.getTaskIndices().forEach(taskIndex ->
+        stageIdToTaskIdxToAttemptStates.get(stage.getId()).putIfAbsent(taskIndex, new ArrayList<>()));
     });
   }
 
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java
index 45a9cf0..99ee8e5 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java
@@ -18,31 +18,28 @@
  */
 package org.apache.nemo.runtime.master;
 
-import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.core.TreeNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.ByteString;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.nemo.common.Pair;
-import org.apache.nemo.common.exception.*;
+import org.apache.nemo.common.Util;
+import org.apache.nemo.common.exception.ContainerException;
+import org.apache.nemo.common.exception.IllegalMessageException;
+import org.apache.nemo.common.exception.MetricException;
 import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.conf.JobConf;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.comm.ControlMessage;
-import org.apache.nemo.runtime.common.message.ClientRPC;
-import org.apache.nemo.runtime.common.message.MessageContext;
-import org.apache.nemo.runtime.common.message.MessageEnvironment;
-import org.apache.nemo.runtime.common.message.MessageListener;
+import org.apache.nemo.runtime.common.message.*;
 import org.apache.nemo.runtime.common.metric.JobMetric;
 import org.apache.nemo.runtime.common.plan.PhysicalPlan;
-import org.apache.nemo.runtime.common.state.TaskState;
 import org.apache.nemo.runtime.master.metric.MetricManagerMaster;
 import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
 import org.apache.nemo.runtime.master.metric.MetricStore;
 import org.apache.nemo.runtime.master.resource.ContainerManager;
 import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
-import org.apache.nemo.runtime.master.resource.ResourceSpecification;
 import org.apache.nemo.runtime.master.scheduler.BatchScheduler;
 import org.apache.nemo.runtime.master.scheduler.Scheduler;
 import org.apache.nemo.runtime.master.servlet.*;
@@ -60,13 +57,13 @@ import org.slf4j.LoggerFactory;
 import javax.inject.Inject;
 import java.io.Serializable;
 import java.nio.file.Paths;
-import java.util.*;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.nemo.runtime.common.state.TaskState.State.COMPLETE;
-import static org.apache.nemo.runtime.common.state.TaskState.State.ON_HOLD;
-
 /**
  * (WARNING) Use runtimeMasterThread for all public methods to avoid race conditions.
  * See comments in the {@link Scheduler} for avoiding race conditions.
@@ -305,33 +302,14 @@ public final class RuntimeMaster {
   public void requestContainer(final String resourceSpecificationString) {
     final Future<?> containerRequestEventResult = runtimeMasterThread.submit(() -> {
       try {
-        final TreeNode jsonRootNode = objectMapper.readTree(resourceSpecificationString);
-
-        for (int i = 0; i < jsonRootNode.size(); i++) {
-          final TreeNode resourceNode = jsonRootNode.get(i);
-          final String type = resourceNode.get("type").traverse().nextTextValue();
-          final int memory = resourceNode.get("memory_mb").traverse().getIntValue();
-          final OptionalDouble maxOffheapRatio;
-          final int capacity = resourceNode.get("capacity").traverse().getIntValue();
-          final int executorNum = resourceNode.path("num").traverse().nextIntValue(1);
-          final OptionalInt poisonSec;
-
-          if (resourceNode.path("max_offheap_ratio").traverse().nextToken() == JsonToken.VALUE_NUMBER_FLOAT) {
-            maxOffheapRatio = OptionalDouble.of(resourceNode.path("max_offheap_ratio").traverse().getDoubleValue());
-          } else {
-            maxOffheapRatio = OptionalDouble.empty();
-          }
-
-          if (resourceNode.path("poison_sec").traverse().nextToken() == JsonToken.VALUE_NUMBER_INT) {
-            poisonSec = OptionalInt.of(resourceNode.path("poison_sec").traverse().getIntValue());
-          } else {
-            poisonSec = OptionalInt.empty();
-          }
+        final List<Pair<Integer, ResourceSpecification>> resourceSpecificationList =  // pair of (# of executors, specs)
+          Util.parseResourceSpecificationString(resourceSpecificationString);
 
-          resourceRequestCount.getAndAdd(executorNum);
-          containerManager.requestContainer(executorNum, new ResourceSpecification(type, capacity, memory,
-            maxOffheapRatio, poisonSec));
+        for (final Pair<Integer, ResourceSpecification> resourceSpecification: resourceSpecificationList) {
+          resourceRequestCount.getAndAdd(resourceSpecification.left());
+          containerManager.requestContainer(resourceSpecification.left(), resourceSpecification.right());
         }
+
         metricCountDownLatch = new CountDownLatch(resourceRequestCount.get());
       } catch (final Exception e) {
         throw new ContainerException(e);
@@ -454,9 +432,9 @@ public final class RuntimeMaster {
         scheduler.onTaskStateReportFromExecutor(taskStateChangedMsg.getExecutorId(),
           taskStateChangedMsg.getTaskId(),
           taskStateChangedMsg.getAttemptIdx(),
-          convertTaskState(taskStateChangedMsg.getState()),
+          MessageUtils.convertTaskState(taskStateChangedMsg.getState()),
           taskStateChangedMsg.getVertexPutOnHoldId(),
-          convertFailureCause(taskStateChangedMsg.getFailureCause()));
+          MessageUtils.convertFailureCause(taskStateChangedMsg.getFailureCause()));
         break;
       case ExecutorFailed:
         // Executor failed due to user code.
@@ -493,38 +471,6 @@ public final class RuntimeMaster {
     }
   }
 
-  private static TaskState.State convertTaskState(final ControlMessage.TaskStateFromExecutor state) {
-    switch (state) {
-      case READY:
-        return TaskState.State.READY;
-      case EXECUTING:
-        return TaskState.State.EXECUTING;
-      case COMPLETE:
-        return COMPLETE;
-      case FAILED_RECOVERABLE:
-        return TaskState.State.SHOULD_RETRY;
-      case FAILED_UNRECOVERABLE:
-        return TaskState.State.FAILED;
-      case ON_HOLD:
-        return ON_HOLD;
-      default:
-        throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + state));
-    }
-  }
-
-  private TaskState.RecoverableTaskFailureCause convertFailureCause(
-    final ControlMessage.RecoverableFailureCause cause) {
-    switch (cause) {
-      case InputReadFailure:
-        return TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE;
-      case OutputWriteFailure:
-        return TaskState.RecoverableTaskFailureCause.OUTPUT_WRITE_FAILURE;
-      default:
-        throw new UnknownFailureCauseException(
-          new Throwable("The failure cause for the recoverable failure is unknown"));
-    }
-  }
-
   /**
    * Schedules a periodic DAG logging thread.
    * TODO #20: RESTful APIs to Access Job State and Metric.
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
index 3d3db4f..53477bc 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
@@ -79,6 +79,14 @@ public final class MetricStore {
   }
 
   /**
+   * Static factory method for creating a new instance.
+   * @return a new MetricStore instance.
+   */
+  public static MetricStore newInstance() {
+    return new MetricStore();
+  }
+
+  /**
    * Get the metric class by its name.
    *
    * @param className the name of the class.
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ContainerManager.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ContainerManager.java
index 40a2210..defd170 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ContainerManager.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ContainerManager.java
@@ -19,6 +19,7 @@
 package org.apache.nemo.runtime.master.resource;
 
 import org.apache.nemo.common.exception.ContainerException;
+import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
 import org.apache.nemo.conf.JobConf;
 import org.apache.nemo.runtime.common.message.FailedMessageSender;
 import org.apache.nemo.runtime.common.message.MessageEnvironment;
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/DefaultExecutorRepresenter.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/DefaultExecutorRepresenter.java
index 04e9051..16c9a70 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/DefaultExecutorRepresenter.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/DefaultExecutorRepresenter.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.protobuf.ByteString;
 import org.apache.commons.lang3.SerializationUtils;
+import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
 import org.apache.nemo.common.ir.vertex.executionproperty.ResourceSlotProperty;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.comm.ControlMessage;
@@ -120,7 +121,6 @@ public final class DefaultExecutorRepresenter implements ExecutorRepresenter {
     runningTaskToAttempt.put(task, task.getAttemptIdx());
     failedTasks.remove(task);
 
-
     serializationExecutorService.execute(() -> {
       final byte[] serialized = SerializationUtils.serialize(task);
       sendControlMessage(
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
index 4c3d112..cf8a123 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -18,21 +18,13 @@
  */
 package org.apache.nemo.runtime.master.scheduler;
 
-import com.google.common.collect.Sets;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.nemo.common.Pair;
-import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.common.exception.UnknownExecutionStateException;
-import org.apache.nemo.common.exception.UnknownFailureCauseException;
 import org.apache.nemo.common.exception.UnrecoverableFailureException;
-import org.apache.nemo.common.ir.Readable;
-import org.apache.nemo.common.ir.edge.executionproperty.MessageIdEdgeProperty;
 import org.apache.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
-import org.apache.nemo.common.ir.vertex.executionproperty.IgnoreSchedulingTempDataReceiverProperty;
-import org.apache.nemo.common.ir.vertex.executionproperty.MessageIdVertexProperty;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.plan.*;
-import org.apache.nemo.runtime.common.state.BlockState;
 import org.apache.nemo.runtime.common.state.StageState;
 import org.apache.nemo.runtime.common.state.TaskState;
 import org.apache.nemo.runtime.master.BlockManagerMaster;
@@ -47,7 +39,6 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -55,6 +46,8 @@ import java.util.stream.Collectors;
  * (i.e., runtimeMasterThread in RuntimeMaster)
  * <p>
  * BatchScheduler receives a single {@link PhysicalPlan} to execute and schedules the Tasks.
+ *
+ * Note: When modifying this class, check whether {@link SimulationScheduler} needs the same change (shared logic).
  */
 @DriverSide
 @NotThreadSafe
@@ -69,20 +62,20 @@ public final class BatchScheduler implements Scheduler {
   /**
    * Components related to scheduling the given plan.
    */
-  private final TaskDispatcher taskDispatcher;
-  private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
-  private final ExecutorRegistry executorRegistry;
-  private final PlanStateManager planStateManager;
+  private final TaskDispatcher taskDispatcher;  // Class for dispatching tasks.
+  private final PendingTaskCollectionPointer pendingTaskCollectionPointer;  // A 'pointer' to the list of pending tasks.
+  private final ExecutorRegistry executorRegistry;  // A registry for executors available for the job.
+  private final PlanStateManager planStateManager;  // A component that manages the state of the plan.
 
   /**
    * Other necessary components of this {@link org.apache.nemo.runtime.master.RuntimeMaster}.
    */
-  private final BlockManagerMaster blockManagerMaster;
+  private final BlockManagerMaster blockManagerMaster;  // A component that manages data blocks.
 
   /**
    * The below variables depend on the submitted plan to execute.
    */
-  private List<List<Stage>> sortedScheduleGroups;
+  private List<List<Stage>> sortedScheduleGroups;  // Stages, sorted in the order to be scheduled.
 
   @Inject
   private BatchScheduler(final PlanRewriter planRewriter,
@@ -127,55 +120,13 @@ public final class BatchScheduler implements Scheduler {
   }
 
   /**
-   * @param taskId that generated the message.
-   * @param data   of the message.
-   */
-  public void onRunTimePassMessage(final String taskId, final Object data) {
-    final Set<StageEdge> targetEdges = getEdgesToOptimize(taskId);
-    planRewriter.accumulate(getMessageId(targetEdges), data);
-  }
-
-  /**
-   * Action for after task execution is put on hold.
+   * Process the RuntimePassMessage.
    *
-   * @param executorId the ID of the executor.
-   * @param taskId     the ID of the task.
+   * @param taskId           that generated the message.
+   * @param data             of the message.
    */
-  private void onTaskExecutionOnHold(final String executorId,
-                                     final String taskId) {
-    LOG.info("{} put on hold in {}", new Object[]{taskId, executorId});
-    executorRegistry.updateExecutor(executorId, (executor, state) -> {
-      executor.onTaskExecutionComplete(taskId);
-      return Pair.of(executor, state);
-    });
-    final String stageIdForTaskUponCompletion = RuntimeIdManager.getStageIdFromTaskId(taskId);
-
-    final boolean stageComplete =
-      planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
-
-    final Set<StageEdge> targetEdges = getEdgesToOptimize(taskId);
-    if (targetEdges.isEmpty()) {
-      throw new RuntimeException("No edges specified for data skew optimization");
-    }
-
-    if (stageComplete) {
-      final PhysicalPlan updatedPlan = planRewriter.rewrite(
-        planStateManager.getPhysicalPlan(), getMessageId(targetEdges));
-      updatePlan(updatedPlan);
-    }
-  }
-
-  private int getMessageId(final Set<StageEdge> stageEdges) {
-    // Here we simply use findFirst() for now...
-    // TODO #345: Simplify insert
-    final Set<Integer> messageIds = stageEdges.stream()
-      .map(edge -> edge.getExecutionProperties()
-        .get(MessageIdEdgeProperty.class)
-        .<IllegalArgumentException>orElseThrow(() -> new IllegalArgumentException(edge.getId())))
-      .findFirst().<IllegalArgumentException>orElseThrow(IllegalArgumentException::new);
-    // Type casting is needed. See: https://stackoverflow.com/a/40865318
-
-    return messageIds.iterator().next();
+  public void onRunTimePassMessage(final String taskId, final Object data) {
+    BatchSchedulerUtils.onRunTimePassMessage(planStateManager, planRewriter, taskId, data);
   }
 
   ////////////////////////////////////////////////////////////////////// Methods for scheduling.
@@ -231,14 +182,18 @@ public final class BatchScheduler implements Scheduler {
     planStateManager.onTaskStateChanged(taskId, newState);
     switch (newState) {
       case COMPLETE:
-        onTaskExecutionComplete(executorId, taskId);
+        BatchSchedulerUtils.onTaskExecutionComplete(executorRegistry, executorId, taskId);
         break;
       case SHOULD_RETRY:
         // SHOULD_RETRY from an executor means that the task ran into a recoverable failure
-        onTaskExecutionFailedRecoverable(executorId, taskId, failureCause);
+        BatchSchedulerUtils.onTaskExecutionFailedRecoverable(planStateManager, blockManagerMaster, executorRegistry,
+          executorId, taskId, failureCause);
         break;
       case ON_HOLD:
-        onTaskExecutionOnHold(executorId, taskId);
+        final Optional<PhysicalPlan> optionalPhysicalPlan =
+          BatchSchedulerUtils
+            .onTaskExecutionOnHold(planStateManager, executorRegistry, planRewriter, executorId, taskId);
+        optionalPhysicalPlan.ifPresent(this::updatePlan);
         break;
       case FAILED:
         throw new UnrecoverableFailureException(new Exception(String.format("The plan failed on %s in %s",
@@ -286,18 +241,18 @@ public final class BatchScheduler implements Scheduler {
   public void onSpeculativeExecutionCheck() {
     MutableBoolean isNewCloneCreated = new MutableBoolean(false);
 
-    selectEarliestSchedulableGroup().ifPresent(scheduleGroup ->
-      scheduleGroup.stream().map(Stage::getId).forEach(stageId -> {
-        final Stage stage = planStateManager.getPhysicalPlan().getStageDAG().getVertexById(stageId);
+    BatchSchedulerUtils.selectEarliestSchedulableGroup(sortedScheduleGroups, planStateManager)
+      .ifPresent(scheduleGroup ->
+        scheduleGroup.stream().map(Stage::getId).forEach(stageId -> {
+          final Stage stage = planStateManager.getPhysicalPlan().getStageDAG().getVertexById(stageId);
 
-        // Only if the ClonedSchedulingProperty is set...
-        stage.getPropertyValue(ClonedSchedulingProperty.class).ifPresent(cloneConf -> {
-          if (!cloneConf.isUpFrontCloning()) { // Upfront cloning is already handled.
-            isNewCloneCreated.setValue(doSpeculativeExecution(stage, cloneConf));
-          }
-        });
-      })
-    );
+          // Only if the ClonedSchedulingProperty is set...
+          stage.getPropertyValue(ClonedSchedulingProperty.class).ifPresent(cloneConf -> {
+            if (!cloneConf.isUpFrontCloning()) { // Upfront cloning is already handled.
+              isNewCloneCreated.setValue(doSpeculativeExecution(stage, cloneConf));
+            }
+          });
+        }));
 
     if (isNewCloneCreated.booleanValue()) {
       doSchedule(); // Do schedule the new clone.
@@ -327,7 +282,7 @@ public final class BatchScheduler implements Scheduler {
     interruptedTasks.forEach(blockManagerMaster::onProducerTaskFailed);
 
     // Retry the interrupted tasks (and required parents)
-    retryTasksAndRequiredParents(interruptedTasks);
+    BatchSchedulerUtils.retryTasksAndRequiredParents(planStateManager, blockManagerMaster, interruptedTasks);
 
     // Trigger the scheduling of SHOULD_RETRY tasks in the earliest scheduleGroup
     doSchedule();
@@ -350,11 +305,13 @@ public final class BatchScheduler implements Scheduler {
    * - We make {@link TaskDispatcher} dispatch only the tasks that are READY.
    */
   private void doSchedule() {
-    final Optional<List<Stage>> earliest = selectEarliestSchedulableGroup();
+    final Optional<List<Stage>> earliest =
+      BatchSchedulerUtils.selectEarliestSchedulableGroup(sortedScheduleGroups, planStateManager);
 
     if (earliest.isPresent()) {
       final List<Task> tasksToSchedule = earliest.get().stream()
-        .flatMap(stage -> selectSchedulableTasks(stage).stream())
+        .flatMap(stage ->
+          BatchSchedulerUtils.selectSchedulableTasks(planStateManager, blockManagerMaster, stage).stream())
         .collect(Collectors.toList());
       if (!tasksToSchedule.isEmpty()) {
         LOG.info("Scheduling some tasks in {}, which are in the same ScheduleGroup", tasksToSchedule.stream()
@@ -373,55 +330,6 @@ public final class BatchScheduler implements Scheduler {
     }
   }
 
-  private Optional<List<Stage>> selectEarliestSchedulableGroup() {
-    if (sortedScheduleGroups == null) {
-      return Optional.empty();
-    }
-
-    return sortedScheduleGroups.stream()
-      .filter(scheduleGroup -> scheduleGroup.stream()
-        .map(Stage::getId)
-        .map(planStateManager::getStageState)
-        .anyMatch(state -> state.equals(StageState.State.INCOMPLETE))) // any incomplete stage in the group
-      .findFirst(); // selects the one with the smallest scheduling group index.
-  }
-
-  private List<Task> selectSchedulableTasks(final Stage stageToSchedule) {
-    if (stageToSchedule.getPropertyValue(IgnoreSchedulingTempDataReceiverProperty.class).orElse(false)) {
-      // Ignore ghost stage.
-      for (final String taskId : planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId())) {
-        planStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING);
-        planStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE);
-      }
-
-      return Collections.emptyList();
-    }
-
-    final List<StageEdge> stageIncomingEdges =
-      planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
-    final List<StageEdge> stageOutgoingEdges =
-      planStateManager.getPhysicalPlan().getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
-
-    // Create and return tasks.
-    final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
-
-    final List<String> taskIdsToSchedule = planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId());
-    final List<Task> tasks = new ArrayList<>(taskIdsToSchedule.size());
-    taskIdsToSchedule.forEach(taskId -> {
-      final Set<String> blockIds = getOutputBlockIds(taskId);
-      blockManagerMaster.onProducerTaskScheduled(taskId, blockIds);
-      final int taskIdx = RuntimeIdManager.getIndexFromTaskId(taskId);
-      tasks.add(new Task(
-        planStateManager.getPhysicalPlan().getPlanId(),
-        taskId,
-        stageToSchedule.getExecutionProperties(),
-        stageToSchedule.getSerializedIRDAG(),
-        stageIncomingEdges,
-        stageOutgoingEdges,
-        vertexIdToReadables.get(taskIdx)));
-    });
-    return tasks;
-  }
 
   ////////////////////////////////////////////////////////////////////// Task cloning methods.
 
@@ -476,181 +384,4 @@ public final class BatchScheduler implements Scheduler {
 
     return false;
   }
-
-  ////////////////////////////////////////////////////////////////////// Task state change handlers
-
-  /**
-   * Action after task execution has been completed.
-   * Note this method should not be invoked when the previous state of the task is ON_HOLD.
-   *
-   * @param executorId id of the executor.
-   * @param taskId     the ID of the task completed.
-   */
-  private void onTaskExecutionComplete(final String executorId,
-                                       final String taskId) {
-    LOG.debug("{} completed in {}", taskId, executorId);
-    executorRegistry.updateExecutor(executorId, (executor, state) -> {
-      executor.onTaskExecutionComplete(taskId);
-      return Pair.of(executor, state);
-    });
-  }
-
-  /**
-   * Get the target edges of dynamic optimization.
-   * The edges are annotated with {@link MessageIdEdgeProperty}, which are outgoing edges of
-   * parents of the stage put on hold.
-   * <p>
-   * See {@link org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping.SkewReshapingPass}
-   * for setting the target edges of dynamic optimization.
-   *
-   * @param taskId the task ID that sent stage-level aggregated message for dynamic optimization.
-   * @return the edges to optimize.
-   */
-  private Set<StageEdge> getEdgesToOptimize(final String taskId) {
-    final DAG<Stage, StageEdge> stageDag = planStateManager.getPhysicalPlan().getStageDAG();
-
-    // Get a stage including the given task
-    final Stage stagePutOnHold = stageDag.getVertices().stream()
-      .filter(stage -> stage.getId().equals(RuntimeIdManager.getStageIdFromTaskId(taskId)))
-      .findFirst()
-      .orElseThrow(RuntimeException::new);
-
-    // Stage put on hold, i.e. stage with vertex containing MessageAggregatorTransform
-    // should have a parent stage whose outgoing edges contain the target edge of dynamic optimization.
-    final List<Integer> messageIds = stagePutOnHold.getIRDAG()
-      .getVertices()
-      .stream()
-      .filter(v -> v.getPropertyValue(MessageIdVertexProperty.class).isPresent())
-      .map(v -> v.getPropertyValue(MessageIdVertexProperty.class).get())
-      .collect(Collectors.toList());
-    if (messageIds.size() != 1) {
-      throw new IllegalStateException("Must be exactly one vertex with the message id: " + messageIds.toString());
-    }
-    final int messageId = messageIds.get(0);
-    final Set<StageEdge> targetEdges = new HashSet<>();
-
-    // Get edges with identical MessageIdEdgeProperty (except the put on hold stage)
-    for (final Stage stage : stageDag.getVertices()) {
-      final Set<StageEdge> targetEdgesFound = stageDag.getOutgoingEdgesOf(stage).stream()
-        .filter(candidateEdge -> {
-          final Optional<HashSet<Integer>> candidateMCId =
-            candidateEdge.getPropertyValue(MessageIdEdgeProperty.class);
-          return candidateMCId.isPresent() && candidateMCId.get().contains(messageId);
-        })
-        .collect(Collectors.toSet());
-      targetEdges.addAll(targetEdgesFound);
-    }
-
-    return targetEdges;
-  }
-
-  /**
-   * Action for after task execution has failed but it's recoverable.
-   * <p>
-   * The task is first marked as failed on its executor, the block manager master is notified that the producer
-   * task failed, and then the task (plus any parent tasks whose output blocks are lost) is marked for retry.
-   *
-   * @param executorId   the ID of the executor
-   * @param taskId       the ID of the task
-   * @param failureCause the cause of failure
-   * @throws UnknownFailureCauseException if the failure cause is not handled by the switch below
-   */
-  private void onTaskExecutionFailedRecoverable(final String executorId,
-                                                final String taskId,
-                                                final TaskState.RecoverableTaskFailureCause failureCause) {
-    LOG.info("{} failed in {} by {}", taskId, executorId, failureCause);
-    executorRegistry.updateExecutor(executorId, (executor, state) -> {
-      executor.onTaskExecutionFailed(taskId);
-      return Pair.of(executor, state);
-    });
-
-    switch (failureCause) {
-      // Previous task must be re-executed, and incomplete tasks of the belonging stage must be rescheduled.
-      case INPUT_READ_FAILURE:
-        // TODO #54: Handle remote data fetch failures
-        // Intentional fall-through: input read failures are currently handled like output write failures.
-      case OUTPUT_WRITE_FAILURE:
-        blockManagerMaster.onProducerTaskFailed(taskId);
-        break;
-      default:
-        throw new UnknownFailureCauseException(new Throwable("Unknown cause: " + failureCause));
-    }
-
-    retryTasksAndRequiredParents(Collections.singleton(taskId));
-  }
-
-  ////////////////////////////////////////////////////////////////////// Helper methods
-
-  /**
-   * Mark the given tasks, together with all (transitively) required parent tasks whose output blocks are lost,
-   * as {@link TaskState.State#SHOULD_RETRY}.
-   *
-   * @param tasks the IDs of the tasks to retry.
-   */
-  private void retryTasksAndRequiredParents(final Set<String> tasks) {
-    final Set<String> requiredParents = recursivelyGetParentTasksForLostBlocks(tasks);
-    final Set<String> tasksToRetry = Sets.union(tasks, requiredParents);
-    LOG.info("Will be retried: {}", tasksToRetry);
-    tasksToRetry.forEach(
-      taskToReExecute -> planStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
-  }
-
-  /**
-   * Recursively collect the COMPLETE parent task attempts whose output blocks, read by the given children,
-   * no longer have any AVAILABLE block attempt.
-   *
-   * @param children the IDs of the child tasks.
-   * @return the IDs of the parent task attempts to re-execute (empty if {@code children} is empty).
-   */
-  private Set<String> recursivelyGetParentTasksForLostBlocks(final Set<String> children) {
-    if (children.isEmpty()) {
-      return Collections.emptySet();
-    }
-    final DAG<Stage, StageEdge> stageDAG = planStateManager.getPhysicalPlan().getStageDAG();
-
-    final Map<String, StageEdge> idToIncomingEdges = children.stream()
-      .map(RuntimeIdManager::getStageIdFromTaskId)
-      .flatMap(stageId -> stageDAG.getIncomingEdgesOf(stageId).stream())
-      // Ignore duplicates with the mergeFunction in toMap(_,_,mergeFunction)
-      .collect(Collectors.toMap(StageEdge::getId, Function.identity(), (l, r) -> l));
-
-    final Set<String> parentsWithLostBlocks = children.stream()
-      .flatMap(child -> getInputBlockIds(child).stream()) // child task id -> parent block ids
-      .map(RuntimeIdManager::getWildCardFromBlockId) // parent block id -> parent block wildcard
-      .collect(Collectors.toSet()).stream() // remove duplicate wildcards
-      .filter(parentBlockWildcard -> // lost block = no matching AVAILABLE block attempt for the wildcard
-        blockManagerMaster.getBlockHandlers(parentBlockWildcard, BlockState.State.AVAILABLE).isEmpty())
-      .flatMap(lostParentBlockWildcard -> {
-        // COMPLETE task attempts of the lostParentBlockWildcard must become SHOULD_RETRY
-        final String inEdgeId = RuntimeIdManager.getRuntimeEdgeIdFromBlockId(lostParentBlockWildcard);
-        final String parentStageId = idToIncomingEdges.get(inEdgeId).getSrc().getId();
-        final int parentTaskIndex = RuntimeIdManager.getTaskIndexFromBlockId(lostParentBlockWildcard);
-        return planStateManager.getAllTaskAttemptsOfStage(parentStageId)
-          .stream()
-          .filter(taskId -> RuntimeIdManager.getStageIdFromTaskId(taskId).equals(parentStageId)
-            && RuntimeIdManager.getIndexFromTaskId(taskId) == parentTaskIndex)
-          // COMPLETE -> SHOULD_RETRY
-          .filter(taskId -> planStateManager.getTaskState(taskId).equals(TaskState.State.COMPLETE));
-      })
-      .collect(Collectors.toSet());
-
-
-    // Recursive call: the re-executed parents may themselves need lost inputs from their own parents.
-    return Sets.union(parentsWithLostBlocks, recursivelyGetParentTasksForLostBlocks(parentsWithLostBlocks));
-  }
-
-  /**
-   * Compute the IDs of the blocks the given task produces, one per outgoing edge of its stage.
-   *
-   * @param taskId the ID of the producer task.
-   * @return the IDs of the output blocks.
-   */
-  private Set<String> getOutputBlockIds(final String taskId) {
-    return planStateManager.getPhysicalPlan().getStageDAG()
-      .getOutgoingEdgesOf(RuntimeIdManager.getStageIdFromTaskId(taskId))
-      .stream()
-      .map(stageEdge -> RuntimeIdManager.generateBlockId(stageEdge.getId(), taskId))
-      .collect(Collectors.toSet()); // ids of blocks this task will produce
-  }
-
-  /**
-   * Compute the IDs of the blocks the given task reads from its parent stages: the blocks of all parent task
-   * attempts for SHUFFLE and BROADCAST edges, and only those of same-index parent task attempts for ONE_TO_ONE.
-   *
-   * @param childTaskId the ID of the child task.
-   * @return the IDs of the input blocks.
-   * @throws IllegalStateException if an incoming edge has any other communication pattern.
-   */
-  private Set<String> getInputBlockIds(final String childTaskId) {
-    final String stageIdOfChildTask = RuntimeIdManager.getStageIdFromTaskId(childTaskId);
-    return planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageIdOfChildTask)
-      .stream()
-      .flatMap(inStageEdge -> {
-        final Set<String> parentTaskIds = planStateManager.getAllTaskAttemptsOfStage(inStageEdge.getSrc().getId());
-        switch (inStageEdge.getDataCommunicationPattern()) {
-          case SHUFFLE:
-          case BROADCAST:
-            // All of the parent stage's tasks
-            return parentTaskIds.stream()
-              .map(parentTaskId -> RuntimeIdManager.generateBlockId(inStageEdge.getId(), parentTaskId));
-          case ONE_TO_ONE:
-            // Same-index tasks of the parent stage
-            return parentTaskIds.stream()
-              .filter(parentTaskId ->
-                RuntimeIdManager.getIndexFromTaskId(parentTaskId) == RuntimeIdManager.getIndexFromTaskId(childTaskId))
-              .map(parentTaskId -> RuntimeIdManager.generateBlockId(inStageEdge.getId(), parentTaskId));
-          default:
-            throw new IllegalStateException(inStageEdge.toString());
-        }
-      })
-      .collect(Collectors.toSet());
-  }
 }
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerUtils.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerUtils.java
new file mode 100644
index 0000000..f7249cb
--- /dev/null
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerUtils.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.runtime.master.scheduler;
+
+import com.google.common.collect.Sets;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.exception.UnknownFailureCauseException;
+import org.apache.nemo.common.ir.Readable;
+import org.apache.nemo.common.ir.edge.executionproperty.MessageIdEdgeProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.IgnoreSchedulingTempDataReceiverProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.MessageIdVertexProperty;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.plan.*;
+import org.apache.nemo.runtime.common.state.BlockState;
+import org.apache.nemo.runtime.common.state.StageState;
+import org.apache.nemo.runtime.common.state.TaskState;
+import org.apache.nemo.runtime.master.BlockManagerMaster;
+import org.apache.nemo.runtime.master.PlanStateManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Utility methods regarding schedulers.
+ * <p>
+ * All methods are static and stateless: the plan state manager, block manager master, executor registry and plan
+ * rewriter they operate on are passed in explicitly, so that the same logic can be shared between schedulers.
+ */
+public final class BatchSchedulerUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(BatchSchedulerUtils.class.getName());
+
+  /**
+   * Private constructor for utility class.
+   */
+  private BatchSchedulerUtils() {
+  }
+
+  /**
+   * Select the earliest schedule group that still contains an incomplete stage.
+   *
+   * @param sortedScheduleGroups the schedule groups, ordered by schedule group index (may be {@code null}).
+   * @param planStateManager     used to look up the state of each stage.
+   * @return the first group that has at least one {@link StageState.State#INCOMPLETE} stage,
+   *         or empty if there is no such group or {@code sortedScheduleGroups} is {@code null}.
+   */
+  static Optional<List<Stage>> selectEarliestSchedulableGroup(final List<List<Stage>> sortedScheduleGroups,
+                                                              final PlanStateManager planStateManager) {
+    if (sortedScheduleGroups == null) {
+      return Optional.empty();
+    }
+
+    return sortedScheduleGroups.stream()
+      .filter(scheduleGroup -> scheduleGroup.stream()
+        .map(Stage::getId)
+        .map(planStateManager::getStageState)
+        .anyMatch(state -> state.equals(StageState.State.INCOMPLETE))) // any incomplete stage in the group
+      .findFirst(); // selects the one with the smallest scheduling group index.
+  }
+
+  /**
+   * Create the tasks of the given stage whose attempts are waiting to be scheduled.
+   * <p>
+   * A stage annotated with {@link IgnoreSchedulingTempDataReceiverProperty} (a "ghost" stage) is never executed:
+   * its task attempts are moved through EXECUTING straight to COMPLETE and no task is returned.
+   * For any other stage, the block manager master is notified of the output blocks of each task before the
+   * corresponding {@link Task} is created.
+   *
+   * @param planStateManager   the manager of the plan state, also used to access the physical plan.
+   * @param blockManagerMaster notified of the blocks each scheduled task will produce.
+   * @param stageToSchedule    the stage whose tasks to create.
+   * @return the tasks to schedule (empty for a ghost stage).
+   */
+  static List<Task> selectSchedulableTasks(final PlanStateManager planStateManager,
+                                           final BlockManagerMaster blockManagerMaster,
+                                           final Stage stageToSchedule) {
+    if (stageToSchedule.getPropertyValue(IgnoreSchedulingTempDataReceiverProperty.class).orElse(false)) {
+      // Ignore ghost stage.
+      for (final String taskId : planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId())) {
+        planStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING);
+        planStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE);
+      }
+
+      return Collections.emptyList();
+    }
+
+    final List<StageEdge> stageIncomingEdges =
+      planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
+    final List<StageEdge> stageOutgoingEdges =
+      planStateManager.getPhysicalPlan().getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
+
+    // Create and return tasks.
+    final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
+
+    final List<String> taskIdsToSchedule = planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId());
+    final List<Task> tasks = new ArrayList<>(taskIdsToSchedule.size());
+    for (final String taskId : taskIdsToSchedule) {
+      final Set<String> blockIds = getOutputBlockIds(planStateManager, taskId);
+      blockManagerMaster.onProducerTaskScheduled(taskId, blockIds);
+      // Each task is given the readables at its own task index.
+      final int taskIdx = RuntimeIdManager.getIndexFromTaskId(taskId);
+      tasks.add(new Task(
+        planStateManager.getPhysicalPlan().getPlanId(),
+        taskId,
+        stageToSchedule.getExecutionProperties(),
+        stageToSchedule.getSerializedIRDAG(),
+        stageIncomingEdges,
+        stageOutgoingEdges,
+        vertexIdToReadables.get(taskIdx)));
+    }
+    return tasks;
+  }
+
+  ////////////////////////////////////////////////////////////////////// Task state change handlers
+
+  /**
+   * Action after task execution has been completed.
+   * Note this method should not be invoked when the previous state of the task is ON_HOLD.
+   *
+   * @param executorRegistry the registry for available executors.
+   * @param executorId       id of the executor.
+   * @param taskId           the ID of the task completed.
+   */
+  static void onTaskExecutionComplete(final ExecutorRegistry executorRegistry,
+                                      final String executorId,
+                                      final String taskId) {
+    LOG.debug("{} completed in {}", taskId, executorId);
+    executorRegistry.updateExecutor(executorId, (executor, state) -> {
+      executor.onTaskExecutionComplete(taskId);
+      return Pair.of(executor, state);
+    });
+  }
+
+  /**
+   * Get the target edges of dynamic optimization.
+   * The edges are annotated with {@link MessageIdEdgeProperty}, which are outgoing edges of
+   * parents of the stage put on hold.
+   * <p>
+   * See {@link org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping.SkewReshapingPass}
+   * for setting the target edges of dynamic optimization.
+   *
+   * @param planStateManager the manager of the plan state, used to access the stage DAG.
+   * @param taskId           the task ID that sent stage-level aggregated message for dynamic optimization.
+   * @return the edges to optimize.
+   * @throws RuntimeException      if no stage of the plan contains the given task.
+   * @throws IllegalStateException if the stage of the task does not have exactly one vertex with a message id.
+   */
+  static Set<StageEdge> getEdgesToOptimize(final PlanStateManager planStateManager,
+                                           final String taskId) {
+    final DAG<Stage, StageEdge> stageDag = planStateManager.getPhysicalPlan().getStageDAG();
+    final String stageIdOfTask = RuntimeIdManager.getStageIdFromTaskId(taskId);
+
+    // Get a stage including the given task
+    final Stage stagePutOnHold = stageDag.getVertices().stream()
+      .filter(stage -> stage.getId().equals(stageIdOfTask))
+      .findFirst()
+      .orElseThrow(() -> new RuntimeException("No stage found for task " + taskId));
+
+    // Stage put on hold, i.e. stage with vertex containing MessageAggregatorTransform
+    // should have a parent stage whose outgoing edges contain the target edge of dynamic optimization.
+    final List<Integer> messageIds = stagePutOnHold.getIRDAG()
+      .getVertices()
+      .stream()
+      .filter(v -> v.getPropertyValue(MessageIdVertexProperty.class).isPresent())
+      .map(v -> v.getPropertyValue(MessageIdVertexProperty.class).get())
+      .collect(Collectors.toList());
+    if (messageIds.size() != 1) {
+      throw new IllegalStateException("Must be exactly one vertex with the message id: " + messageIds.toString());
+    }
+    final int messageId = messageIds.get(0);
+    final Set<StageEdge> targetEdges = new HashSet<>();
+
+    // Get edges with identical MessageIdEdgeProperty (except the put on hold stage)
+    for (final Stage stage : stageDag.getVertices()) {
+      final Set<StageEdge> targetEdgesFound = stageDag.getOutgoingEdgesOf(stage).stream()
+        .filter(candidateEdge -> {
+          final Optional<HashSet<Integer>> candidateMCId =
+            candidateEdge.getPropertyValue(MessageIdEdgeProperty.class);
+          return candidateMCId.isPresent() && candidateMCId.get().contains(messageId);
+        })
+        .collect(Collectors.toSet());
+      targetEdges.addAll(targetEdgesFound);
+    }
+
+    return targetEdges;
+  }
+
+  /**
+   * Action for after task execution has failed but it's recoverable.
+   * <p>
+   * The task is marked as failed on its executor, the block manager master is notified that the producer task
+   * failed, and the task (along with any parent tasks whose output blocks are lost) is marked for retry.
+   *
+   * @param planStateManager   the manager of the plan state.
+   * @param blockManagerMaster notified of the failed producer task.
+   * @param executorRegistry   the registry holding the executor the task ran on.
+   * @param executorId         the ID of the executor.
+   * @param taskId             the ID of the task.
+   * @param failureCause       the cause of failure.
+   * @throws UnknownFailureCauseException if the failure cause is not handled.
+   */
+  static void onTaskExecutionFailedRecoverable(final PlanStateManager planStateManager,
+                                               final BlockManagerMaster blockManagerMaster,
+                                               final ExecutorRegistry executorRegistry,
+                                               final String executorId,
+                                               final String taskId,
+                                               final TaskState.RecoverableTaskFailureCause failureCause) {
+    LOG.info("{} failed in {} by {}", taskId, executorId, failureCause);
+    executorRegistry.updateExecutor(executorId, (executor, state) -> {
+      executor.onTaskExecutionFailed(taskId);
+      return Pair.of(executor, state);
+    });
+
+    switch (failureCause) {
+      // Previous task must be re-executed, and incomplete tasks of the belonging stage must be rescheduled.
+      case INPUT_READ_FAILURE:
+        // TODO #54: Handle remote data fetch failures
+        // Intentional fall-through: input read failures are currently handled like output write failures.
+      case OUTPUT_WRITE_FAILURE:
+        blockManagerMaster.onProducerTaskFailed(taskId);
+        break;
+      default:
+        throw new UnknownFailureCauseException(new Throwable("Unknown cause: " + failureCause));
+    }
+
+    retryTasksAndRequiredParents(planStateManager, blockManagerMaster, Collections.singleton(taskId));
+  }
+
+  /**
+   * Action for after task execution is put on hold.
+   * <p>
+   * Once the stage of the task is complete, the physical plan is rewritten using the message id annotated on
+   * the edges to optimize.
+   *
+   * @param planStateManager the manager of the plan state.
+   * @param executorRegistry the registry holding the executor the task ran on.
+   * @param planRewriter     used to rewrite the physical plan.
+   * @param executorId       the ID of the executor.
+   * @param taskId           the ID of the task.
+   * @return the rewritten plan if the stage of the task is complete, or empty otherwise.
+   * @throws RuntimeException if there are no edges to optimize.
+   */
+  static Optional<PhysicalPlan> onTaskExecutionOnHold(final PlanStateManager planStateManager,
+                                                      final ExecutorRegistry executorRegistry,
+                                                      final PlanRewriter planRewriter,
+                                                      final String executorId,
+                                                      final String taskId) {
+    LOG.info("{} put on hold in {}", taskId, executorId);
+    executorRegistry.updateExecutor(executorId, (executor, state) -> {
+      executor.onTaskExecutionComplete(taskId);
+      return Pair.of(executor, state);
+    });
+    final String stageIdForTaskUponCompletion = RuntimeIdManager.getStageIdFromTaskId(taskId);
+
+    final boolean stageComplete =
+      planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
+
+    final Set<StageEdge> targetEdges = getEdgesToOptimize(planStateManager, taskId);
+    if (targetEdges.isEmpty()) {
+      throw new RuntimeException("No edges specified for data skew optimization");
+    }
+
+    if (stageComplete) {
+      return Optional.of(planRewriter.rewrite(
+        planStateManager.getPhysicalPlan(), getMessageId(targetEdges)));
+    } else {
+      return Optional.empty();
+    }
+  }
+
+  /**
+   * Process the RuntimePassMessage.
+   *
+   * @param planStateManager to get the edges for the optimization.
+   * @param planRewriter     for rewriting the plan later on.
+   * @param taskId           that generated the message.
+   * @param data             of the message.
+   */
+  public static void onRunTimePassMessage(final PlanStateManager planStateManager, final PlanRewriter planRewriter,
+                                          final String taskId, final Object data) {
+    final Set<StageEdge> targetEdges = getEdgesToOptimize(planStateManager, taskId);
+    planRewriter.accumulate(getMessageId(targetEdges), data);
+  }
+
+  /**
+   * Get the message id annotated on the given edges.
+   * Only the {@link MessageIdEdgeProperty} of the first edge (in iteration order) is inspected, and the first id
+   * in it is returned.
+   *
+   * @param stageEdges the edges annotated with {@link MessageIdEdgeProperty}.
+   * @return the message id.
+   * @throws IllegalArgumentException if {@code stageEdges} is empty or the first edge has no message id.
+   */
+  static int getMessageId(final Set<StageEdge> stageEdges) {
+    // Here we simply use findFirst() for now...
+    // TODO #345: Simplify insert
+    final Set<Integer> messageIds = stageEdges.stream()
+      .map(edge -> edge.getExecutionProperties()
+        .get(MessageIdEdgeProperty.class)
+        .<IllegalArgumentException>orElseThrow(() -> new IllegalArgumentException(edge.getId())))
+      .findFirst().<IllegalArgumentException>orElseThrow(IllegalArgumentException::new);
+    // Type casting is needed. See: https://stackoverflow.com/a/40865318
+
+    return messageIds.iterator().next();
+  }
+
+  ////////////////////////////////////////////////////////////////////// Helper methods
+
+  /**
+   * Mark the given tasks, together with all (transitively) required parent tasks whose output blocks are lost,
+   * as {@link TaskState.State#SHOULD_RETRY}.
+   *
+   * @param planStateManager   the manager of the plan state.
+   * @param blockManagerMaster used to check which parent blocks are still available.
+   * @param tasks              the IDs of the tasks to retry.
+   */
+  static void retryTasksAndRequiredParents(final PlanStateManager planStateManager,
+                                           final BlockManagerMaster blockManagerMaster,
+                                           final Set<String> tasks) {
+    final Set<String> requiredParents =
+      recursivelyGetParentTasksForLostBlocks(planStateManager, blockManagerMaster, tasks);
+    final Set<String> tasksToRetry = Sets.union(tasks, requiredParents);
+    LOG.info("Will be retried: {}", tasksToRetry);
+    tasksToRetry.forEach(
+      taskToReExecute -> planStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
+  }
+
+  /**
+   * Recursively collect the COMPLETE parent task attempts whose output blocks, read by the given children,
+   * no longer have any AVAILABLE block attempt.
+   *
+   * @param planStateManager   the manager of the plan state.
+   * @param blockManagerMaster used to check which parent blocks are still available.
+   * @param children           the IDs of the child tasks.
+   * @return the IDs of the parent task attempts to re-execute (empty if {@code children} is empty).
+   */
+  static Set<String> recursivelyGetParentTasksForLostBlocks(final PlanStateManager planStateManager,
+                                                            final BlockManagerMaster blockManagerMaster,
+                                                            final Set<String> children) {
+    if (children.isEmpty()) {
+      return Collections.emptySet();
+    }
+    final DAG<Stage, StageEdge> stageDAG = planStateManager.getPhysicalPlan().getStageDAG();
+
+    final Map<String, StageEdge> idToIncomingEdges = children.stream()
+      .map(RuntimeIdManager::getStageIdFromTaskId)
+      .flatMap(stageId -> stageDAG.getIncomingEdgesOf(stageId).stream())
+      // Ignore duplicates with the mergeFunction in toMap(_,_,mergeFunction)
+      .collect(Collectors.toMap(StageEdge::getId, Function.identity(), (l, r) -> l));
+
+    final Set<String> parentsWithLostBlocks = children.stream()
+      .flatMap(child -> getInputBlockIds(planStateManager, child).stream()) // child task id -> parent block ids
+      .map(RuntimeIdManager::getWildCardFromBlockId) // parent block id -> parent block wildcard
+      .collect(Collectors.toSet()).stream() // remove duplicate wildcards
+      .filter(parentBlockWildcard -> // lost block = no matching AVAILABLE block attempt for the wildcard
+        blockManagerMaster.getBlockHandlers(parentBlockWildcard, BlockState.State.AVAILABLE).isEmpty())
+      .flatMap(lostParentBlockWildcard -> {
+        // COMPLETE task attempts of the lostParentBlockWildcard must become SHOULD_RETRY
+        final String inEdgeId = RuntimeIdManager.getRuntimeEdgeIdFromBlockId(lostParentBlockWildcard);
+        final String parentStageId = idToIncomingEdges.get(inEdgeId).getSrc().getId();
+        final int parentTaskIndex = RuntimeIdManager.getTaskIndexFromBlockId(lostParentBlockWildcard);
+        return planStateManager.getAllTaskAttemptsOfStage(parentStageId)
+          .stream()
+          .filter(taskId -> RuntimeIdManager.getStageIdFromTaskId(taskId).equals(parentStageId)
+            && RuntimeIdManager.getIndexFromTaskId(taskId) == parentTaskIndex)
+          // COMPLETE -> SHOULD_RETRY
+          .filter(taskId -> planStateManager.getTaskState(taskId).equals(TaskState.State.COMPLETE));
+      })
+      .collect(Collectors.toSet());
+
+    // Recursive call: the re-executed parents may themselves need lost inputs from their own parents.
+    return Sets.union(parentsWithLostBlocks,
+      recursivelyGetParentTasksForLostBlocks(planStateManager, blockManagerMaster, parentsWithLostBlocks));
+  }
+
+  /**
+   * Compute the IDs of the blocks the given task produces, one per outgoing edge of its stage.
+   *
+   * @param planStateManager the manager of the plan state, used to access the stage DAG.
+   * @param taskId           the ID of the producer task.
+   * @return the IDs of the output blocks.
+   */
+  static Set<String> getOutputBlockIds(final PlanStateManager planStateManager,
+                                       final String taskId) {
+    return planStateManager.getPhysicalPlan().getStageDAG()
+      .getOutgoingEdgesOf(RuntimeIdManager.getStageIdFromTaskId(taskId))
+      .stream()
+      .map(stageEdge -> RuntimeIdManager.generateBlockId(stageEdge.getId(), taskId))
+      .collect(Collectors.toSet()); // ids of blocks this task will produce
+  }
+
+  /**
+   * Compute the IDs of the blocks the given task reads from its parent stages: the blocks of all parent task
+   * attempts for SHUFFLE and BROADCAST edges, and only those of same-index parent task attempts for ONE_TO_ONE.
+   *
+   * @param planStateManager the manager of the plan state, used to access the stage DAG and task attempts.
+   * @param childTaskId      the ID of the child task.
+   * @return the IDs of the input blocks.
+   * @throws IllegalStateException if an incoming edge has any other communication pattern.
+   */
+  static Set<String> getInputBlockIds(final PlanStateManager planStateManager,
+                                      final String childTaskId) {
+    final String stageIdOfChildTask = RuntimeIdManager.getStageIdFromTaskId(childTaskId);
+    return planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageIdOfChildTask)
+      .stream()
+      .flatMap(inStageEdge -> {
+        final Set<String> parentTaskIds = planStateManager.getAllTaskAttemptsOfStage(inStageEdge.getSrc().getId());
+        switch (inStageEdge.getDataCommunicationPattern()) {
+          case SHUFFLE:
+          case BROADCAST:
+            // All of the parent stage's tasks
+            return parentTaskIds.stream()
+              .map(parentTaskId -> RuntimeIdManager.generateBlockId(inStageEdge.getId(), parentTaskId));
+          case ONE_TO_ONE:
+            // Same-index tasks of the parent stage
+            return parentTaskIds.stream()
+              .filter(parentTaskId ->
+                RuntimeIdManager.getIndexFromTaskId(parentTaskId) == RuntimeIdManager.getIndexFromTaskId(childTaskId))
+              .map(parentTaskId -> RuntimeIdManager.generateBlockId(inStageEdge.getId(), parentTaskId));
+          default:
+            throw new IllegalStateException(inStageEdge.toString());
+        }
+      })
+      .collect(Collectors.toSet());
+  }
+}
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ExecutorRegistry.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ExecutorRegistry.java
index 8395f01..edf15ab 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ExecutorRegistry.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ExecutorRegistry.java
@@ -57,6 +57,14 @@ public final class ExecutorRegistry {
     this.executors = new HashMap<>();
   }
 
+  /**
+   * Static factory method for instantiating the registry manually.
+   *
+   * @return a newly constructed ExecutorRegistry.
+   */
+  public static ExecutorRegistry newInstance() {
+    final ExecutorRegistry registry = new ExecutorRegistry();
+    return registry;
+  }
+
   synchronized void registerExecutor(final ExecutorRepresenter executor) {
     final String executorId = executor.getExecutorId();
     if (executors.containsKey(executorId)) {
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/PendingTaskCollectionPointer.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/PendingTaskCollectionPointer.java
index 575e9a6..d3397c3 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/PendingTaskCollectionPointer.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/PendingTaskCollectionPointer.java
@@ -39,6 +39,14 @@ public final class PendingTaskCollectionPointer {
   }
 
   /**
+   * Static factory method for instantiating the pointer manually.
+   *
+   * @return a newly constructed PendingTaskCollectionPointer.
+   */
+  public static PendingTaskCollectionPointer newInstance() {
+    final PendingTaskCollectionPointer pointer = new PendingTaskCollectionPointer();
+    return pointer;
+  }
+
+  /**
    * This collection of tasks should take precedence over any previous collection of tasks.
    *
    * @param newCollection to schedule.
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/SimulatedTaskExecutor.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/SimulatedTaskExecutor.java
new file mode 100644
index 0000000..005a7ff
--- /dev/null
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/SimulatedTaskExecutor.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.runtime.master.scheduler;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Streams;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.comm.ControlMessage;
+import org.apache.nemo.runtime.common.message.MessageEnvironment;
+import org.apache.nemo.runtime.common.message.MessageUtils;
+import org.apache.nemo.runtime.common.metric.JobMetric;
+import org.apache.nemo.runtime.common.metric.StateTransitionEvent;
+import org.apache.nemo.runtime.common.metric.TaskMetric;
+import org.apache.nemo.runtime.common.plan.RuntimeEdge;
+import org.apache.nemo.runtime.common.plan.Task;
+import org.apache.nemo.runtime.common.state.TaskState;
+import org.apache.nemo.runtime.master.metric.MetricStore;
+import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/**
+ * Class for simulated task execution.
+ * <p>
+ * Instead of running a task, this estimates its duration from the metrics of an actual previous execution,
+ * advances a simulated clock by that amount, and reports the resulting metrics to the {@link SimulationScheduler}
+ * and the task state changes through the {@link ExecutorRepresenter}, as a real executor would.
+ */
+public final class SimulatedTaskExecutor {
+  private static final Logger LOG = LoggerFactory.getLogger(SimulatedTaskExecutor.class.getName());
+  private static final String TASK_METRIC_ID = "TaskMetric";
+
+  /**
+   * the simulation scheduler that the executor is associated with.
+   */
+  private final SimulationScheduler scheduler;
+  /**
+   * The representer of the simulated executor, through which control messages are sent.
+   */
+  private final ExecutorRepresenter executorRepresenter;
+  /**
+   * Wall-clock time (in ms) at which the first task was received; negative until then.
+   */
+  private long executorInitializationTime;
+  /**
+   * The simulated clock (in ms), advanced by scheduling overheads and expected task durations.
+   */
+  private final AtomicLong currentTime;
+  /**
+   * Wall-clock time (in ms) of the last checkpoint, used to measure the scheduling overhead of the next task.
+   */
+  private long timeCheckpoint;
+  /**
+   * The metric store of an actual execution, from which task durations are estimated.
+   */
+  private final MetricStore actualMetricStore;
+  /**
+   * Cache of deserialized stage IR DAGs, keyed by stage ID.
+   */
+  private final ConcurrentMap<String, DAG<IRVertex, RuntimeEdge<IRVertex>>> stageIDToStageIRDAG;
+
+  /**
+   * Constructor.
+   *
+   * @param scheduler           the simulation scheduler that the executor is associated with.
+   * @param executorRepresenter the representer of the simulated executor, used to send control messages.
+   * @param actualMetricStore   the metric store of an actual execution, used to estimate task durations.
+   */
+  SimulatedTaskExecutor(final SimulationScheduler scheduler,
+                        final ExecutorRepresenter executorRepresenter,
+                        final MetricStore actualMetricStore) {
+    this.scheduler = scheduler;
+    this.executorRepresenter = executorRepresenter;
+    this.actualMetricStore = actualMetricStore;
+    this.stageIDToStageIRDAG = new ConcurrentHashMap<>();
+    this.currentTime = new AtomicLong(-1L);
+    this.executorInitializationTime = -1L;
+  }
+
+  /**
+   * Calculate the expected task duration.
+   * This only works if there exists metrics in the actual MetricStore, that contains information about the stage,
+   * as well as with the parallelism the task is associated with.
+   * <p>
+   * Stages of the recorded job are matched by the number of IR vertices and edges and by their parallelism, and
+   * the positive task durations of all matching stages are averaged.
+   *
+   * @param task the task to calculate the task duration for.
+   * @return the expected task duration in ms, or 0 if it cannot be estimated.
+   */
+  private long calculateExpectedTaskDuration(final Task task) {
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> stageIRDAG = stageIDToStageIRDAG.computeIfAbsent(task.getStageId(),
+      i -> SerializationUtils.deserialize(task.getSerializedIRDag()));
+
+    final Map<String, Object> jobMetricMap = this.actualMetricStore.getMetricMap(JobMetric.class);
+    if (jobMetricMap.isEmpty()) {
+      LOG.warn("MetricStore has no JobMetric. Cannot estimate the duration of {}.", task.getTaskId());
+      return 0L;
+    }
+    if (jobMetricMap.size() > 1) {
+      LOG.warn("MetricStore has more than one JobMetric. The results could be misleading.");
+    }
+    // Fetch first element.
+    final JobMetric jobMetric = (JobMetric) jobMetricMap.values().iterator().next();
+    final JsonNode stageDAG = jobMetric.getStageDAG();
+    if (stageDAG == null) {
+      LOG.warn("JobMetric has no stage DAG. Cannot estimate the duration of {}.", task.getTaskId());
+      return 0L;
+    }
+
+    // Characteristics of the task's stage, computed once instead of per candidate stage.
+    final int numOfVertices = stageIRDAG.getVertices().size();
+    final int numOfEdges = stageIRDAG.getEdges().size();
+    final int parallelism = task.getPropertyValue(ParallelismProperty.class).orElse(0);
+
+    // Gather ID of stages that have the characteristics that the task possesses.
+    // path() (rather than get()) yields a missing node instead of null, so incomplete JSON entries are skipped.
+    final Set<String> stageIdsToGatherMetricsFrom = Streams.stream(() -> stageDAG.path("vertices").iterator())
+      .filter(s -> s.path("properties").path("irDag").path("vertices").size()
+        == numOfVertices)  // same # of vertices.
+      .filter(s -> s.path("properties").path("irDag").path("edges").size()
+        == numOfEdges)  // same # of edges.
+      .filter(s -> s.path("properties").path("executionProperties")
+        .path(ParallelismProperty.class.getName()).asInt(0)
+        == parallelism)  // same parallelism.
+      .map(s -> s.path("id").asText())
+      .collect(Collectors.toSet());
+
+    // Derive the average task duration from the stages.
+    final OptionalDouble average = this.actualMetricStore.getMetricMap(TaskMetric.class).entrySet().stream()
+      .filter(e -> stageIdsToGatherMetricsFrom.contains(RuntimeIdManager.getStageIdFromTaskId(e.getKey())))
+      .map(Map.Entry::getValue)  // stream of TaskMetric.
+      .mapToLong(tm -> ((TaskMetric) tm).getTaskDuration())
+      .filter(l -> l > 0)
+      .average();
+
+    // Round to the nearest millisecond; 0 indicates that no matching metric was found.
+    return Math.round(average.orElse(0));
+  }
+
+  /**
+   * Handle the task and record metrics, as a real Executor#onTaskReceived would.
+   * <p>
+   * The simulated clock is advanced first by the (wall-clock) scheduling overhead since the last checkpoint, and
+   * then by the expected task duration; the task is then reported as complete.
+   *
+   * @param task the task to execute.
+   */
+  public void onTaskReceived(final Task task) {
+    if (executorInitializationTime < 0) {
+      executorInitializationTime = System.currentTimeMillis();
+      currentTime.set(executorInitializationTime);
+      timeCheckpoint = executorInitializationTime;
+    }
+    final String taskId = task.getTaskId();
+    final int attemptIdx = task.getAttemptIdx();
+    // The simulation does not currently put any vertex on hold, so this stays null.
+    String idOfVertexPutOnHold = null;
+
+    // Read the wall clock once, so that the measured overhead and the new checkpoint agree.
+    final long now = System.currentTimeMillis();
+    final long schedulingOverhead = now - this.timeCheckpoint;
+    this.timeCheckpoint = now;
+    this.sendMetric(TASK_METRIC_ID, taskId, "schedulingOverhead",
+      SerializationUtils.serialize(schedulingOverhead));
+    // Execution starts only after the scheduling overhead has elapsed on the simulated clock,
+    // so the overhead is not counted again in the task duration below.
+    final long executionStartTime = this.currentTime.addAndGet(schedulingOverhead);
+
+    // Prepare (constructor of TaskExecutor)
+    // Deserialize task
+    // Connect incoming / outgoing edges.
+
+    // Execute
+    LOG.debug("{} started", taskId);
+
+    // Fetch external data (Read) and process them
+    // Finalize vertex and write
+
+    final long expectedTaskDuration = this.calculateExpectedTaskDuration(task);
+    final long executionEndTime = this.currentTime.addAndGet(expectedTaskDuration);
+
+    this.sendMetric(TASK_METRIC_ID, taskId, "taskDuration",
+      SerializationUtils.serialize(executionEndTime - executionStartTime));
+    this.timeCheckpoint = System.currentTimeMillis();
+    if (idOfVertexPutOnHold == null) {
+      this.onTaskStateChanged(taskId, attemptIdx, TaskState.State.COMPLETE,
+        Optional.empty(), Optional.empty());
+      LOG.debug("{} completed", taskId);
+    } else {
+      this.onTaskStateChanged(taskId, attemptIdx, TaskState.State.ON_HOLD,
+        Optional.of(idOfVertexPutOnHold), Optional.empty());
+      LOG.debug("{} on hold", taskId);
+    }
+  }
+
+  /**
+   * Get the simulated time elapsed since this executor received its first task.
+   *
+   * @return the elapsed time for the executor, in ms (0 if no task has been received yet).
+   */
+  public Long getElapsedTime() {
+    return currentTime.get() - executorInitializationTime;
+  }
+
+  /**
+   * Updates the state of the task.
+   *
+   * @param taskId          of the task.
+   * @param attemptIdx      of the task.
+   * @param newState        of the task.
+   * @param vertexPutOnHold the vertex put on hold.
+   * @param cause           only provided as non-empty upon recoverable failures.
+   */
+  private void onTaskStateChanged(final String taskId,
+                                  final int attemptIdx,
+                                  final TaskState.State newState,
+                                  final Optional<String> vertexPutOnHold,
+                                  final Optional<TaskState.RecoverableTaskFailureCause> cause) {
+    this.sendMetric(TASK_METRIC_ID, taskId,
+      "stateTransitionEvent", SerializationUtils.serialize(new StateTransitionEvent<>(
+        this.currentTime.get(), null, newState
+      )));
+
+    final ControlMessage.TaskStateChangedMsg.Builder msgBuilder =
+      ControlMessage.TaskStateChangedMsg.newBuilder()
+        .setExecutorId(executorRepresenter.getExecutorId())
+        .setTaskId(taskId)
+        .setAttemptIdx(attemptIdx)
+        .setState(MessageUtils.convertState(newState));
+    if (newState == TaskState.State.ON_HOLD && vertexPutOnHold.isPresent()) {
+      msgBuilder.setVertexPutOnHoldId(vertexPutOnHold.get());
+    }
+    cause.ifPresent(c -> msgBuilder.setFailureCause(MessageUtils.convertFailureCause(c)));
+
+    // Send taskStateChangedMsg to master!
+    this.sendControlMessage(
+      ControlMessage.Message.newBuilder()
+        .setId(RuntimeIdManager.generateMessageId())
+        .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
+        .setType(ControlMessage.MessageType.TaskStateChanged)
+        .setTaskStateChangedMsg(msgBuilder.build())
+        .build());
+  }
+
+  /**
+   * Send the control message to the scheduler, as an executor would. Main handles TaskStateChanged messages.
+   *
+   * @param message control message to send.
+   */
+  private void sendControlMessage(final ControlMessage.Message message) {
+    this.executorRepresenter.sendControlMessage(message);
+  }
+
+  /**
+   * Send the metric to the scheduler, as an executor would. See where it is used in MetricMessageSender#send.
+   *
+   * @param metricType  type of metric.
+   * @param metricId    id of metric.
+   * @param metricField field of metric.
+   * @param metricValue value of metric.
+   */
+  public void sendMetric(final String metricType, final String metricId,
+                         final String metricField, final byte[] metricValue) {
+    this.scheduler.handleMetricMessage(metricType, metricId, metricField, metricValue);
+  }
+}
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/SimulationScheduler.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/SimulationScheduler.java
new file mode 100644
index 0000000..42870f6
--- /dev/null
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/SimulationScheduler.java
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.runtime.master.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.Util;
+import org.apache.nemo.common.exception.IllegalMessageException;
+import org.apache.nemo.common.exception.SimulationException;
+import org.apache.nemo.common.exception.UnknownExecutionStateException;
+import org.apache.nemo.common.exception.UnrecoverableFailureException;
+import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
+import org.apache.nemo.conf.JobConf;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.comm.ControlMessage;
+import org.apache.nemo.runtime.common.message.MessageSender;
+import org.apache.nemo.runtime.common.message.MessageUtils;
+import org.apache.nemo.runtime.common.metric.JobMetric;
+import org.apache.nemo.runtime.common.metric.Metric;
+import org.apache.nemo.runtime.common.plan.PhysicalPlan;
+import org.apache.nemo.runtime.common.plan.PlanRewriter;
+import org.apache.nemo.runtime.common.plan.Stage;
+import org.apache.nemo.runtime.common.plan.Task;
+import org.apache.nemo.runtime.common.state.StageState;
+import org.apache.nemo.runtime.common.state.TaskState;
+import org.apache.nemo.runtime.master.BlockManagerMaster;
+import org.apache.nemo.runtime.master.PlanAppender;
+import org.apache.nemo.runtime.master.PlanStateManager;
+import org.apache.nemo.runtime.master.metric.MetricStore;
+import org.apache.nemo.runtime.master.resource.DefaultExecutorRepresenter;
+import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * Scheduler that simulates the execution of a physical plan on simulated executors, instead of handing the plan to
+ * the runtime master and real executors. This class follows the structure of {@link BatchScheduler}, so a change
+ * made to BatchScheduler should be reflected in this class as well.
+ * <p>
+ * Usage: {@link #schedulePlan(PhysicalPlan, int)} runs the simulation, then {@link #collectMetricStore()} returns the
+ * simulated metrics and resets this scheduler for the next simulation.
+ */
+@DriverSide
+@NotThreadSafe
+public final class SimulationScheduler implements Scheduler {
+  private static final Logger LOG = LoggerFactory.getLogger(SimulationScheduler.class.getName());
+
+  /**
+   * Maximum time, in milliseconds, that {@link #collectMetricStore()} waits for the metric latch to reach zero.
+   */
+  private static final long METRIC_FLUSH_TIMEOUT_MS = 10000;
+
+  /**
+   * Run-time optimizations.
+   */
+  private final PlanRewriter planRewriter;
+
+  /**
+   * Components related to scheduling the given plan. The role of each class can be found in {@link BatchScheduler}.
+   * The non-final ones are replaced with fresh instances by {@link #reset()}.
+   */
+  private TaskDispatcher taskDispatcher;
+  private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
+  private ExecutorRegistry executorRegistry;
+  private PlanStateManager planStateManager;
+
+  /**
+   * Executor service for scheduling message serialization. Shared by all simulated executors, across resets.
+   */
+  private final ExecutorService serializationExecutorService;
+
+  /**
+   * A component that manages data blocks. We actually don't call for the real data in simulations,
+   * so it's just here to prevent errors.
+   */
+  private final BlockManagerMaster blockManagerMaster;
+
+  /**
+   * The actual metric store, from the actual job that's running, which calls for the simulation to occur.
+   */
+  private final MetricStore actualMetricStore;
+  /**
+   * The metric store for the simulation. This adds up the metrics for the simulated task executions.
+   */
+  private MetricStore metricStore;
+  /**
+   * A latch to confirm that the job has finished and all metrics have been flushed. It starts at the number of
+   * simulated executors and is counted down once per executor when a plan finishes, and when an executor is removed.
+   */
+  private CountDownLatch metricCountDownLatch;
+
+  /**
+   * Components that tell how to schedule the given tasks.
+   */
+  private final SchedulingConstraintRegistry schedulingConstraintRegistry;
+  private final SchedulingPolicy schedulingPolicy;
+  /**
+   * JSON contents ({@link JobConf.ExecutorJSONContents}) describing the executors to simulate.
+   */
+  private final String resourceSpecificationString;
+  /**
+   * Directory passed to each new {@link PlanStateManager} (presumably where it stores the plan JSON).
+   */
+  private final String dagDirectory;
+
+  /**
+   * A map from executor ID to the simulated task executor object.
+   */
+  private final Map<String, SimulatedTaskExecutor> simulatedTaskExecutorMap;
+
+  /**
+   * The below variables depend on the submitted plan to execute.
+   */
+  private List<List<Stage>> sortedScheduleGroups;
+
+  @Inject
+  private SimulationScheduler(final PlanRewriter planRewriter,
+                              final SchedulingConstraintRegistry schedulingConstraintRegistry,
+                              final BlockManagerMaster blockManagerMaster,
+                              @Parameter(JobConf.ExecutorJSONContents.class) final String resourceSpecificationString,
+                              @Parameter(JobConf.ScheduleSerThread.class) final int scheduleSerThread,
+                              @Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
+    this.planRewriter = planRewriter;
+    this.blockManagerMaster = blockManagerMaster;
+    this.pendingTaskCollectionPointer = PendingTaskCollectionPointer.newInstance();
+    this.executorRegistry = ExecutorRegistry.newInstance();
+    this.schedulingConstraintRegistry = schedulingConstraintRegistry;
+    this.schedulingPolicy = new SimulationSchedulingPolicy();
+    this.resourceSpecificationString = resourceSpecificationString;
+    this.dagDirectory = dagDirectory;
+    this.planStateManager = PlanStateManager.newInstance(dagDirectory);
+    this.taskDispatcher = TaskDispatcher.newInstance(schedulingConstraintRegistry, schedulingPolicy,
+      pendingTaskCollectionPointer, executorRegistry, planStateManager);
+    this.serializationExecutorService = Executors.newFixedThreadPool(scheduleSerThread);
+    this.actualMetricStore = MetricStore.getStore();
+    this.metricStore = MetricStore.newInstance();
+    this.planStateManager.setMetricStore(this.metricStore);
+    this.simulatedTaskExecutorMap = new HashMap<>();
+    setUpExecutors();
+  }
+
+  /**
+   * Simulate the launch of executors: one simulated executor is registered per requested executor in the
+   * resource specification, and the metric latch is sized to the number of executors launched.
+   */
+  private void setUpExecutors() {
+    final List<Pair<Integer, ResourceSpecification>> resourceSpecs =
+      Util.parseResourceSpecificationString(resourceSpecificationString);
+    // Role of ActiveContextHandler + RuntimeMaster.onExecuterLaunched.
+    // The generator doubles as the count of launched executors.
+    final AtomicInteger executorIdGenerator = new AtomicInteger(0);
+    resourceSpecs.forEach(p -> {
+      for (int i = 0; i < p.left(); i++) {
+        final ActiveContext ac = new SimulationEvaluatorActiveContext(executorIdGenerator.getAndIncrement());
+        this.onExecutorAdded(new DefaultExecutorRepresenter(ac.getId(), p.right(),
+          new SimulationMessageSender(ac.getId(), this), ac, serializationExecutorService, ac.getId()));
+      }
+    });
+    this.metricCountDownLatch = new CountDownLatch(executorIdGenerator.get());
+  }
+
+  /**
+   * Reset the instance to its initial state: terminate the current components, then rebuild them along with
+   * a fresh metric store and a fresh set of simulated executors.
+   */
+  public void reset() {
+    this.terminate();
+    this.executorRegistry = ExecutorRegistry.newInstance();
+    this.planStateManager = PlanStateManager.newInstance(dagDirectory);
+    this.pendingTaskCollectionPointer.getAndSetNull();
+    this.taskDispatcher = TaskDispatcher.newInstance(schedulingConstraintRegistry, schedulingPolicy,
+      pendingTaskCollectionPointer, executorRegistry, planStateManager);
+    this.metricStore = MetricStore.newInstance();
+    this.planStateManager.setMetricStore(metricStore);
+    this.simulatedTaskExecutorMap.clear();
+    setUpExecutors();
+  }
+
+  @VisibleForTesting
+  public PlanStateManager getPlanStateManager() {
+    return planStateManager;
+  }
+
+  /**
+   * The entrance point of the simulator. Simulate a plan by submitting a plan through this method.
+   * Blocks until the plan finishes, then records the simulated job duration (the largest elapsed time among the
+   * simulated executors) in the simulation's {@link JobMetric}.
+   *
+   * @param submittedPhysicalPlan the plan to simulate.
+   * @param maxScheduleAttempt the max number of times this plan/sub-part of the plan should be attempted.
+   */
+  @Override
+  public void schedulePlan(final PhysicalPlan submittedPhysicalPlan, final int maxScheduleAttempt) {
+    // Execute the given plan.
+    LOG.info("Plan to schedule: {}", submittedPhysicalPlan.getPlanId());
+
+    if (!planStateManager.isInitialized()) {
+      // First scheduling.
+      taskDispatcher.run();
+      updatePlan(submittedPhysicalPlan, maxScheduleAttempt);
+      planStateManager.storeJSON("submitted");
+    } else {
+      // Append the submitted plan to the original plan.
+      final PhysicalPlan appendedPlan =
+        PlanAppender.appendPlan(planStateManager.getPhysicalPlan(), submittedPhysicalPlan);
+      updatePlan(appendedPlan, maxScheduleAttempt);
+      planStateManager.storeJSON("appended");
+    }
+
+    doSchedule();
+
+    try {
+      planStateManager.waitUntilFinish();
+    } finally {
+      planStateManager.storeJSON("final");
+    }
+
+    final long jobDuration = this.simulatedTaskExecutorMap.values().stream()
+      .mapToLong(SimulatedTaskExecutor::getElapsedTime)
+      .max().orElse(0);
+    LOG.info("Simulation of {} is complete with job duration of {}!", submittedPhysicalPlan.getPlanId(), jobDuration);
+    this.metricStore.getOrCreateMetric(JobMetric.class, submittedPhysicalPlan.getPlanId()).setJobDuration(jobDuration);
+    // Every registered executor is considered flushed once the plan is done.
+    executorRegistry.viewExecutors(executors -> executors.forEach(executor -> metricCountDownLatch.countDown()));
+  }
+
+  /**
+   * The main entry point for task scheduling.
+   * This operation can be invoked at any point during job execution, as it is designed to be free of side-effects.
+   * <p>
+   * These are the reasons why.
+   * - We 'reset' {@link PendingTaskCollectionPointer}, and not 'add' new tasks to it
+   * - We make {@link TaskDispatcher} dispatch only the tasks that are READY.
+   */
+  private void doSchedule() {
+    final java.util.Optional<List<Stage>> earliest =
+      BatchSchedulerUtils.selectEarliestSchedulableGroup(sortedScheduleGroups, planStateManager);
+
+    if (earliest.isPresent()) {
+      final List<Task> tasksToSchedule = earliest.get().stream()
+        .flatMap(stage ->
+          BatchSchedulerUtils.selectSchedulableTasks(planStateManager, blockManagerMaster, stage).stream())
+        .collect(Collectors.toList());
+      if (!tasksToSchedule.isEmpty()) {
+        LOG.info("Scheduling some tasks in {}, which are in the same ScheduleGroup", tasksToSchedule.stream()
+          .map(Task::getTaskId)
+          .map(RuntimeIdManager::getStageIdFromTaskId)
+          .collect(Collectors.toSet()));
+
+        // Set the pointer to the schedulable tasks.
+        pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
+
+        // Notify the dispatcher that a new collection is available.
+        taskDispatcher.onNewPendingTaskCollectionAvailable();
+      }
+    } else {
+      LOG.info("Skipping this round as no ScheduleGroup is schedulable.");
+    }
+  }
+
+  @Override
+  public void updatePlan(final PhysicalPlan newPhysicalPlan) {
+    // update the physical plan in the scheduler.
+    // NOTE: what's already been executed is not modified in the new physical plan.
+    // TODO #182: Consider reshaping in run-time optimization. At now, we only consider plan appending.
+    updatePlan(newPhysicalPlan, planStateManager.getMaxScheduleAttempt());
+  }
+
+  /**
+   * Update the physical plan in the scheduler, and regroup its stages by schedule group in ascending order.
+   *
+   * @param newPhysicalPlan    the new physical plan to update.
+   * @param maxScheduleAttempt the maximum number of task scheduling attempt.
+   */
+  private void updatePlan(final PhysicalPlan newPhysicalPlan,
+                          final int maxScheduleAttempt) {
+    planStateManager.updatePlan(newPhysicalPlan, maxScheduleAttempt);
+    this.sortedScheduleGroups = newPhysicalPlan.getStageDAG().getVertices().stream()
+      .collect(Collectors.groupingBy(Stage::getScheduleGroup))
+      .entrySet().stream()
+      .sorted(Map.Entry.comparingByKey())
+      .map(Map.Entry::getValue)
+      .collect(Collectors.toList());
+  }
+
+  @Override
+  public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
+    LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(), executorRepresenter.getNodeName());
+    executorRegistry.registerExecutor(executorRepresenter);
+    this.simulatedTaskExecutorMap.put(executorRepresenter.getExecutorId(),
+      new SimulatedTaskExecutor(this, executorRepresenter, actualMetricStore));
+    taskDispatcher.onExecutorSlotAvailable();
+  }
+
+  @Override
+  public void onExecutorRemoved(final String executorId) {
+    // Role of FailedEvaluatorHandler + onExecutorFailed.
+    metricCountDownLatch.countDown();
+    LOG.info("{} removed", executorId);
+
+    // These are tasks that were running at the time of executor removal.
+    final Set<String> interruptedTasks = new HashSet<>();
+    executorRegistry.updateExecutor(executorId, (executor, state) -> {
+      interruptedTasks.addAll(executor.onExecutorFailed());
+      return Pair.of(executor, ExecutorRegistry.ExecutorState.FAILED);
+    });
+
+    // Retry the interrupted tasks (and required parents)
+    BatchSchedulerUtils.retryTasksAndRequiredParents(planStateManager, blockManagerMaster, interruptedTasks);
+
+    // Trigger the scheduling of SHOULD_RETRY tasks in the earliest scheduleGroup
+    doSchedule();
+  }
+
+  /**
+   * Process the RuntimePassMessage.
+   *
+   * @param taskId that generated the message.
+   * @param data   of the message.
+   */
+  public void onRunTimePassMessage(final String taskId, final Object data) {
+    // TODO #436: Dynamic task resizing.
+    BatchSchedulerUtils.onRunTimePassMessage(planStateManager, planRewriter, taskId, data);
+  }
+
+  @Override
+  public synchronized void onTaskStateReportFromExecutor(final String executorId,
+                                                         final String taskId,
+                                                         final int attemptIdx,
+                                                         final TaskState.State newState,
+                                                         @Nullable final String taskPutOnHold,
+                                                         final TaskState.RecoverableTaskFailureCause failureCause) {
+    // Role of MasterControlMessageReceiver + handleControlMessage --> onTaskStateChanged.
+    // Do change state, as this notification is for the current task attempt.
+    planStateManager.onTaskStateChanged(taskId, newState);
+    switch (newState) {
+      case COMPLETE:
+        BatchSchedulerUtils.onTaskExecutionComplete(executorRegistry, executorId, taskId);
+        break;
+      case SHOULD_RETRY:
+        // SHOULD_RETRY from an executor means that the task ran into a recoverable failure
+        BatchSchedulerUtils.onTaskExecutionFailedRecoverable(planStateManager, blockManagerMaster, executorRegistry,
+          executorId, taskId, failureCause);
+        break;
+      case ON_HOLD:
+        final java.util.Optional<PhysicalPlan> optionalPhysicalPlan =
+          BatchSchedulerUtils
+            .onTaskExecutionOnHold(planStateManager, executorRegistry, planRewriter, executorId, taskId);
+        optionalPhysicalPlan.ifPresent(this::updatePlan);
+        break;
+      case FAILED:
+        throw new UnrecoverableFailureException(new Exception(String.format("The plan failed on %s in %s",
+          taskId, executorId)));
+      case READY:
+      case EXECUTING:
+        throw new SimulationException("The states READY/EXECUTING cannot occur at this point");
+      default:
+        throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + newState));
+    }
+
+    // Invoke doSchedule()
+    switch (newState) {
+      case COMPLETE:
+      case ON_HOLD:
+        // If the stage has completed
+        final String stageIdForTaskUponCompletion = RuntimeIdManager.getStageIdFromTaskId(taskId);
+        if (planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE)
+          && !planStateManager.isPlanDone()) {
+          doSchedule();
+        }
+        break;
+      case SHOULD_RETRY:
+        // Do retry
+        doSchedule();
+        break;
+      default:
+        break;
+    }
+
+    // Invoke taskDispatcher.onExecutorSlotAvailable()
+    switch (newState) {
+      // These three states mean that a slot is made available.
+      case COMPLETE:
+      case ON_HOLD:
+      case SHOULD_RETRY:
+        taskDispatcher.onExecutorSlotAvailable();
+        break;
+      default:
+        break;
+    }
+  }
+
+  /**
+   * Handle metric message, as it would have been sent from org.apache.nemo.runtime.executor.MetricManagerWorker#send.
+   * Processing errors are logged (with their cause) and otherwise ignored, so that a bad metric does not abort
+   * the simulation.
+   *
+   * @param metricType  type of metric.
+   * @param metricId    id of metric.
+   * @param metricField field of metric.
+   * @param metricValue value of metric.
+   */
+  void handleMetricMessage(final String metricType, final String metricId,
+                           final String metricField, final byte[] metricValue) {
+    final Class<Metric> metricClass = metricStore.getMetricClassByName(metricType);
+    // process metric message
+    try {
+      if (metricStore.getOrCreateMetric(metricClass, metricId).processMetricMessage(metricField, metricValue)) {
+        metricStore.triggerBroadcast(metricClass, metricId);
+      }
+    } catch (final Exception e) {
+      // Pass the exception as the last argument so SLF4J logs its stack trace.
+      LOG.warn("Error when processing metric message for {}, {}, {}.", metricType, metricId, metricField, e);
+    }
+  }
+
+  /**
+   * The endpoint of the simulator. Collect the metric store, and reset the simulator for the next simulation.
+   * Waits up to {@link #METRIC_FLUSH_TIMEOUT_MS} milliseconds for the metric latch before collecting.
+   *
+   * @return the metrics of the simulation.
+   */
+  public MetricStore collectMetricStore() {
+    try {
+      // wait for metric flush
+      if (!metricCountDownLatch.await(METRIC_FLUSH_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+        LOG.warn("Terminating master before all executor terminated messages arrived.");
+      }
+    } catch (final InterruptedException e) {
+      LOG.warn("Waiting executor terminating process interrupted: ", e);
+      // clean up state...
+      Thread.currentThread().interrupt();
+    }
+
+    final MetricStore res = this.metricStore;
+    this.reset();
+    return res;
+  }
+
+  @Override
+  public void onSpeculativeExecutionCheck() {
+    // we don't simulate speculative execution yet.
+  }
+
+  @Override
+  public void terminate() {
+    this.taskDispatcher.terminate();
+    this.executorRegistry.terminate();
+  }
+
+  /**
+   * Evaluator ActiveContext for the Simulation. All submission/messaging operations are no-ops.
+   */
+  private static final class SimulationEvaluatorActiveContext implements ActiveContext {
+    private final Integer id;
+
+    /**
+     * Default constructor.
+     * @param id Evaluator ID.
+     */
+    SimulationEvaluatorActiveContext(final Integer id) {
+      this.id = id;
+    }
+
+    @Override
+    public void close() {
+      // do nothing
+    }
+
+    @Override
+    public void submitTask(final Configuration taskConf) {
+      // do nothing
+    }
+
+    @Override
+    public void submitContext(final Configuration contextConfiguration) {
+      // do nothing
+    }
+
+    @Override
+    public void submitContextAndService(final Configuration contextConfiguration,
+                                        final Configuration serviceConfiguration) {
+      // do nothing
+    }
+
+    @Override
+    public void sendMessage(final byte[] message) {
+      // do nothing
+    }
+
+    @Override
+    public String getEvaluatorId() {
+      return getId();
+    }
+
+    /**
+     * @return an empty Optional, as simulated contexts have no parent (never null).
+     */
+    @Override
+    public Optional<String> getParentId() {
+      return Optional.empty();
+    }
+
+    /**
+     * @return null, as no evaluator descriptor is simulated.
+     */
+    @Override
+    public EvaluatorDescriptor getEvaluatorDescriptor() {
+      return null;
+    }
+
+    @Override
+    public String getId() {
+      return "Evaluator" + id;
+    }
+  }
+
+  /**
+   * MessageSender for Simulations. Instead of sending messages over the network, it dispatches them directly to the
+   * simulation scheduler (messages for the master) or to the simulated task executor (messages for the executor).
+   */
+  private static final class SimulationMessageSender implements MessageSender<ControlMessage.Message> {
+    private final String executorId;
+    private final SimulationScheduler scheduler;
+
+    /**
+     * Constructor for the message sender that simply passes on the messages, instead of sending actual messages.
+     * @param executorId the simulated executor id of where the message sender communicates from.
+     * @param scheduler the simulation scheduler to communicate with.
+     */
+    SimulationMessageSender(final String executorId, final SimulationScheduler scheduler) {
+      this.executorId = executorId;
+      this.scheduler = scheduler;
+    }
+
+    @Override
+    public void send(final ControlMessage.Message message) {
+      switch (message.getType()) {
+        // Messages sent to the master
+        case TaskStateChanged:
+          final ControlMessage.TaskStateChangedMsg taskStateChangedMsg = message.getTaskStateChangedMsg();
+
+          scheduler.onTaskStateReportFromExecutor(taskStateChangedMsg.getExecutorId(),
+            taskStateChangedMsg.getTaskId(),
+            taskStateChangedMsg.getAttemptIdx(),
+            MessageUtils.convertTaskState(taskStateChangedMsg.getState()),
+            taskStateChangedMsg.getVertexPutOnHoldId(),
+            MessageUtils.convertFailureCause(taskStateChangedMsg.getFailureCause()));
+          break;
+        case ExecutorFailed:
+          // Executor failed due to user code.
+          final ControlMessage.ExecutorFailedMsg executorFailedMsg = message.getExecutorFailedMsg();
+          final String failedExecutorId = executorFailedMsg.getExecutorId();
+          final Exception exception = SerializationUtils.deserialize(executorFailedMsg.getException().toByteArray());
+          LOG.error("{} failed, Stack Trace: ", failedExecutorId, exception);
+          throw new SimulationException(exception);
+        case RunTimePassMessage:
+          scheduler.onRunTimePassMessage(
+            // TODO #436: Dynamic task resizing.
+            message.getRunTimePassMessageMsg().getTaskId(),
+            message.getRunTimePassMessageMsg().getEntryList());
+          break;
+        case MetricMessageReceived:
+          final List<ControlMessage.Metric> metricList = message.getMetricMsg().getMetricList();
+          metricList.forEach(metric ->
+            scheduler.handleMetricMessage(
+              metric.getMetricType(), metric.getMetricId(),
+              metric.getMetricField(), metric.getMetricValue().toByteArray()));
+          break;
+        //  Messages sent to the executor
+        case ScheduleTask:
+          final ControlMessage.ScheduleTaskMsg scheduleTaskMsg = message.getScheduleTaskMsg();
+          final Task task =
+            SerializationUtils.deserialize(scheduleTaskMsg.getTask().toByteArray());
+          scheduler.simulatedTaskExecutorMap.get(executorId).onTaskReceived(task);
+          break;
+        // No metric messaging in simulation.
+        case MetricFlushed:
+        case RequestMetricFlush:
+          break;
+        default:
+          throw new IllegalMessageException(
+            new Exception("This message should not be received by Master or the Executor :" + message.getType()));
+      }
+    }
+
+    /**
+     * Request-response messaging is not simulated.
+     *
+     * @return null, always. NOTE(review): callers must not rely on a future from this sender.
+     */
+    @Override
+    public <U> CompletableFuture<U> request(final ControlMessage.Message message) {
+      return null;
+    }
+
+    @Override
+    public void close() {
+      // do nothing.
+    }
+  }
+
+  /**
+   * Scheduling policy for simulations: the task goes to the candidate executor with the smallest simulated
+   * elapsed time.
+   */
+  private final class SimulationSchedulingPolicy implements SchedulingPolicy {
+    @Override
+    public ExecutorRepresenter selectExecutor(final Collection<ExecutorRepresenter> executors, final Task task) {
+      // Compare on the full long value; narrowing to int could overflow and select the wrong executor.
+      return Collections.min(executors, Comparator.comparingLong(
+        (ExecutorRepresenter e) -> simulatedTaskExecutorMap.get(e.getExecutorId()).getElapsedTime()));
+    }
+  }
+}
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java
index 16a6b35..e15d8ea 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java
@@ -76,6 +76,24 @@ final class TaskDispatcher {
   }
 
   /**
+   * Create a dispatcher directly, without Tang injection (e.g. for the simulation scheduler).
+   * @param schedulingConstraintRegistry registry of constraints used to filter candidate executors.
+   * @param schedulingPolicy             policy that picks one executor among the remaining candidates.
+   * @param pendingTaskCollectionPointer pointer to the collection of tasks waiting to be dispatched.
+   * @param executorRegistry             registry of the executors that tasks may be dispatched to.
+   * @param planStateManager             manager tracking the state of the plan being executed.
+   * @return a newly constructed {@link TaskDispatcher}.
+   */
+  public static TaskDispatcher newInstance(final SchedulingConstraintRegistry schedulingConstraintRegistry,
+                                           final SchedulingPolicy schedulingPolicy,
+                                           final PendingTaskCollectionPointer pendingTaskCollectionPointer,
+                                           final ExecutorRegistry executorRegistry,
+                                           final PlanStateManager planStateManager) {
+    return new TaskDispatcher(schedulingConstraintRegistry, schedulingPolicy,
+      pendingTaskCollectionPointer, executorRegistry, planStateManager);
+  }
+
+  /**
    * A separate thread is run to dispatch tasks to executors.
    * See comments in the {@link Scheduler} for avoiding race conditions.
    */
@@ -115,6 +133,7 @@ final class TaskDispatcher {
 
       executorRegistry.viewExecutors(executors -> {
         final MutableObject<Set<ExecutorRepresenter>> candidateExecutors = new MutableObject<>(executors);
+        // Filter out the candidate executors that do not meet scheduling constraints.
         task.getExecutionProperties().forEachProperties(property -> {
           final Optional<SchedulingConstraint> constraint = schedulingConstraintRegistry.get(property.getClass());
           if (constraint.isPresent() && !candidateExecutors.getValue().isEmpty()) {
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/ContainerManagerTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/ContainerManagerTest.java
index d82ddd5..c55c4fc 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/ContainerManagerTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/ContainerManagerTest.java
@@ -23,7 +23,7 @@ import org.apache.nemo.conf.JobConf;
 import org.apache.nemo.runtime.common.message.MessageEnvironment;
 import org.apache.nemo.runtime.master.resource.ContainerManager;
 import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
-import org.apache.nemo.runtime.master.resource.ResourceSpecification;
+import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
 import org.apache.reef.driver.catalog.NodeDescriptor;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.driver.evaluator.AllocatedEvaluator;
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
index 0384932..35bb737 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
@@ -30,7 +30,7 @@ import org.apache.nemo.runtime.master.PlanStateManager;
 import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
 import org.apache.nemo.runtime.master.resource.DefaultExecutorRepresenter;
 import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
-import org.apache.nemo.runtime.master.resource.ResourceSpecification;
+import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/SimulationSchedulerTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/SimulationSchedulerTest.java
new file mode 100644
index 0000000..73d9219
--- /dev/null
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/SimulationSchedulerTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.runtime.master.scheduler;
+
+import org.apache.nemo.conf.JobConf;
+import org.apache.nemo.runtime.common.message.ClientRPC;
+import org.apache.nemo.runtime.common.metric.JobMetric;
+import org.apache.nemo.runtime.common.metric.TaskMetric;
+import org.apache.nemo.runtime.common.plan.PhysicalPlan;
+import org.apache.nemo.runtime.common.plan.PlanRewriter;
+import org.apache.nemo.runtime.common.plan.TestPlanGenerator;
+import org.apache.nemo.runtime.master.BlockManagerMaster;
+import org.apache.nemo.runtime.master.metric.MetricStore;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.stream.LongStream;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests {@link SimulationScheduler}: a simple two-vertex plan is simulated and the resulting metrics are checked.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SchedulingConstraintRegistry.class, BlockManagerMaster.class, ClientRPC.class})
+public final class SimulationSchedulerTest {
+  private static final Logger LOG = LoggerFactory.getLogger(SimulationSchedulerTest.class.getName());
+
+  /**
+   * Executors to simulate: a transient and a reserved executor type, each with 512MB of memory and capacity 5.
+   */
+  private static final String DEFAULT_EXECUTOR_JSON_CONTENTS =
+    "[{\"type\":\"Transient\",\"memory_mb\":512,\"capacity\":5},"
+    + "{\"type\":\"Reserved\",\"memory_mb\":512,\"capacity\":5}]";
+
+  /**
+   * Maximum number of scheduling attempts. No failures are expected, so a single attempt suffices.
+   */
+  private static final int MAX_SCHEDULE_ATTEMPT = 1;
+
+  private SimulationScheduler scheduler;
+
+  @Before
+  public void setUp() throws Exception {
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    injector.bindVolatileInstance(PlanRewriter.class, mock(PlanRewriter.class));
+    injector.bindVolatileInstance(SchedulingConstraintRegistry.class, mock(SchedulingConstraintRegistry.class));
+    final SchedulingPolicy schedulingPolicy = Tang.Factory.getTang().newInjector()
+      .getInstance(MinOccupancyFirstSchedulingPolicy.class);
+    injector.bindVolatileInstance(SchedulingPolicy.class, schedulingPolicy);
+    injector.bindVolatileInstance(BlockManagerMaster.class, mock(BlockManagerMaster.class));
+    injector.bindVolatileInstance(ClientRPC.class, mock(ClientRPC.class));
+    injector.bindVolatileParameter(JobConf.ExecutorJSONContents.class, DEFAULT_EXECUTOR_JSON_CONTENTS);
+    injector.bindVolatileParameter(JobConf.ScheduleSerThread.class, 8);
+    injector.bindVolatileParameter(JobConf.DAGDirectory.class, "");
+
+    scheduler = injector.getInstance(SimulationScheduler.class);
+  }
+
+  @Test(timeout = 10000)
+  public void testSimulation() throws Exception {
+    // Register a plan's stage DAG in the global metric store, which the simulated executors are handed as the
+    // "actual" metric store (presumably consulted when estimating task durations).
+    final PhysicalPlan physicalPlan =
+      TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
+    MetricStore.getStore().getOrCreateMetric(JobMetric.class, physicalPlan.getPlanId())
+      .setStageDAG(physicalPlan.getStageDAG());
+    final PhysicalPlan physicalPlan2 =
+      TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
+    scheduler.schedulePlan(
+      physicalPlan2,
+      MAX_SCHEDULE_ATTEMPT);
+
+    LOG.info("Wait for plan termination after sending stage completion events");
+    final MetricStore resultingMetricStore = scheduler.collectMetricStore();
+    scheduler.terminate();
+
+    resultingMetricStore.getMetricMap(TaskMetric.class).forEach((id, taskMetric) -> {
+      final long taskDuration = ((TaskMetric) taskMetric).getTaskDuration();
+      assertTrue("Negative simulated duration for " + id, 0 <= taskDuration);
+      assertTrue("Simulated duration too long for " + id, 1000 > taskDuration);
+    });
+
+    final LongStream l = resultingMetricStore.getMetricMap(JobMetric.class).values().stream().mapToLong(jobMetric ->
+      ((JobMetric) jobMetric).getJobDuration());
+    LOG.info("Simulated duration: {}ms", l.findFirst().orElse(0));
+  }
+}
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
index 2ee2d2d..fbaae0b 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -37,7 +37,7 @@ import org.apache.nemo.runtime.master.PlanStateManager;
 import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
 import org.apache.nemo.runtime.master.resource.DefaultExecutorRepresenter;
 import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
-import org.apache.nemo.runtime.master.resource.ResourceSpecification;
+import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.tang.Injector;
 import org.junit.Before;