You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/08/21 03:23:51 UTC

[GitHub] wonook closed pull request #111: [NEMO-139, 6] Logic in the scheduler for appending jobs, Support RDD caching

wonook closed pull request #111: [NEMO-139, 6] Logic in the scheduler for appending jobs, Support RDD caching
URL: https://github.com/apache/incubator-nemo/pull/111
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index 595085428..68a44222d 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -88,7 +88,8 @@ public static void main(final String[] args) throws Exception {
 
     // Registers actions for launching the DAG.
     driverRPCServer
-        .registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, event -> { })
+        .registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, event -> {
+        })
         .registerHandler(ControlMessage.DriverToClientMessageType.DriverReady, event -> driverReadyLatch.countDown())
         .registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone, event -> jobDoneLatch.countDown())
         .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> COLLECTED_DATA.addAll(
@@ -185,7 +186,9 @@ public static void launchDAG(final DAG dag) {
     jobDoneLatch = new CountDownLatch(1);
     driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
         .setType(ControlMessage.ClientToDriverMessageType.LaunchDAG)
-        .setLaunchDAG(ControlMessage.LaunchDAGMessage.newBuilder().setDag(serializedDAG).build())
+        .setLaunchDAG(ControlMessage.LaunchDAGMessage.newBuilder()
+            .setDag(serializedDAG)
+            .build())
         .build());
 
     // Wait for the ExecutionDone message from the driver
@@ -347,8 +350,8 @@ private static Configuration getDeployModeConf(final Configuration jobConf) thro
   /**
    * Read json file and return its contents as configuration parameter.
    *
-   * @param jobConf job configuration to get json path.
-   * @param pathParameter named parameter represents path to the json file, or an empty string
+   * @param jobConf           job configuration to get json path.
+   * @param pathParameter     named parameter represents path to the json file, or an empty string
    * @param contentsParameter named parameter represents contents of the file
    * @return configuration with contents of the file, or an empty string as value for {@code contentsParameter}
    * @throws InjectionException exception while injection.
diff --git a/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java b/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java
index 835313e53..4f9212b3e 100644
--- a/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java
+++ b/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java
@@ -18,8 +18,11 @@
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.state.PlanState;
 import edu.snu.nemo.runtime.common.state.TaskState;
+import edu.snu.nemo.runtime.master.MetricMessageHandler;
 import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.common.plan.TestPlanGenerator;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -54,7 +57,10 @@ public void testState() throws Exception {
     // Create a PlanStateManager of a dag and create a DriverEndpoint with it.
     final PhysicalPlan physicalPlan =
         TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
-    final PlanStateManager planStateManager = new PlanStateManager(physicalPlan, MAX_SCHEDULE_ATTEMPT);
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    injector.bindVolatileInstance(MetricMessageHandler.class, mock(MetricMessageHandler.class));
+    final PlanStateManager planStateManager = injector.getInstance(PlanStateManager.class);
+    planStateManager.updatePlan(physicalPlan, MAX_SCHEDULE_ATTEMPT);
 
     final DriverEndpoint driverEndpoint = new DriverEndpoint(planStateManager, clientEndpoint);
 
diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
index 295b56ccb..7625cde10 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
@@ -57,6 +57,7 @@ public DAGBuilder() {
 
   /**
    * Constructor of DAGBuilder with a DAG to start from.
+   *
    * @param dag to start the builder from.
    */
   public DAGBuilder(final DAG<V, E> dag) {
@@ -67,6 +68,7 @@ public DAGBuilder(final DAG<V, E> dag) {
 
   /**
    * Add vertex to the builder.
+   *
    * @param v vertex to add.
    * @return the builder.
    */
@@ -76,11 +78,13 @@ public DAGBuilder(final DAG<V, E> dag) {
     outgoingEdges.putIfAbsent(v, new HashSet<>());
     return this;
   }
+
   /**
    * Add vertex to the builder, with assignedLoopVertex and stackDepth information.
-   * @param v vertex to add.
+   *
+   * @param v                  vertex to add.
    * @param assignedLoopVertex the assigned, wrapping loop vertex.
-   * @param stackDepth the stack depth of the loop vertex.
+   * @param stackDepth         the stack depth of the loop vertex.
    * @return the builder.
    */
   private DAGBuilder<V, E> addVertex(final V v, final LoopVertex assignedLoopVertex, final Integer stackDepth) {
@@ -89,9 +93,11 @@ public DAGBuilder(final DAG<V, E> dag) {
     this.loopStackDepthMap.put(v, stackDepth);
     return this;
   }
+
   /**
    * Add vertex to the builder, using the LoopVertex stack.
-   * @param v vertex to add.
+   *
+   * @param v               vertex to add.
    * @param loopVertexStack LoopVertex stack to retrieve the information from.
    * @return the builder.
    */
@@ -103,9 +109,11 @@ public DAGBuilder(final DAG<V, E> dag) {
     }
     return this;
   }
+
   /**
    * Add vertex to the builder, using the information from the given DAG.
-   * @param v vertex to add.
+   *
+   * @param v   vertex to add.
    * @param dag DAG to observe and get the LoopVertex-related information from.
    * @return the builder.
    */
@@ -120,6 +128,7 @@ public DAGBuilder(final DAG<V, E> dag) {
 
   /**
    * Remove the vertex from the list.
+   *
    * @param v vertex to remove.
    * @return the builder.
    */
@@ -134,8 +143,9 @@ public DAGBuilder(final DAG<V, E> dag) {
 
   /**
    * Connect vertices at the edge.
+   *
    * @param edge edge to add.
-   * Note: the two vertices of the edge should already be added to the DAGBuilder.
+   *             Note: the two vertices of the edge should already be added to the DAGBuilder.
    * @return the builder.
    */
   public DAGBuilder<V, E> connectVertices(final E edge) {
@@ -157,6 +167,7 @@ public DAGBuilder(final DAG<V, E> dag) {
 
   /**
    * Checks whether the DAGBuilder is empty.
+   *
    * @return whether the DAGBuilder is empty or not.
    */
   public boolean isEmpty() {
@@ -165,6 +176,7 @@ public boolean isEmpty() {
 
   /**
    * check if the DAGBuilder contains the vertex.
+   *
    * @param vertex vertex that it searches for.
    * @return whether or not the builder contains it.
    */
@@ -174,6 +186,7 @@ public boolean contains(final V vertex) {
 
   /**
    * check if the DAGBuilder contains any vertex that satisfies the predicate.
+   *
    * @param predicate predicate to test each vertices with.
    * @return whether or not the builder contains it.
    */
@@ -182,11 +195,13 @@ public boolean contains(final Predicate<V> predicate) {
   }
 
   ///////////////////////////INTEGRITY CHECK///////////////////////////
+
   /**
    * Helper method to guarantee that there are no cycles in the DAG.
-   * @param stack stack to push the vertices to.
+   *
+   * @param stack   stack to push the vertices to.
    * @param visited set to keep track of visited vertices.
-   * @param vertex vertex to check.
+   * @param vertex  vertex to check.
    */
   private void cycleCheck(final Stack<V> stack, final Set<V> visited, final V vertex) {
     // We check in a DFS manner.
@@ -252,7 +267,7 @@ private void executionPropertyCheck() {
     // DataSizeMetricCollection is not compatible with Push (All data have to be stored before the data collection)
     vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e instanceof IREdge).map(e -> (IREdge) e)
         .filter(e -> Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
-                      .equals(e.getPropertyValue(MetricCollectionProperty.class)))
+            .equals(e.getPropertyValue(MetricCollectionProperty.class)))
         .filter(e -> DataFlowProperty.Value.Push.equals(e.getPropertyValue(DataFlowProperty.class).get()))
         .forEach(e -> {
           throw new CompileTimeOptimizationException("DAG execution property check: "
@@ -262,9 +277,10 @@ private void executionPropertyCheck() {
 
   /**
    * DAG integrity check function, that keeps DAG in shape.
-   * @param cycle whether or not to check for cycles.
-   * @param source whether or not to check sources.
-   * @param sink whether or not to check sink.
+   *
+   * @param cycle             whether or not to check for cycles.
+   * @param source            whether or not to check sources.
+   * @param sink              whether or not to check sink.
    * @param executionProperty whether or not to check execution property.
    */
   private void integrityCheck(final boolean cycle,
@@ -289,8 +305,10 @@ private void integrityCheck(final boolean cycle,
   }
 
   ///////////////////////////BUILD///////////////////////////
+
   /**
-   * Build the DAG for LoopVertex.
+   * Build the DAG without source and sink check.
+   *
    * @return the DAG contained by the builder.
    */
   public DAG<V, E> buildWithoutSourceSinkCheck() {
@@ -298,8 +316,19 @@ private void integrityCheck(final boolean cycle,
     return new DAG<>(vertices, incomingEdges, outgoingEdges, assignedLoopVertexMap, loopStackDepthMap);
   }
 
+  /**
+   * Build the DAG without source check.
+   *
+   * @return the DAG contained by the builder.
+   */
+  public DAG<V, E> buildWithoutSourceCheck() {
+    integrityCheck(true, false, true, true);
+    return new DAG<>(vertices, incomingEdges, outgoingEdges, assignedLoopVertexMap, loopStackDepthMap);
+  }
+
   /**
    * Build the DAG.
+   *
    * @return the DAG contained by the builder.
    */
   public DAG<V, E> build() {
diff --git a/common/src/main/java/edu/snu/nemo/common/exception/BlockFetchException.java b/common/src/main/java/edu/snu/nemo/common/exception/BlockFetchException.java
index 5f5e6ba44..3d0192ee9 100644
--- a/common/src/main/java/edu/snu/nemo/common/exception/BlockFetchException.java
+++ b/common/src/main/java/edu/snu/nemo/common/exception/BlockFetchException.java
@@ -22,9 +22,10 @@
 public final class BlockFetchException extends RuntimeException {
   /**
    * BlockFetchException.
-   * @param exception exception
+   *
+   * @param throwable the underlying cause to wrap in this exception.
    */
-  public BlockFetchException(final Throwable exception) {
-    super(exception);
+  public BlockFetchException(final Throwable throwable) {
+    super(throwable);
   }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/exception/BlockWriteException.java b/common/src/main/java/edu/snu/nemo/common/exception/BlockWriteException.java
index e07dece61..1ed5b3b80 100644
--- a/common/src/main/java/edu/snu/nemo/common/exception/BlockWriteException.java
+++ b/common/src/main/java/edu/snu/nemo/common/exception/BlockWriteException.java
@@ -22,9 +22,10 @@
 public final class BlockWriteException extends RuntimeException {
   /**
    * BlockWriteException.
-   * @param exception exception
+   *
+   * @param throwable the underlying cause to wrap in this exception.
    */
-  public BlockWriteException(final Throwable exception) {
-    super(exception);
+  public BlockWriteException(final Throwable throwable) {
+    super(throwable);
   }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/exception/CompileTimeOptimizationException.java b/common/src/main/java/edu/snu/nemo/common/exception/CompileTimeOptimizationException.java
index 9756c2875..b5e574ff4 100644
--- a/common/src/main/java/edu/snu/nemo/common/exception/CompileTimeOptimizationException.java
+++ b/common/src/main/java/edu/snu/nemo/common/exception/CompileTimeOptimizationException.java
@@ -16,12 +16,13 @@
 package edu.snu.nemo.common.exception;
 
 /**
- * DynamicOptimizationException.
- * Thrown for dynamic optimization related exceptions.
+ * CompileTimeOptimizationException.
+ * Thrown for compile-time optimization related exceptions.
  */
 public class CompileTimeOptimizationException extends RuntimeException {
   /**
-   * Constructor of DynamicOptimizationException.
+   * Constructor of CompileTimeOptimizationException.
+   *
    * @param cause cause.
    */
   public CompileTimeOptimizationException(final Throwable cause) {
@@ -29,7 +30,8 @@ public CompileTimeOptimizationException(final Throwable cause) {
   }
 
   /**
-   * Constructor of DynamicOptimizationException.
+   * Constructor of CompileTimeOptimizationException.
+   *
    * @param message message.
    */
   public CompileTimeOptimizationException(final String message) {
diff --git a/common/src/main/java/edu/snu/nemo/common/exception/DynamicOptimizationException.java b/common/src/main/java/edu/snu/nemo/common/exception/DynamicOptimizationException.java
index 0917fad2a..88fedf409 100644
--- a/common/src/main/java/edu/snu/nemo/common/exception/DynamicOptimizationException.java
+++ b/common/src/main/java/edu/snu/nemo/common/exception/DynamicOptimizationException.java
@@ -22,6 +22,7 @@
 public class DynamicOptimizationException extends RuntimeException {
   /**
    * Constructor of DynamicOptimizationException.
+   *
    * @param cause cause.
    */
   public DynamicOptimizationException(final Throwable cause) {
@@ -30,6 +31,7 @@ public DynamicOptimizationException(final Throwable cause) {
 
   /**
    * Constructor of DynamicOptimizationException.
+   *
    * @param message message.
    */
   public DynamicOptimizationException(final String message) {
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CacheIDProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CacheIDProperty.java
new file mode 100644
index 000000000..5e97fab35
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CacheIDProperty.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.edge.executionproperty;
+
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
+
+import java.util.UUID;
+
+/**
+ * Cache ID ExecutionProperty. This property is used for identifying the cached data.
+ */
+public final class CacheIDProperty extends EdgeExecutionProperty<UUID> {
+  /**
+   * Constructor.
+   *
+   * @param value value of the execution property.
+   */
+  private CacheIDProperty(final UUID value) {
+    super(value);
+  }
+
+  /**
+   * Static method exposing the constructor.
+   *
+   * @param value value of the new execution property.
+   * @return the newly created execution property.
+   */
+  public static CacheIDProperty of(final UUID value) {
+    return new CacheIDProperty(value);
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DuplicateEdgeGroupPropertyValue.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DuplicateEdgeGroupPropertyValue.java
index 074f615cd..ec22a22f0 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DuplicateEdgeGroupPropertyValue.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DuplicateEdgeGroupPropertyValue.java
@@ -32,6 +32,7 @@
 
   /**
    * Constructor.
+   *
    * @param groupId Group ID.
    */
   public DuplicateEdgeGroupPropertyValue(final String groupId) {
@@ -42,6 +43,7 @@ public DuplicateEdgeGroupPropertyValue(final String groupId) {
 
   /**
    * Set physical edge id.
+   *
    * @param representativeEdgeId physical edge id of representative edge.
    */
   public void setRepresentativeEdgeId(final String representativeEdgeId) {
@@ -54,6 +56,7 @@ public void setRepresentativeEdgeId(final String representativeEdgeId) {
 
   /**
    * Set the group size.
+   *
    * @param groupSize the group size.
    */
   public void setGroupSize(final int groupSize) {
@@ -65,6 +68,7 @@ public void setGroupSize(final int groupSize) {
 
   /**
    * Get the physical edge id of the representative edge.
+   *
    * @return physical edge id of the representative edge.
    */
   public String getRepresentativeEdgeId() {
@@ -76,6 +80,7 @@ public String getRepresentativeEdgeId() {
 
   /**
    * Get the data id.
+   *
    * @return data id.
    */
   public String getGroupId() {
@@ -84,6 +89,7 @@ public String getGroupId() {
 
   /**
    * Get the group size.
+   *
    * @return the group size.
    */
   public int getGroupSize() {
@@ -93,6 +99,13 @@ public int getGroupSize() {
     return groupSize;
   }
 
+  /**
+   * @return whether the representative edge is decided or not.
+   */
+  public boolean isRepresentativeEdgeDecided() {
+    return isRepresentativeEdgeDecided;
+  }
+
   @Override
   public String toString() {
     return String.format("DuplicateEdgeGroup(%s)", representativeEdgeId != null ? representativeEdgeId : "");
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java
index 04f5c072e..6c7b3c3ef 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java
@@ -23,6 +23,7 @@
 public final class PartitionerProperty extends EdgeExecutionProperty<PartitionerProperty.Value> {
   /**
    * Constructor.
+   *
    * @param value value of the execution property.
    */
   private PartitionerProperty(final Value value) {
@@ -31,6 +32,7 @@ private PartitionerProperty(final Value value) {
 
   /**
    * Static method exposing the constructor.
+   *
    * @param value value of the new execution property.
    * @return the newly created execution property.
    */
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/CachedSourceVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/CachedSourceVertex.java
new file mode 100644
index 000000000..ee1af03d2
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/CachedSourceVertex.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex;
+
+import edu.snu.nemo.common.ir.Readable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Bounded source vertex for cached data.
+ * It does not have actual data but just wraps the cached input data.
+ * @param <T> the type of data to emit.
+ */
+public final class CachedSourceVertex<T> extends SourceVertex<T> {
+  private List<Readable<T>> readables;
+
+  /**
+   * Constructor.
+   *
+   * @param numPartitions the number of partitions.
+   */
+  public CachedSourceVertex(final int numPartitions) {
+    this.readables = new ArrayList<>();
+    for (int i = 0; i < numPartitions; i++) {
+      readables.add(new CachedReadable());
+    }
+  }
+
+  /**
+   * Constructor for cloning.
+   *
+   * @param readables the list of Readables to set.
+   */
+  private CachedSourceVertex(final List<Readable<T>> readables) {
+    this.readables = readables;
+  }
+
+  @Override
+  public CachedSourceVertex getClone() {
+    final CachedSourceVertex that = new CachedSourceVertex<>(this.readables);
+    this.copyExecutionPropertiesTo(that);
+    return that;
+  }
+
+  @Override
+  public List<Readable<T>> getReadables(final int desiredNumOfSplits) {
+    // Ignore the desired number of splits.
+    return readables;
+  }
+
+  @Override
+  public void clearInternalStates() {
+    readables = null;
+  }
+
+  /**
+   * A Readable wrapper for cached data.
+   * It does not contain any actual data but the data will be sent from the cached store through external input reader.
+   */
+  private final class CachedReadable implements Readable<T> {
+
+    /**
+     * Constructor.
+     */
+    private CachedReadable() {
+      // Do nothing
+    }
+
+    @Override
+    public Iterable<T> read() throws IOException {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public List<String> getLocations() {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/IgnoreSchedulingTempDataReceiverProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/IgnoreSchedulingTempDataReceiverProperty.java
new file mode 100644
index 000000000..45233bfc3
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/IgnoreSchedulingTempDataReceiverProperty.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.executionproperty;
+
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+
+/**
+ * A property represents that a vertex annotated with this property doesn't process any data and
+ * should be regarded as a kind of "marker" to construct a temporary edge that contains some data that
+ * have to be written before its usage is determined (e.g., for caching).
+ * The written data in the edge toward the vertex annotated with this property will be used when
+ * the usage is determined by using {@link edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty}.
+ * In that case, the edge will be regarded as a representative edge.
+ * Attaching this property makes the runtime skip scheduling this vertex.
+ */
+public final class IgnoreSchedulingTempDataReceiverProperty extends VertexExecutionProperty<Boolean> {
+
+  private static final IgnoreSchedulingTempDataReceiverProperty IGNORE_SCHEDULING_TEMP_DATA_RECEIVER_PROPERTY =
+    new IgnoreSchedulingTempDataReceiverProperty();
+
+  /**
+   * Constructor.
+   */
+  private IgnoreSchedulingTempDataReceiverProperty() {
+    super(true);
+  }
+
+  /**
+   * Static method exposing the constructor.
+   *
+   * @return the execution property.
+   */
+  public static IgnoreSchedulingTempDataReceiverProperty of() {
+    return IGNORE_SCHEDULING_TEMP_DATA_RECEIVER_PROPERTY;
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ParallelismProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ParallelismProperty.java
index d07731dd6..fdd089384 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ParallelismProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ParallelismProperty.java
@@ -23,6 +23,7 @@
 public final class ParallelismProperty extends VertexExecutionProperty<Integer> {
   /**
    * Constructor.
+   *
    * @param value value of the execution property.
    */
   private ParallelismProperty(final Integer value) {
@@ -31,6 +32,7 @@ private ParallelismProperty(final Integer value) {
 
   /**
    * Static method exposing the constructor.
+   *
    * @param value value of the new execution property.
    * @return the newly created execution property.
    */
diff --git a/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java b/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
index a1daf8bec..9c02ffd82 100644
--- a/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
+++ b/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
@@ -62,6 +62,7 @@ private EmptyComponents() {
 
   /**
    * An empty transform.
+   *
    * @param <I> input type.
    * @param <O> output type.
    */
@@ -70,6 +71,7 @@ private EmptyComponents() {
 
     /**
      * Default constructor.
+     *
      * @param name name of the empty transform.
      */
     public EmptyTransform(final String name) {
@@ -100,6 +102,7 @@ public void close() {
 
   /**
    * An empty Source Vertex.
+   *
    * @param <T> type of the data.
    */
   public static final class EmptySourceVertex<T> extends SourceVertex<T> {
@@ -107,6 +110,7 @@ public void close() {
 
     /**
      * Constructor.
+     *
      * @param name name for the vertex.
      */
     public EmptySourceVertex(final String name) {
@@ -152,6 +156,7 @@ public void clearInternalStates() {
 
   /**
    * An empty reader.
+   *
    * @param <T> type of the data.
    */
   static final class EmptyReadable<T> implements Readable<T> {
@@ -159,6 +164,7 @@ public void clearInternalStates() {
     public Iterable<T> read() {
       return new ArrayList<>();
     }
+
     @Override
     public List<String> getLocations() {
       throw new UnsupportedOperationException();
diff --git a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/Backend.java b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/Backend.java
index 62b63cd47..d49c1b991 100644
--- a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/Backend.java
+++ b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/Backend.java
@@ -18,15 +18,19 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.compiler.backend.nemo.NemoBackend;
+import org.apache.reef.tang.annotations.DefaultImplementation;
 
 /**
  * Interface for backend components.
  * @param <Plan> the physical execution plan to compile the DAG into.
  */
+@DefaultImplementation(NemoBackend.class)
 public interface Backend<Plan> {
   /**
    * Compiles a DAG to a physical execution plan.
-   * @param dag DAG to compile.
+   *
+   * @param dag the DAG to compile.
    * @return the execution plan generated.
    * @throws Exception Exception on the way.
    */
diff --git a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 696acffd3..2082f8065 100644
--- a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -24,7 +24,6 @@
 import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
 import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
-import org.apache.reef.tang.Tang;
 
 import javax.inject.Inject;
 
@@ -32,34 +31,25 @@
  * Backend component for Nemo Runtime.
  */
 public final class NemoBackend implements Backend<PhysicalPlan> {
+
+  private final PhysicalPlanGenerator physicalPlanGenerator;
+
   /**
    * Constructor.
    */
   @Inject
-  private NemoBackend() {
+  private NemoBackend(final PhysicalPlanGenerator physicalPlanGenerator) {
+    this.physicalPlanGenerator = physicalPlanGenerator;
   }
 
   /**
    * Compiles an IR DAG into a {@link PhysicalPlan} to be submitted to Runtime.
-   * @param irDAG to compile.
+   *
+   * @param irDAG the IR DAG to compile.
    * @return the execution plan to be submitted to Runtime.
-   * @throws Exception any exception occurred during the compilation.
    */
-  public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG) throws Exception {
-    final PhysicalPlanGenerator physicalPlanGenerator =
-        Tang.Factory.getTang().newInjector().getInstance(PhysicalPlanGenerator.class);
-    return compile(irDAG, physicalPlanGenerator);
-  }
+  public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG) {
 
-  /**
-   * Compiles an IR DAG into a {@link PhysicalPlan} to be submitted to Runtime.
-   * Receives {@link PhysicalPlanGenerator} with configured directory of DAG files.
-   * @param irDAG to compile.
-   * @param physicalPlanGenerator with custom DAG directory.
-   * @return the execution plan to be submitted to Runtime.
-   */
-  public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG,
-                              final PhysicalPlanGenerator physicalPlanGenerator) {
     final DAG<Stage, StageEdge> stageDAG = physicalPlanGenerator.apply(irDAG);
     return new PhysicalPlan(RuntimeIdManager.generatePhysicalPlanId(), stageDAG);
   }
diff --git a/compiler/backend/src/test/java/edu/snu/nemo/compiler/backend/nemo/NemoBackendTest.java b/compiler/backend/src/test/java/edu/snu/nemo/compiler/backend/nemo/NemoBackendTest.java
index 26c00a62f..2a93cf16a 100644
--- a/compiler/backend/src/test/java/edu/snu/nemo/compiler/backend/nemo/NemoBackendTest.java
+++ b/compiler/backend/src/test/java/edu/snu/nemo/compiler/backend/nemo/NemoBackendTest.java
@@ -25,10 +25,8 @@
 import edu.snu.nemo.compiler.optimizer.policy.TransientResourcePolicy;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -44,7 +42,7 @@
   private final IRVertex groupByKey = new OperatorVertex(new EmptyComponents.EmptyTransform("GroupByKey"));
   private final IRVertex combine = new OperatorVertex(new EmptyComponents.EmptyTransform("Combine"));
   private final IRVertex map2 = new OperatorVertex(new EmptyComponents.EmptyTransform("MapElements2"));
-  private PhysicalPlanGenerator physicalPlanGenerator;
+  private NemoBackend nemoBackend;
 
   private final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
   private DAG<IRVertex, IREdge> dag;
@@ -62,17 +60,15 @@ public void setUp() throws Exception {
 
     final Injector injector = Tang.Factory.getTang().newInjector();
     injector.bindVolatileParameter(JobConf.DAGDirectory.class, "");
-    this.physicalPlanGenerator = injector.getInstance(PhysicalPlanGenerator.class);
+    this.nemoBackend = injector.getInstance(NemoBackend.class);
   }
 
   /**
    * This method uses an IR DAG and tests whether NemoBackend successfully generates an Execution Plan.
-   * @throws Exception during the Execution Plan generation.
    */
   @Test
-  public void testExecutionPlanGeneration() throws InjectionException {
-    final NemoBackend backend = Tang.Factory.getTang().newInjector().getInstance(NemoBackend.class);
-    final PhysicalPlan executionPlan = backend.compile(dag, physicalPlanGenerator);
+  public void testExecutionPlanGeneration() {
+    final PhysicalPlan executionPlan = nemoBackend.compile(dag);
 
     assertEquals(2, executionPlan.getStageDAG().getVertices().size());
     assertEquals(2, executionPlan.getStageDAG().getTopologicalSort().get(0).getIRDAG().getVertices().size());
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/rdd/JavaRDD.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/rdd/JavaRDD.java
index 2fad99c33..1d1b966d6 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/rdd/JavaRDD.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/rdd/JavaRDD.java
@@ -238,14 +238,21 @@ public void saveAsTextFile(final String path) {
     rdd.saveAsTextFile(path);
   }
 
-  /////////////// UNSUPPORTED TRANSFORMATIONS ///////////////
-  //TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
+  /////////////// CACHING ///////////////
+
+  @Override
+  public JavaRDD<T> persist(final StorageLevel newLevel) {
+    return rdd.persist(newLevel).toJavaRDD();
+  }
 
   @Override
   public JavaRDD<T> cache() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    return rdd.cache().toJavaRDD();
   }
 
+  /////////////// UNSUPPORTED TRANSFORMATIONS ///////////////
+  //TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
+
   @Override
   public JavaRDD<T> coalesce(final int numPartitions) {
     throw new UnsupportedOperationException("Operation not yet implemented.");
@@ -292,11 +299,6 @@ public void saveAsTextFile(final String path) {
     throw new UnsupportedOperationException("Operation not yet implemented.");
   }
 
-  @Override
-  public JavaRDD<T> persist(final StorageLevel newLevel) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
   @Override
   public JavaRDD<T>[] randomSplit(final double[] weights) {
     throw new UnsupportedOperationException("Operation not yet implemented.");
diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
index ad5839992..a3d75a548 100644
--- a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
+++ b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
@@ -16,13 +16,16 @@
 package edu.snu.nemo.compiler.frontend.spark.core.rdd
 
 import java.util
+import java.util.UUID
 
 import edu.snu.nemo.client.JobLauncher
 import edu.snu.nemo.common.dag.{DAG, DAGBuilder}
 import edu.snu.nemo.common.ir.edge.IREdge
-import edu.snu.nemo.common.ir.edge.executionproperty.{DecoderProperty, EncoderProperty, KeyExtractorProperty}
+import edu.snu.nemo.common.ir.edge.executionproperty._
 import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty
+import edu.snu.nemo.common.ir.vertex.executionproperty.IgnoreSchedulingTempDataReceiverProperty
 import edu.snu.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex}
+import edu.snu.nemo.common.test.EmptyComponents.EmptyTransform
 import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor
 import edu.snu.nemo.compiler.frontend.spark.coder.{SparkDecoderFactory, SparkEncoderFactory}
 import edu.snu.nemo.compiler.frontend.spark.core.SparkFrontendUtils
@@ -35,6 +38,7 @@ import org.apache.spark.rdd.{AsyncRDDActions, DoubleRDDFunctions, OrderedRDDFunc
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.{Dependency, Partition, Partitioner, SparkContext, TaskContext}
+import org.slf4j.LoggerFactory
 
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
@@ -45,9 +49,10 @@ import scala.reflect.ClassTag
 final class RDD[T: ClassTag] protected[rdd] (
     protected[rdd] val _sc: SparkContext,
     private val deps: Seq[Dependency[_]],
-    protected[rdd] val dag: DAG[IRVertex, IREdge],
+    protected[rdd] var dag: DAG[IRVertex, IREdge],
     protected[rdd] val lastVertex: IRVertex,
     private val sourceRDD: Option[org.apache.spark.rdd.RDD[T]]) extends org.apache.spark.rdd.RDD[T](_sc, deps) {
+  private val LOG = LoggerFactory.getLogger(classOf[RDD[T]].getName)
 
   protected[rdd] val serializer: Serializer = SparkFrontendUtils.deriveSerializerFrom(_sc)
   private val loopVertexStack = new util.Stack[LoopVertex]
@@ -56,6 +61,7 @@ final class RDD[T: ClassTag] protected[rdd] (
   private val decoderProperty: EdgeExecutionProperty[_ <: Serializable] =
     DecoderProperty.of(new SparkDecoderFactory[T](serializer)).asInstanceOf[EdgeExecutionProperty[_ <: Serializable]]
   private val keyExtractorProperty: KeyExtractorProperty = KeyExtractorProperty.of(new SparkKeyExtractor)
+  private var persistedMarkerVertex: Option[IRVertex] = Option.empty
 
   /**
    * Constructor without dependencies (not needed in Nemo RDD).
@@ -223,6 +229,98 @@ final class RDD[T: ClassTag] protected[rdd] (
     JobLauncher.launchDAG(builder.build)
   }
 
+  /////////////// CACHING ///////////////
+
+  /**
+   * Set this RDD's storage level to persist its values across operations after the first time
+   * it is computed. Calling this again replaces the previously added cache marker vertex with a
+   * new one. Options of the given level that are not supported are downgraded with a warning.
+   */
+  override def persist(newLevel: StorageLevel): RDD.this.type = {
+    var actualUseDisk = false
+    var actualUseMemory = false
+    var actualDeserialized = false
+
+    if (!newLevel.isValid) {
+      throw new RuntimeException("Non-valid StorageLevel: " + newLevel.toString())
+    }
+
+    // Map the unsupported storage options to the closest supported ones
+    if (newLevel.useDisk && newLevel.useMemory) {
+      actualUseDisk = true
+      actualUseMemory = false
+      // TODO #187: Implement disk and memory persistence (Spill)
+      LOG.warn("Cannot persist data in disk and memory at the same time. The data will be persisted in disk only.")
+    } else {
+      actualUseDisk = newLevel.useDisk
+      actualUseMemory = newLevel.useMemory
+    }
+
+    if (newLevel.useOffHeap) {
+      // TODO #188: Implement off-heap memory persistence
+      LOG.warn("Cannot persist data using off-heap area. The data will be persisted in heap instead of off-heap.")
+    }
+
+    if (newLevel.deserialized && actualUseDisk) {
+      LOG.warn(
+        "Cannot persist data as deserialized form in disk. The data will be persisted in serialized form instead.")
+      actualDeserialized = false
+    } else {
+      actualDeserialized = newLevel.deserialized
+    }
+
+    if (newLevel.replication > 1) {
+      // TODO #189: Implement replication for persisted data
+      LOG.warn("Cannot persist data with replication. The data will not be replicated.")
+    }
+
+    // TODO #190: Disable changing persistence strategy after a RDD is calculated
+    val builder = new DAGBuilder[IRVertex, IREdge](dag)
+    if (persistedMarkerVertex.isDefined) {
+      builder.removeVertex(persistedMarkerVertex.get)
+    }
+
+    val cacheID = UUID.randomUUID()
+    val ghostVertex = new OperatorVertex(new EmptyTransform[T, T]("CacheMarkerTransform-" + cacheID.toString))
+    ghostVertex.setProperty(IgnoreSchedulingTempDataReceiverProperty.of())
+    builder.addVertex(ghostVertex, loopVertexStack)
+
+    val newEdge = new IREdge(CommunicationPatternProperty.Value.OneToOne, lastVertex, ghostVertex)
+    // Set up the default properties
+    newEdge.setProperty(encoderProperty)
+    newEdge.setProperty(decoderProperty)
+    newEdge.setProperty(keyExtractorProperty)
+    // Set up the cache-related properties; the data store follows the options resolved above
+    if (actualUseDisk) {
+      newEdge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore))
+    } else if (actualDeserialized) {
+      newEdge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore))
+    } else {
+      newEdge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.SerializedMemoryStore))
+    }
+    newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep))
+    newEdge.setProperty(CacheIDProperty.of(cacheID))
+    val dupEdgeVal = new DuplicateEdgeGroupPropertyValue("CacheGroup-" + cacheID)
+    dupEdgeVal.setRepresentativeEdgeId(newEdge.getId)
+    newEdge.setProperty(DuplicateEdgeGroupProperty.of(dupEdgeVal))
+    builder.connectVertices(newEdge)
+
+    dag = builder.buildWithoutSourceSinkCheck()
+    persistedMarkerVertex = Option.apply(ghostVertex)
+    this
+  }
+
+  /**
+   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
+   */
+  override def persist(): RDD.this.type = persist(StorageLevel.MEMORY_ONLY)
+
+  /**
+   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
+   */
+  override def cache(): RDD.this.type = persist()
+
+
   /////////////// UNSUPPORTED METHODS ///////////////
   //TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
 
@@ -237,15 +335,6 @@ final class RDD[T: ClassTag] protected[rdd] (
   override def setName(_name: String): RDD.this.type =
     throw new UnsupportedOperationException("Operation not yet implemented.")
 
-  override def persist(newLevel: StorageLevel): RDD.this.type =
-    throw new UnsupportedOperationException("Operation not yet implemented.")
-
-  override def persist(): RDD.this.type =
-    throw new UnsupportedOperationException("Operation not yet implemented.")
-
-  override def cache(): RDD.this.type =
-    throw new UnsupportedOperationException("Operation not yet implemented.")
-
   override def unpersist(blocking: Boolean): RDD.this.type =
     throw new UnsupportedOperationException("Operation not yet implemented.")
 
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/NemoOptimizer.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/NemoOptimizer.java
new file mode 100644
index 000000000..ba4e11c0f
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/NemoOptimizer.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.dag.DAGBuilder;
+import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
+import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
+import edu.snu.nemo.common.exception.DynamicOptimizationException;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.CachedSourceVertex;
+import edu.snu.nemo.common.ir.edge.executionproperty.CacheIDProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.IgnoreSchedulingTempDataReceiverProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.compiler.optimizer.policy.Policy;
+import edu.snu.nemo.conf.JobConf;
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * The default {@link Optimizer}, which manages the optimization over submitted IR DAGs through {@link Policy}s.
+ * The instance of this class will reside in driver.
+ */
+@NotThreadSafe
+public final class NemoOptimizer implements Optimizer {
+
+  private final String dagDirectory;
+  private final String optimizationPolicyCanonicalName;
+  private final Injector injector;
+  private final PubSubEventHandlerWrapper pubSubWrapper;
+  private final Map<UUID, Integer> cacheIdToParallelism = new HashMap<>();
+  private int irDagCount = 0;
+
+  @Inject
+  private NemoOptimizer(@Parameter(JobConf.DAGDirectory.class) final String dagDirectory,
+                        @Parameter(JobConf.OptimizationPolicy.class) final String optimizationPolicy,
+                        final PubSubEventHandlerWrapper pubSubEventHandlerWrapper,
+                        final Injector injector) {
+    this.dagDirectory = dagDirectory;
+    this.optimizationPolicyCanonicalName = optimizationPolicy;
+    this.injector = injector;
+    this.pubSubWrapper = pubSubEventHandlerWrapper;
+  }
+
+  /**
+   * Optimize the submitted DAG after cropping the parts whose data is already cached.
+   *
+   * @param dag the input DAG to optimize.
+   * @return optimized DAG, reshaped or tagged with execution properties.
+   * @throws CompileTimeOptimizationException if no policy is specified or compile-time optimization fails.
+   * @throws DynamicOptimizationException     if registering the run-time optimizations fails.
+   */
+  @Override
+  public DAG<IRVertex, IREdge> optimizeDag(final DAG<IRVertex, IREdge> dag) {
+    final String irDagId = "ir-" + irDagCount++ + "-";
+    dag.storeJSON(dagDirectory, irDagId, "IR before optimization");
+
+    // Validate the policy name up front; the instance created reflectively below can never be null.
+    if (optimizationPolicyCanonicalName == null || optimizationPolicyCanonicalName.isEmpty()) {
+      throw new CompileTimeOptimizationException("A policy name should be specified.");
+    }
+
+    final DAG<IRVertex, IREdge> optimizedDAG;
+    final Policy optimizationPolicy;
+    final Map<UUID, IREdge> cacheIdToEdge = new HashMap<>();
+
+    try {
+      // Handle caching first.
+      final DAG<IRVertex, IREdge> cacheFilteredDag = handleCaching(dag, cacheIdToEdge);
+      if (!cacheIdToEdge.isEmpty()) {
+        cacheFilteredDag.storeJSON(dagDirectory, irDagId + "FilterCache", "IR after cache filtering");
+      }
+
+      // Conduct compile-time optimization.
+      optimizationPolicy = (Policy) Class.forName(optimizationPolicyCanonicalName).newInstance();
+      optimizedDAG = optimizationPolicy.runCompileTimeOptimization(cacheFilteredDag, dagDirectory);
+      optimizedDAG.storeJSON(dagDirectory, irDagId + optimizationPolicy.getClass().getSimpleName(),
+          "IR optimized for " + optimizationPolicy.getClass().getSimpleName());
+    } catch (final Exception e) {
+      throw new CompileTimeOptimizationException(e);
+    }
+
+    // Register run-time optimization.
+    try {
+      optimizationPolicy.registerRunTimeOptimizations(injector, pubSubWrapper);
+    } catch (final Exception e) {
+      throw new DynamicOptimizationException(e);
+    }
+
+    // Update cached list.
+    // TODO #191: Report the actual state of cached data to optimizer.
+    // Now we assume that the optimized dag always run properly.
+    cacheIdToEdge.forEach((cacheId, edge) -> {
+      if (!cacheIdToParallelism.containsKey(cacheId)) {
+        cacheIdToParallelism.put(
+            cacheId, optimizedDAG.getVertexById(edge.getDst().getId()).getPropertyValue(ParallelismProperty.class)
+                .orElseThrow(() -> new RuntimeException("No parallelism on an IR vertex.")));
+      }
+    });
+
+    return optimizedDAG;
+  }
+
+  /**
+   * Handle data caching.
+   * First, it searches the given dag for edges having a cache ID and records them in the given map.
+   * Then, if some edge of a submitted dag is annotated as "cached" and the data was produced already,
+   * the part of the submitted dag which produces the cached data will be cropped and the last vertex
+   * before the cached edge will be replaced with a cached data source vertex.
+   * This cached edge will be detected and appended to the original dag in scheduler.
+   *
+   * @param dag           the dag to handle.
+   * @param cacheIdToEdge the map from cache ID to edge to update.
+   * @return the dag cropped with regard to caching, or the given dag itself if it has no edge with a cache ID.
+   */
+  private DAG<IRVertex, IREdge> handleCaching(final DAG<IRVertex, IREdge> dag,
+                                              final Map<UUID, IREdge> cacheIdToEdge) {
+    dag.topologicalDo(irVertex ->
+        dag.getIncomingEdgesOf(irVertex).forEach(
+            edge -> edge.getPropertyValue(CacheIDProperty.class).
+                ifPresent(cacheId -> cacheIdToEdge.put(cacheId, edge))
+        ));
+
+    if (cacheIdToEdge.isEmpty()) {
+      return dag;
+    } else {
+      final DAGBuilder<IRVertex, IREdge> filteredDagBuilder = new DAGBuilder<>();
+      final List<IRVertex> sinkVertices = dag.getVertices().stream()
+          .filter(irVertex -> dag.getOutgoingEdgesOf(irVertex).isEmpty())
+          .collect(Collectors.toList());
+      sinkVertices.forEach(filteredDagBuilder::addVertex); // Sink vertex cannot be cached already.
+
+      sinkVertices.forEach(sinkVtx -> addNonCachedVerticesAndEdges(dag, sinkVtx, filteredDagBuilder));
+
+      return filteredDagBuilder.buildWithoutSourceCheck();
+    }
+  }
+
+  /**
+   * Walk the dag backwards from the given vertex, adding its non-cached vertices and edges to the builder.
+   *
+   * @param dag      the original dag to filter.
+   * @param irVertex the ir vertex to start from; callers add it to the builder before this call.
+   * @param builder  the filtered dag builder.
+   */
+  private void addNonCachedVerticesAndEdges(final DAG<IRVertex, IREdge> dag,
+                                            final IRVertex irVertex,
+                                            final DAGBuilder<IRVertex, IREdge> builder) {
+    if (irVertex.getPropertyValue(IgnoreSchedulingTempDataReceiverProperty.class).orElse(false)
+        && dag.getIncomingEdgesOf(irVertex).stream()
+        .filter(irEdge -> irEdge.getPropertyValue(CacheIDProperty.class).isPresent())
+        .anyMatch(irEdge -> cacheIdToParallelism
+            .containsKey(irEdge.getPropertyValue(CacheIDProperty.class).get()))) {
+      builder.removeVertex(irVertex); // Drop a ghost vertex whose incoming cache ID is already registered.
+      return;
+    }
+
+    dag.getIncomingEdgesOf(irVertex).stream()
+        .forEach(edge -> {
+      final Optional<UUID> cacheId = dag.getOutgoingEdgesOf(edge.getSrc()).stream()
+          .filter(edgeToFilter -> edgeToFilter.getPropertyValue(CacheIDProperty.class).isPresent())
+          .map(edgeToMap -> edgeToMap.getPropertyValue(CacheIDProperty.class).get())
+          .findFirst();
+      if (cacheId.isPresent() && cacheIdToParallelism.get(cacheId.get()) != null) { // Cached already.
+        // Replace the vertex emitting cached edge with a cached source vertex.
+        final IRVertex cachedDataRelayVertex = new CachedSourceVertex(cacheIdToParallelism.get(cacheId.get()));
+        cachedDataRelayVertex.setPropertyPermanently(ParallelismProperty.of(cacheIdToParallelism.get(cacheId.get())));
+
+        builder.addVertex(cachedDataRelayVertex);
+        final IREdge newEdge = new IREdge(
+            edge.getPropertyValue(CommunicationPatternProperty.class)
+                .orElseThrow(() -> new RuntimeException("No communication pattern on an ir edge")),
+            cachedDataRelayVertex,
+            irVertex,
+            edge.isSideInput());
+        edge.copyExecutionPropertiesTo(newEdge);
+        newEdge.setProperty(CacheIDProperty.of(cacheId.get()));
+        builder.connectVertices(newEdge);
+        // Stop the recursion here: the producers of the cached data are not added.
+      } else {
+        final IRVertex srcVtx = edge.getSrc();
+        builder.addVertex(srcVtx);
+        builder.connectVertices(edge);
+        addNonCachedVerticesAndEdges(dag, srcVtx, builder);
+      }
+    });
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/Optimizer.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/Optimizer.java
new file mode 100644
index 000000000..d38865e61
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/Optimizer.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+/**
+ * An interface for optimizer, which manages the optimization over submitted IR DAGs through
+ * {@link edu.snu.nemo.compiler.optimizer.policy.Policy}s.
+ */
+@DefaultImplementation(NemoOptimizer.class)
+public interface Optimizer {
+
+  /**
+   * Optimize the submitted DAG.
+   *
+   * @param dag the input DAG to optimize.
+   * @return optimized DAG, reshaped or tagged with execution properties.
+   */
+  DAG<IRVertex, IREdge> optimizeDag(DAG<IRVertex, IREdge> dag);
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
index 31999c8fc..192ba2651 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
@@ -39,8 +39,7 @@ public BasicPullPolicy() {
   }
 
   @Override
-  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
-      throws Exception {
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
     return this.policy.runCompileTimeOptimization(dag, dagDirectory);
   }
 
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
index 5e401442f..c0c52ffa7 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
@@ -41,8 +41,7 @@ public BasicPushPolicy() {
   }
 
   @Override
-  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
-      throws Exception {
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
     return this.policy.runCompileTimeOptimization(dag, dagDirectory);
   }
 
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/ConditionalLargeShufflePolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/ConditionalLargeShufflePolicy.java
index 807dd5747..492d6d7ee 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/ConditionalLargeShufflePolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/ConditionalLargeShufflePolicy.java
@@ -56,8 +56,7 @@ private static int getMaxParallelism(final DAG<IRVertex, IREdge> dag) {
   }
 
   @Override
-  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
-      throws Exception {
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
     return this.policy.runCompileTimeOptimization(dag, dagDirectory);
   }
 
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
index acb8d50e5..e8e7a7cba 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
@@ -45,8 +45,7 @@ public DataSkewPolicy() {
   }
 
   @Override
-  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
-      throws Exception {
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
     return this.policy.runCompileTimeOptimization(dag, dagDirectory);
   }
 
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicy.java
index a84ac3557..4134f6e9e 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicy.java
@@ -39,8 +39,7 @@ public DefaultPolicy() {
   }
 
   @Override
-  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
-      throws Exception {
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
     return this.policy.runCompileTimeOptimization(dag, dagDirectory);
   }
 
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
index 39b658b2e..d754bd065 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
@@ -47,8 +47,7 @@ public DefaultPolicyWithSeparatePass() {
   }
 
   @Override
-  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
-      throws Exception {
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
     return this.policy.runCompileTimeOptimization(dag, dagDirectory);
   }
 
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DisaggregationPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DisaggregationPolicy.java
index 0d934ca74..dca9b615c 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DisaggregationPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DisaggregationPolicy.java
@@ -43,8 +43,7 @@ public DisaggregationPolicy() {
   }
 
   @Override
-  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
-      throws Exception {
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
     return this.policy.runCompileTimeOptimization(dag, dagDirectory);
   }
 
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/LargeShufflePolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/LargeShufflePolicy.java
index 57423b813..56cb24f98 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/LargeShufflePolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/LargeShufflePolicy.java
@@ -43,8 +43,7 @@ public LargeShufflePolicy() {
   }
 
   @Override
-  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
-      throws Exception {
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
     return this.policy.runCompileTimeOptimization(dag, dagDirectory);
   }
 
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/Policy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/Policy.java
index 60d897bb9..b025246f8 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/Policy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/Policy.java
@@ -31,17 +31,18 @@
 public interface Policy extends Serializable {
   /**
    * Optimize the DAG with the compile time optimizations.
-   * @param dag input DAG.
+   *
+   * @param dag          input DAG.
    * @param dagDirectory directory to save the DAG information.
    * @return optimized DAG, reshaped or tagged with execution properties.
-   * @throws Exception throws an exception if there is an exception.
    */
-  DAG<IRVertex, IREdge> runCompileTimeOptimization(DAG<IRVertex, IREdge> dag, String dagDirectory) throws Exception;
+  DAG<IRVertex, IREdge> runCompileTimeOptimization(DAG<IRVertex, IREdge> dag, String dagDirectory);
 
   /**
    * Register runtime optimizations to the event handler.
-   * @param injector Tang Injector, used in the UserApplicationRunner.
-   * @param pubSubWrapper pub-sub event handler, used in the UserApplicationRunner.
+   *
+   * @param injector      Tang Injector which contains the implementations of run-time event handlers.
+   * @param pubSubWrapper pub-sub event handler which manages run-time and compile-time event handling.
    */
   void registerRunTimeOptimizations(Injector injector, PubSubEventHandlerWrapper pubSubWrapper);
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyImpl.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyImpl.java
index 99dc4c4dc..72c27f132 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyImpl.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyImpl.java
@@ -52,8 +52,7 @@ public PolicyImpl(final List<CompileTimePass> compileTimePasses, final List<Runt
   }
 
   @Override
-  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
-      throws Exception {
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
     LOG.info("Launch Compile-time optimizations");
     return process(dag, compileTimePasses.iterator(), dagDirectory);
   }
@@ -68,7 +67,7 @@ public PolicyImpl(final List<CompileTimePass> compileTimePasses, final List<Runt
    */
   private static DAG<IRVertex, IREdge> process(final DAG<IRVertex, IREdge> dag,
                                                final Iterator<CompileTimePass> passes,
-                                               final String dagDirectory) throws Exception {
+                                               final String dagDirectory) {
     if (passes.hasNext()) {
       final CompileTimePass passToApply = passes.next();
       final DAG<IRVertex, IREdge> processedDAG;
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TransientResourcePolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TransientResourcePolicy.java
index 4528f7070..56325a109 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TransientResourcePolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TransientResourcePolicy.java
@@ -43,8 +43,7 @@ public TransientResourcePolicy() {
   }
 
   @Override
-  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
-      throws Exception {
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
     return this.policy.runCompileTimeOptimization(dag, dagDirectory);
   }
 
diff --git a/compiler/test/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TestPolicy.java b/compiler/test/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TestPolicy.java
index 061d431aa..debc09eb3 100644
--- a/compiler/test/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TestPolicy.java
+++ b/compiler/test/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TestPolicy.java
@@ -48,8 +48,7 @@ public TestPolicy(final boolean testPushPolicy) {
   }
 
   @Override
-  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
-      throws Exception {
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
     return this.policy.runCompileTimeOptimization(dag, dagDirectory);
   }
 
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java
index 937e9516b..b0e634833 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java
@@ -40,8 +40,7 @@ public ClonedSchedulingPolicyParallelismFive() {
         DefaultPolicy.BUILDER.getRuntimePasses());
   }
   @Override
-  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
-      throws Exception {
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
     return this.policy.runCompileTimeOptimization(dag, dagDirectory);
   }
   @Override
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/DataSkewPolicyParallelismFive.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/DataSkewPolicyParallelismFive.java
index e7fa7bd1b..54d0950d8 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/DataSkewPolicyParallelismFive.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/DataSkewPolicyParallelismFive.java
@@ -37,8 +37,7 @@ public DataSkewPolicyParallelismFive() {
   }
 
   @Override
-  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
-      throws Exception {
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
     return this.policy.runCompileTimeOptimization(dag, dagDirectory);
   }
 
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/DefaultPolicyParallelismFive.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/DefaultPolicyParallelismFive.java
index 0287e1ce9..5a82b44df 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/DefaultPolicyParallelismFive.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/DefaultPolicyParallelismFive.java
@@ -37,8 +37,7 @@ public DefaultPolicyParallelismFive() {
   }
 
   @Override
-  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
-      throws Exception {
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
     return this.policy.runCompileTimeOptimization(dag, dagDirectory);
   }
 
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/DisaggregationPolicyParallelismFive.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/DisaggregationPolicyParallelismFive.java
index 718ea9a67..13ee36fa7 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/DisaggregationPolicyParallelismFive.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/DisaggregationPolicyParallelismFive.java
@@ -38,8 +38,7 @@ public DisaggregationPolicyParallelismFive() {
   }
 
   @Override
-  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
-      throws Exception {
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
     return this.policy.runCompileTimeOptimization(dag, dagDirectory);
   }
 
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/LargeShufflePolicyParallelismFive.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/LargeShufflePolicyParallelismFive.java
index 1c949eee9..056cbdf82 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/LargeShufflePolicyParallelismFive.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/LargeShufflePolicyParallelismFive.java
@@ -37,8 +37,7 @@ public LargeShufflePolicyParallelismFive() {
   }
 
   @Override
-  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
-      throws Exception {
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
     return this.policy.runCompileTimeOptimization(dag, dagDirectory);
   }
 
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/TransientResourcePolicyParallelismFive.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/TransientResourcePolicyParallelismFive.java
index fbf95d08b..15f9f05b2 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/TransientResourcePolicyParallelismFive.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/TransientResourcePolicyParallelismFive.java
@@ -38,8 +38,7 @@ public TransientResourcePolicyParallelismFive() {
   }
 
   @Override
-  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
-      throws Exception {
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
     return this.policy.runCompileTimeOptimization(dag, dagDirectory);
   }
 
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/TransientResourcePolicyParallelismTen.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/TransientResourcePolicyParallelismTen.java
index fa617223d..5272589ea 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/TransientResourcePolicyParallelismTen.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/TransientResourcePolicyParallelismTen.java
@@ -38,8 +38,7 @@ public TransientResourcePolicyParallelismTen() {
   }
 
   @Override
-  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
-      throws Exception {
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
     return this.policy.runCompileTimeOptimization(dag, dagDirectory);
   }
 
diff --git a/examples/resources/expected_output_reversed_wordcount_spark b/examples/resources/expected_output_reversed_wordcount_spark
new file mode 100644
index 000000000..489fe3658
--- /dev/null
+++ b/examples/resources/expected_output_reversed_wordcount_spark
@@ -0,0 +1,5 @@
+1: one, girl, shakespeare, sanha
+2: bicycle, two, john, jangho, snowboard
+3: banana, three, tennis, piano
+4: jy
+5: ski, wonook
diff --git a/examples/spark/src/main/scala/edu/snu/nemo/examples/spark/SparkCachingWordCount.scala b/examples/spark/src/main/scala/edu/snu/nemo/examples/spark/SparkCachingWordCount.scala
new file mode 100644
index 000000000..1ec932cfc
--- /dev/null
+++ b/examples/spark/src/main/scala/edu/snu/nemo/examples/spark/SparkCachingWordCount.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package edu.snu.nemo.examples.spark
+
+import edu.snu.nemo.compiler.frontend.spark.sql.SparkSession
+
+/**
+  * Computes counts of each data key.
+  */
+object SparkCachingWordCount {
+  def main(args: Array[String]) {
+    if (args.length < 3) { // args(2) is always read below, so all three arguments are required
+      System.err.println("Usage: SparkCachingWordCount <input_file> <output_file1> <output_file2>")
+      System.exit(1)
+    }
+    val spark = SparkSession
+      .builder
+      .appName("Spark Word Count")
+      .getOrCreate()
+    // Word count: split every line on runs of spaces, pair each word with 1, then sum per word.
+    val lines = spark.read().textFile(args(0)).rdd()
+
+    val words = lines.flatMap(s => s.split(" +"))
+
+    val ones = words.map(s => (s, 1))
+
+    val counts = ones.reduceByKey((i1, i2) => i1 + i2)
+    // Mark the counts as cached; both outputs below are derived from `cached`.
+    val cached = counts.cache()
+    // First output: one "<word>: <count>" line per word.
+    val parsed = cached.map(tuple => tuple._1 + ": " + tuple._2.toString)
+
+    // first collect
+    val writeMode1 = args(1) != null // write to file or print; false only if a caller passes a null element
+    if (writeMode1) { // print to output file
+      parsed.saveAsTextFile(args(1))
+    } else { // print to console.
+      val output = parsed.collect()
+      for (elem <- output) {
+        println(elem)
+      }
+    }
+    // Second output: swap to (count, word) and join all words sharing a count with ", ".
+    val reversed = cached.map(p => (p._2, p._1))
+
+    val reversedVals = reversed.reduceByKey((string1, string2) => string1 + ", " + string2)
+
+    val parsed2 = reversedVals.map(tuple => tuple._1 + ": " + tuple._2.toString)
+
+    // second collect
+    val writeMode2 = args(2) != null // write to file or print; false only if a caller passes a null element
+    if (writeMode2) { // print to output file
+      parsed2.saveAsTextFile(args(2))
+    } else { // print to console.
+      val output = parsed2.collect()
+      for (elem <- output) {
+        println(elem)
+      }
+    }
+
+    spark.stop()
+  }
+}
+// scalastyle:on println
diff --git a/examples/spark/src/main/scala/edu/snu/nemo/examples/spark/SparkWordCount.scala b/examples/spark/src/main/scala/edu/snu/nemo/examples/spark/SparkWordCount.scala
index df64bc8ac..8919c4263 100644
--- a/examples/spark/src/main/scala/edu/snu/nemo/examples/spark/SparkWordCount.scala
+++ b/examples/spark/src/main/scala/edu/snu/nemo/examples/spark/SparkWordCount.scala
@@ -26,7 +26,7 @@ import edu.snu.nemo.compiler.frontend.spark.sql.SparkSession
 object SparkWordCount {
   def main(args: Array[String]) {
     if (args.length < 1) {
-      System.err.println("Usage: JavaWordCount <input_file> [<output_file>]")
+      System.err.println("Usage: SparkWordCount <input_file> [<output_file>]")
       System.exit(1)
     }
     val spark = SparkSession
diff --git a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScala.java b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScala.java
index 3e1e8e9f0..65e3bf65d 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScala.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScala.java
@@ -77,4 +77,31 @@ public void testWordCount() throws Exception {
       ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
     }
   }
+
+  @Test(timeout = TIMEOUT)
+  public void testCachingWordCount() throws Exception {
+    final String inputFileName = "test_input_wordcount_spark";
+    final String outputFileName1 = "test_output_wordcount_spark";
+    final String outputFileName2 = "test_output_reversed_wordcount_spark";
+    final String expectedOutputFilename1 = "expected_output_wordcount_spark";
+    final String expectedOutputFilename2 = "expected_output_reversed_wordcount_spark";
+    final String inputFilePath = fileBasePath + inputFileName;
+    final String outputFilePath1 = fileBasePath + outputFileName1;
+    final String outputFilePath2 = fileBasePath + outputFileName2;
+
+    JobLauncher.main(builder
+        .addJobId(SparkCachingWordCount.class.getSimpleName() + "_test")
+        .addUserMain(SparkCachingWordCount.class.getCanonicalName())
+        .addUserArgs(inputFilePath, outputFilePath1, outputFilePath2) // input, then one path per saved output
+        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+        .build());
+
+    try { // presumably compares each output with its expected file - confirm in ExampleTestUtil
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName1, expectedOutputFilename1);
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName2, expectedOutputFilename2);
+    } finally { // delete both outputs whether or not the checks above passed
+      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName1);
+      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName2);
+    }
+  }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdManager.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdManager.java
index 15d34c14f..03ee40287 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdManager.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdManager.java
@@ -47,18 +47,9 @@ public static String generatePhysicalPlanId() {
     return "Plan" + physicalPlanIdGenerator.getAndIncrement();
   }
 
-  /**
-   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.StageEdge}.
-   *
-   * @param irEdgeId .
-   * @return the generated ID
-   */
-  public static String generateStageEdgeId(final String irEdgeId) {
-    return "SEdge" + irEdgeId;
-  }
-
   /**
    * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.Stage}.
+   *
    * @param stageId stage ID in numeric form.
    * @return the generated ID
    */
@@ -93,25 +84,25 @@ public static String generateExecutorId() {
   /**
    * Generates the ID for a block, whose data is the output of a task attempt.
    *
-   * @param runtimeEdgeId of the block
+   * @param runtimeEdgeId  of the block
    * @param producerTaskId of the block
    * @return the generated ID
    */
   public static String generateBlockId(final String runtimeEdgeId,
                                        final String producerTaskId) {
     return runtimeEdgeId + SPLITTER + getIndexFromTaskId(producerTaskId)
-        + SPLITTER + getAttemptFromTaskId(producerTaskId);
+      + SPLITTER + getAttemptFromTaskId(producerTaskId);
   }
 
   /**
    * The block ID wildcard indicates to use 'ANY' of the available blocks produced by different task attempts.
    * (Notice that a task clone or a task retry leads to a new task attempt)
-   *
+   * <p>
    * Wildcard block id looks like SEdge4-1-* (task index = 1), where the '*' matches with any task attempts.
    * For this example, the ids of the producer task attempts will look like [Stage1-1-0, Stage1-1-1, Stage1-1-2, ...],
    * with the (1) task stage id corresponding to the outgoing edge, (2) task index = 1, and (3) all task attempts.
    *
-   * @param runtimeEdgeId of the block
+   * @param runtimeEdgeId     of the block
    * @param producerTaskIndex of the block
    * @return the generated WILDCARD ID
    */
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/exception/PlanAppenderException.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/exception/PlanAppenderException.java
new file mode 100644
index 000000000..6ee98aa35
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/exception/PlanAppenderException.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.exception;
+
+/**
+ * Unchecked exception that signals a failure while appending plans.
+ */
+public final class PlanAppenderException extends RuntimeException {
+
+  /**
+   * Creates the exception as a wrapper around an underlying cause.
+   *
+   * @param throwable the cause, passed through to {@link RuntimeException}.
+   */
+  public PlanAppenderException(final Throwable throwable) {
+    super(throwable);
+  }
+
+  /**
+   * Creates the exception with a descriptive message.
+   *
+   * @param message the detail message, passed through to {@link RuntimeException}.
+   */
+  public PlanAppenderException(final String message) {
+    super(message);
+  }
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java
index ffedf21cd..f9957a1f8 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java
@@ -35,7 +35,7 @@
   private JsonNode stageDagJson;
 
   public JobMetric(final PhysicalPlan physicalPlan) {
-    this.id = physicalPlan.getId();
+    this.id = physicalPlan.getPlanId();
   }
 
   public JobMetric(final String id) {
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
index 9611326a1..0d4cfa5e3 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
@@ -34,6 +34,7 @@ private RunTimeOptimizer() {
 
   /**
    * Dynamic optimization method to process the dag with an appropriate pass, decided by the stats.
+   *
    * @param originalPlan original physical execution plan.
    * @return the newly updated optimized physical plan.
    */
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 572cbd167..f277af1ac 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -74,7 +74,8 @@ public PhysicalPlan apply(final PhysicalPlan originalPlan,
                             final Pair<StageEdge, Map<Integer, Long>> metricData) {
     final StageEdge targetEdge = metricData.left();
     // Get number of evaluators of the next stage (number of blocks).
-    final Integer dstParallelism = targetEdge.getDst().getPropertyValue(ParallelismProperty.class).get();
+    final Integer dstParallelism = targetEdge.getDst().getPropertyValue(ParallelismProperty.class).
+        orElseThrow(() -> new RuntimeException("No parallelism on a vertex"));
 
     // Calculate keyRanges.
     final List<KeyRange> keyRanges = calculateKeyRanges(metricData.right(), dstParallelism);
@@ -94,7 +95,7 @@ public PhysicalPlan apply(final PhysicalPlan originalPlan,
       }
     }
 
-    return new PhysicalPlan(originalPlan.getId(), stageDAG);
+    return new PhysicalPlan(originalPlan.getPlanId(), stageDAG);
   }
 
   public List<Integer> identifySkewedKeys(final Map<Integer, Long> keyValToPartitionSizeMap) {
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
index 30f71110d..0a8bf5f01 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
@@ -33,8 +33,8 @@
   /**
    * Constructor.
    *
-   * @param id              ID of the plan.
-   * @param stageDAG        the DAG of stages.
+   * @param id       the ID of the plan.
+   * @param stageDAG the DAG of stages.
    */
   public PhysicalPlan(final String id,
                       final DAG<Stage, StageEdge> stageDAG) {
@@ -50,9 +50,9 @@ public PhysicalPlan(final String id,
   }
 
   /**
-   * @return id of the plan.
+   * @return the ID of the plan.
    */
-  public String getId() {
+  public String getPlanId() {
     return id;
   }
 
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index 57c89a11f..598b25320 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -107,14 +107,19 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
     }));
 
     edgeGroupToIrEdge.forEach((id, edges) -> {
-      final StageEdge representativeEdge = edges.get(0);
-      final DuplicateEdgeGroupPropertyValue representativeProperty =
-          representativeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class).get();
+      final StageEdge firstEdge = edges.get(0);
+      final DuplicateEdgeGroupPropertyValue firstDuplicateEdgeValue =
+          firstEdge.getPropertyValue(DuplicateEdgeGroupProperty.class).get();
+
       edges.forEach(e -> {
         final DuplicateEdgeGroupPropertyValue duplicateEdgeGroupProperty =
             e.getPropertyValue(DuplicateEdgeGroupProperty.class).get();
-        duplicateEdgeGroupProperty.setRepresentativeEdgeId(representativeEdge.getId());
-        duplicateEdgeGroupProperty.setGroupSize(representativeProperty.getGroupSize());
+        if (firstDuplicateEdgeValue.isRepresentativeEdgeDecided()) {
+          duplicateEdgeGroupProperty.setRepresentativeEdgeId(firstDuplicateEdgeValue.getRepresentativeEdgeId());
+        } else {
+          duplicateEdgeGroupProperty.setRepresentativeEdgeId(firstEdge.getId());
+        }
+        duplicateEdgeGroupProperty.setGroupSize(edges.size());
       });
     });
   }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
index e4bd7eb05..01a548abe 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
@@ -28,7 +28,7 @@
  * @param <V> the vertex type.
  */
 public class RuntimeEdge<V extends Vertex> extends Edge<V> {
-  private final ExecutionPropertyMap executionProperties;
+  private final ExecutionPropertyMap<EdgeExecutionProperty> executionProperties;
   private final Boolean isSideInput;
 
   /**
@@ -41,7 +41,7 @@
    * @param isSideInput    Whether or not the RuntimeEdge is a side input edge.
    */
   public RuntimeEdge(final String runtimeEdgeId,
-                     final ExecutionPropertyMap executionProperties,
+                     final ExecutionPropertyMap<EdgeExecutionProperty> executionProperties,
                      final V src,
                      final V dst,
                      final Boolean isSideInput) {
@@ -65,7 +65,7 @@ public RuntimeEdge(final String runtimeEdgeId,
   /**
    * @return the ExecutionPropertyMap of the Runtime Edge.
    */
-  public final ExecutionPropertyMap getExecutionProperties() {
+  public final ExecutionPropertyMap<EdgeExecutionProperty> getExecutionProperties() {
     return executionProperties;
   }
 
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
index 88ef43264..2fd5c8b36 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
@@ -43,6 +43,7 @@
 @ThreadSafe
 public final class StagePartitioner implements Function<DAG<IRVertex, IREdge>, Map<IRVertex, Integer>> {
   private final Set<Class<? extends VertexExecutionProperty>> ignoredPropertyKeys = ConcurrentHashMap.newKeySet();
+  private final MutableInt nextStageIndex = new MutableInt(0);
 
   @Inject
   private StagePartitioner() {
@@ -65,7 +66,6 @@ public void addIgnoredPropertyKey(final Class<? extends VertexExecutionProperty>
    */
   @Override
   public Map<IRVertex, Integer> apply(final DAG<IRVertex, IREdge> irDAG) {
-    final MutableInt nextStageIndex = new MutableInt(0);
     final Map<IRVertex, Integer> vertexToStageIdMap = new HashMap<>();
     irDAG.topologicalDo(irVertex -> {
       // Base case: for root vertices
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
index 9677cac30..b36fae6f6 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
@@ -168,9 +168,11 @@ public void onNext(final ActiveContext activeContext) {
   }
 
   /**
-   * Start user DAG.
+   * Start to schedule a submitted user DAG.
+   *
+   * @param dagString  the serialized DAG to schedule.
    */
-  public void startSchedulingUserDAG(final String dagString) {
+  private void startSchedulingUserDAG(final String dagString) {
     runnerThread.execute(() -> {
       userApplicationRunner.run(dagString);
       // send driver notification that user application is done.
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
index 6863b6f17..5fcbc11a9 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
@@ -17,19 +17,15 @@
 
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
-import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.compiler.backend.Backend;
-import edu.snu.nemo.compiler.backend.nemo.NemoBackend;
-import edu.snu.nemo.compiler.optimizer.policy.Policy;
+import edu.snu.nemo.compiler.optimizer.Optimizer;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.RuntimeMaster;
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.annotations.Parameter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,31 +40,21 @@
 public final class UserApplicationRunner {
   private static final Logger LOG = LoggerFactory.getLogger(UserApplicationRunner.class.getName());
 
-  private final String dagDirectory;
-  private final String optimizationPolicyCanonicalName;
   private final int maxScheduleAttempt;
 
-  private final Injector injector;
   private final RuntimeMaster runtimeMaster;
+  private final Optimizer optimizer;
   private final Backend<PhysicalPlan> backend;
 
-  private final PubSubEventHandlerWrapper pubSubWrapper;
-
   @Inject
-  private UserApplicationRunner(@Parameter(JobConf.DAGDirectory.class) final String dagDirectory,
-                                @Parameter(JobConf.OptimizationPolicy.class) final String optimizationPolicy,
-                                @Parameter(JobConf.MaxTaskAttempt.class) final int maxScheduleAttempt,
-                                final NemoBackend backend,
-                                final PubSubEventHandlerWrapper pubSubEventHandlerWrapper,
-                                final Injector injector,
+  private UserApplicationRunner(@Parameter(JobConf.MaxTaskAttempt.class) final int maxScheduleAttempt,
+                                final Optimizer optimizer,
+                                final Backend<PhysicalPlan> backend,
                                 final RuntimeMaster runtimeMaster) {
-    this.dagDirectory = dagDirectory;
-    this.optimizationPolicyCanonicalName = optimizationPolicy;
     this.maxScheduleAttempt = maxScheduleAttempt;
-    this.injector = injector;
     this.runtimeMaster = runtimeMaster;
+    this.optimizer = optimizer;
     this.backend = backend;
-    this.pubSubWrapper = pubSubEventHandlerWrapper;
   }
 
   /**
@@ -78,29 +64,16 @@ private UserApplicationRunner(@Parameter(JobConf.DAGDirectory.class) final Strin
    *
    * @param dagString Serialized IR DAG from Nemo Client.
    */
-  public void run(final String dagString) {
+  public synchronized void run(final String dagString) {
     try {
       LOG.info("##### Nemo Compiler Start #####");
 
       final DAG<IRVertex, IREdge> dag = SerializationUtils.deserialize(Base64.getDecoder().decode(dagString));
-      dag.storeJSON(dagDirectory, "ir", "IR before optimization");
-      final Policy optimizationPolicy = (Policy) Class.forName(optimizationPolicyCanonicalName).newInstance();
-
-      if (optimizationPolicy == null) {
-        throw new CompileTimeOptimizationException("A policy name should be specified.");
-      }
-      final DAG<IRVertex, IREdge> optimizedDAG = optimizationPolicy.runCompileTimeOptimization(dag, dagDirectory);
-      optimizedDAG.storeJSON(dagDirectory, "ir-" + optimizationPolicy.getClass().getSimpleName(),
-          "IR optimized for " + optimizationPolicy.getClass().getSimpleName());
-
-      optimizationPolicy.registerRunTimeOptimizations(injector, pubSubWrapper);
-
+      final DAG<IRVertex, IREdge> optimizedDAG = optimizer.optimizeDag(dag);
       final PhysicalPlan physicalPlan = backend.compile(optimizedDAG);
 
       LOG.info("##### Nemo Compiler Finish #####");
 
-      physicalPlan.getStageDAG().storeJSON(dagDirectory, "plan", "physical execution plan by compiler");
-
       // Execute!
       final Pair<PlanStateManager, ScheduledExecutorService> executionResult =
           runtimeMaster.execute(physicalPlan, maxScheduleAttempt);
@@ -108,11 +81,14 @@ public void run(final String dagString) {
       // Wait for the job to finish and stop logging
       final PlanStateManager planStateManager = executionResult.left();
       final ScheduledExecutorService dagLoggingExecutor = executionResult.right();
-      planStateManager.waitUntilFinish();
-      dagLoggingExecutor.shutdown();
+      try {
+        planStateManager.waitUntilFinish();
+        dagLoggingExecutor.shutdown();
+      } finally {
+        planStateManager.storeJSON("final");
+      }
 
-      planStateManager.storeJSON(dagDirectory, "final");
-      LOG.info("{} is complete!", physicalPlan.getId());
+      LOG.info("{} is complete!", physicalPlan.getPlanId());
     } catch (final Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index f2b65524a..3b8b364f4 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -76,9 +76,10 @@
 
   /**
    * Constructor.
-   * @param task Task with information needed during execution.
-   * @param irVertexDag A DAG of vertices.
-   * @param taskStateManager State manager for this Task.
+   *
+   * @param task                Task with information needed during execution.
+   * @param irVertexDag         A DAG of vertices.
+   * @param taskStateManager    State manager for this Task.
    * @param dataTransferFactory For reading from/writing to data to other tasks.
    * @param metricMessageSender For sending metric with execution stats to Master.
    */
@@ -112,21 +113,21 @@ public TaskExecutor(final Task task,
   /**
    * Converts the DAG of vertices into pointer-based DAG of vertex harnesses.
    * This conversion is necessary for constructing concrete data channels for each vertex's inputs and outputs.
-   *
+   * <p>
    * - Source vertex read: Explicitly handled (SourceVertexDataFetcher)
    * - Sink vertex write: Implicitly handled within the vertex
-   *
+   * <p>
    * - Parent-task read: Explicitly handled (ParentTaskDataFetcher)
    * - Children-task write: Explicitly handled (VertexHarness)
-   *
+   * <p>
    * - Intra-task read: Implicitly handled when performing Intra-task writes
    * - Intra-task write: Explicitly handled (VertexHarness)
-
+   * <p>
    * For element-wise data processing, we traverse vertex harnesses from the roots to the leaves for each element.
    * This means that overheads associated with jumping from one harness to the other should be minimal.
    * For example, we should never perform an expensive hash operation to traverse the harnesses.
    *
-   * @param task task.
+   * @param task        task.
    * @param irVertexDag dag.
    * @return fetchers and harnesses.
    */
@@ -189,10 +190,9 @@ public TaskExecutor(final Task task,
       }
       final List<InputReader> parentTaskReaders =
           getParentTaskReaders(taskIndex, irVertex, task.getTaskIncomingEdges(), dataTransferFactory);
-      parentTaskReaders.forEach(parentTaskReader -> {
+      parentTaskReaders.forEach(parentTaskReader ->
         dataFetcherList.add(new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
-            vertexHarness, isToSideInput)); // Parent-task read
-      });
+            vertexHarness, isToSideInput))); // Parent-task read
     });
 
     final List<VertexHarness> sortedHarnessList = irVertexDag.getTopologicalSort()
@@ -205,8 +205,9 @@ public TaskExecutor(final Task task,
 
   /**
    * Recursively process a data element down the DAG dependency.
+   *
    * @param vertexHarness VertexHarness of a vertex to execute.
-   * @param dataElement input data element to process.
+   * @param dataElement   input data element to process.
    */
   private void processElementRecursively(final VertexHarness vertexHarness, final Object dataElement) {
     final IRVertex irVertex = vertexHarness.getIRVertex();
@@ -340,11 +341,9 @@ private void finalizeVertex(final VertexHarness vertexHarness) {
 
   private void handleMainOutputElement(final VertexHarness harness, final Object element) {
     // writes to children tasks
-    harness.getWritersToMainChildrenTasks().forEach(outputWriter -> {
-      outputWriter.write(element);
-    });
+    harness.getWritersToMainChildrenTasks().forEach(outputWriter -> outputWriter.write(element));
     // writes to side input children tasks
-    if (harness.getSideInputChildren().size() > 0) {
+    if (!harness.getSideInputChildren().isEmpty()) {
       sideInputMap.put(((OperatorVertex) harness.getIRVertex()).getTransform().getTag(), element);
     }
     // process elements in the next vertices within a task
@@ -355,11 +354,9 @@ private void handleAdditionalOutputElement(final VertexHarness harness, final Ob
     // writes to additional children tasks
     harness.getWritersToAdditionalChildrenTasks().entrySet().stream()
         .filter(kv -> kv.getKey().equals(tag))
-        .forEach(kv -> {
-          kv.getValue().write(element);
-        });
+        .forEach(kv -> kv.getValue().write(element));
     // writes to side input children tasks
-    if (harness.getSideInputChildren().size() > 0) {
+    if (!harness.getSideInputChildren().isEmpty()) {
       sideInputMap.put(((OperatorVertex) harness.getIRVertex()).getTransform().getTag(), element);
     }
     // process elements in the next vertices within a task
@@ -397,7 +394,7 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
           // IOException means that this task should be retried.
           taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
               Optional.empty(), Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
-          LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", taskId, e.toString());
+          LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", taskId, e);
           return false;
         }
 
@@ -494,6 +491,7 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
 
   /**
    * Return inter-task OutputWriters, for single output or output associated with main tag.
+   *
    * @param irVertex                source irVertex
    * @param outEdgesToChildrenTasks outgoing edges to child tasks
    * @param dataTransferFactory     dataTransferFactory
@@ -515,6 +513,7 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
 
   /**
    * Return inter-task OutputWriters associated with additional output tags.
+   *
    * @param irVertex                source irVertex
    * @param outEdgesToChildrenTasks outgoing edges to child tasks
    * @param dataTransferFactory     dataTransferFactory
@@ -531,10 +530,9 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
         .stream()
         .filter(outEdge -> outEdge.getSrcIRVertex().getId().equals(irVertex.getId()))
         .filter(outEdge -> taggedOutputs.containsValue(outEdge.getDstIRVertex().getId()))
-        .forEach(outEdgeForThisVertex -> {
+        .forEach(outEdgeForThisVertex ->
           additionalChildrenTaskWriters.put(outEdgeForThisVertex.getDstIRVertex().getId(),
-              dataTransferFactory.createWriter(taskId, outEdgeForThisVertex.getDstIRVertex(), outEdgeForThisVertex));
-        });
+              dataTransferFactory.createWriter(taskId, outEdgeForThisVertex.getDstIRVertex(), outEdgeForThisVertex)));
 
     return additionalChildrenTaskWriters;
   }
@@ -590,6 +588,7 @@ private void setIRVertexPutOnHold(final MetricCollectionBarrierVertex irVertex)
    * Finalize the output write of this vertex.
    * As element-wise output write is done and the block is in memory,
    * flush the block into the designated data store and commit it.
+   *
    * @param vertexHarness harness.
    */
   private void finalizeOutputWriters(final VertexHarness vertexHarness) {
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
index 7c1cf9339..dc8f1b297 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
@@ -17,6 +17,7 @@
 
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.coder.*;
+import edu.snu.nemo.common.ir.IdManager;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.RuntimeIdManager;
@@ -127,7 +128,7 @@ public void setUp() throws Exception {
     IntStream.range(0, NUM_READ_VERTICES).forEach(number -> readTaskIdList.add("Read_IR_vertex"));
 
     // Generates the ids and the data of the blocks to be used.
-    final String shuffleEdge = RuntimeIdManager.generateStageEdgeId("shuffle_edge");
+    final String shuffleEdge = IdManager.newEdgeId();
     IntStream.range(0, NUM_WRITE_VERTICES).forEach(writeTaskIdx -> {
       // Create a block for each writer task.
       final String taskId = getTaskId(writeTaskIdx);
@@ -146,7 +147,7 @@ public void setUp() throws Exception {
 
     // Following part is for the concurrent read test.
     final List<String> concReadTaskIdList = new ArrayList<>(NUM_CONC_READ_TASKS);
-    final String concEdge = RuntimeIdManager.generateStageEdgeId("conc_read_edge");
+    final String concEdge = IdManager.newEdgeId();
 
     // Generates the ids and the data to be used.
     concBlockId = RuntimeIdManager.generateBlockId(concEdge, getTaskId(NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1));
@@ -165,7 +166,7 @@ public void setUp() throws Exception {
     // Generates the ids of the tasks to be used.
     IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(number -> writeHashTaskIdList.add("hash_write_IR_vertex"));
     IntStream.range(0, NUM_READ_HASH_TASKS).forEach(number -> readHashTaskIdList.add("hash_read_IR_vertex"));
-    final String hashEdge = RuntimeIdManager.generateStageEdgeId("hash_edge");
+    final String hashEdge = IdManager.newEdgeId();
 
     // Generates the ids and the data of the blocks to be used.
     IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(writeTaskIdx -> {
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
index a1251d58b..4889c06c1 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -24,6 +24,7 @@
 import edu.snu.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.InMemorySourceVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
@@ -332,7 +333,7 @@ public void testAdditionalOutputs() throws Exception {
                                            final IRVertex dst,
                                            final boolean isSideInput) {
     final String runtimeIREdgeId = "Runtime edge between operator tasks";
-    ExecutionPropertyMap edgeProperties = new ExecutionPropertyMap(runtimeIREdgeId);
+    ExecutionPropertyMap<EdgeExecutionProperty> edgeProperties = new ExecutionPropertyMap<>(runtimeIREdgeId);
     edgeProperties.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
     return new RuntimeEdge<>(runtimeIREdgeId, edgeProperties, src, dst, isSideInput);
 
@@ -342,7 +343,7 @@ public void testAdditionalOutputs() throws Exception {
                                            final IRVertex dst,
                                            final boolean isSideInput,
                                            final String runtimeIREdgeId) {
-    ExecutionPropertyMap edgeProperties = new ExecutionPropertyMap(runtimeIREdgeId);
+    ExecutionPropertyMap<EdgeExecutionProperty> edgeProperties = new ExecutionPropertyMap<>(runtimeIREdgeId);
     edgeProperties.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
     return new RuntimeEdge<>(runtimeIREdgeId, edgeProperties, src, dst, isSideInput);
 
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index 3f2ffda15..def088871 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -45,6 +45,8 @@
 
 /**
  * Master-side block manager.
+ * This implementation assumes that only a single user application can submit (maybe multiple) plans through
+ * {@link edu.snu.nemo.runtime.master.scheduler.Scheduler}.
  */
 @ThreadSafe
 @DriverSide
@@ -75,7 +77,7 @@
   @Inject
   private BlockManagerMaster(final MessageEnvironment masterMessageEnvironment) {
     masterMessageEnvironment.setupListener(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID,
-        new PartitionManagerMasterControlMessageReceiver());
+      new PartitionManagerMasterControlMessageReceiver());
     this.blockIdWildcardToMetadataSet = new HashMap<>();
     this.producerTaskIdToBlockIds = new HashMap<>();
     this.lock = new ReentrantReadWriteLock();
@@ -84,8 +86,8 @@ private BlockManagerMaster(final MessageEnvironment masterMessageEnvironment) {
   /**
    * Initializes the states of a block which will be produced by a producer task.
    *
-   * @param blockId             the id of the block to initialize.
-   * @param producerTaskId      the id of the producer task.
+   * @param blockId        the id of the block to initialize.
+   * @param producerTaskId the id of the producer task.
    */
   @VisibleForTesting
   private void initializeState(final String blockId, final String producerTaskId) {
@@ -143,11 +145,11 @@ public BlockRequestHandler getBlockLocationHandler(final String blockIdOrWildcar
     readLock.lock();
     try {
       final Set<BlockMetadata> metadataSet =
-          getBlockWildcardStateSet(RuntimeIdManager.getWildCardFromBlockId(blockIdOrWildcard));
+        getBlockWildcardStateSet(RuntimeIdManager.getWildCardFromBlockId(blockIdOrWildcard));
       final List<BlockMetadata> candidates = metadataSet.stream()
-          .filter(metadata -> metadata.getBlockState().equals(BlockState.State.IN_PROGRESS)
-              || metadata.getBlockState().equals(BlockState.State.AVAILABLE))
-          .collect(Collectors.toList());
+        .filter(metadata -> metadata.getBlockState().equals(BlockState.State.IN_PROGRESS)
+          || metadata.getBlockState().equals(BlockState.State.AVAILABLE))
+        .collect(Collectors.toList());
       if (!candidates.isEmpty()) {
         // Randomly pick one of the candidate handlers.
         return candidates.get(random.nextInt(candidates.size())).getLocationHandler();
@@ -187,7 +189,8 @@ public BlockRequestHandler getBlockLocationHandler(final String blockIdOrWildcar
 
   /**
    * To be called when a potential producer task is scheduled.
-   * @param taskId the ID of the scheduled task.
+   *
+   * @param taskId   the ID of the scheduled task.
    * @param blockIds this task will produce
    */
   public void onProducerTaskScheduled(final String taskId, final Set<String> blockIds) {
@@ -288,10 +291,10 @@ public void onBlockStateChanged(final String blockId,
 
   private BlockMetadata getBlockMetaData(final String blockId) {
     final List<BlockMetadata> candidates =
-        blockIdWildcardToMetadataSet.get(RuntimeIdManager.getWildCardFromBlockId(blockId))
-            .stream()
-            .filter(meta -> meta.getBlockId().equals(blockId))
-            .collect(Collectors.toList());
+      blockIdWildcardToMetadataSet.get(RuntimeIdManager.getWildCardFromBlockId(blockId))
+        .stream()
+        .filter(meta -> meta.getBlockId().equals(blockId))
+        .collect(Collectors.toList());
     if (candidates.size() != 1) {
       throw new RuntimeException("BlockId " + blockId + ": " + candidates.toString()); // should match only 1
     }
@@ -330,15 +333,15 @@ public void onMessage(final ControlMessage.Message message) {
         switch (message.getType()) {
           case BlockStateChanged:
             final ControlMessage.BlockStateChangedMsg blockStateChangedMsg =
-                message.getBlockStateChangedMsg();
+              message.getBlockStateChangedMsg();
             final String blockId = blockStateChangedMsg.getBlockId();
             onBlockStateChanged(blockId, convertBlockState(blockStateChangedMsg.getState()),
-                blockStateChangedMsg.getLocation());
+              blockStateChangedMsg.getLocation());
             break;
           default:
             throw new IllegalMessageException(
-                new Exception("This message should not be received by "
-                    + BlockManagerMaster.class.getName() + ":" + message.getType()));
+              new Exception("This message should not be received by "
+                + BlockManagerMaster.class.getName() + ":" + message.getType()));
         }
       } catch (final Exception e) {
         throw new RuntimeException(e);
@@ -353,8 +356,8 @@ public void onMessageWithContext(final ControlMessage.Message message, final Mes
           break;
         default:
           throw new IllegalMessageException(
-              new Exception("This message should not be received by "
-                  + BlockManagerMaster.class.getName() + ":" + message.getType()));
+            new Exception("This message should not be received by "
+              + BlockManagerMaster.class.getName() + ":" + message.getType()));
       }
     }
   }
@@ -362,7 +365,6 @@ public void onMessageWithContext(final ControlMessage.Message message, final Mes
   /**
    * The handler of block location requests.
    */
-  @VisibleForTesting
   public static final class BlockRequestHandler {
     private final String blockId;
     private final CompletableFuture<String> locationFuture;
@@ -407,38 +409,38 @@ void completeExceptionally(final Throwable throwable) {
     void registerRequest(final long requestId,
                          final MessageContext messageContext) {
       final ControlMessage.BlockLocationInfoMsg.Builder infoMsgBuilder =
-          ControlMessage.BlockLocationInfoMsg.newBuilder()
-              .setRequestId(requestId)
-              .setBlockId(blockId);
+        ControlMessage.BlockLocationInfoMsg.newBuilder()
+          .setRequestId(requestId)
+          .setBlockId(blockId);
 
       locationFuture.whenComplete((location, throwable) -> {
         if (throwable == null) {
           infoMsgBuilder.setOwnerExecutorId(location);
         } else {
           infoMsgBuilder.setState(
-              convertBlockState(((AbsentBlockException) throwable).getState()));
+            convertBlockState(((AbsentBlockException) throwable).getState()));
         }
         messageContext.reply(
-            ControlMessage.Message.newBuilder()
-                .setId(RuntimeIdManager.generateMessageId())
-                .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
-                .setType(ControlMessage.MessageType.BlockLocationInfo)
-                .setBlockLocationInfoMsg(infoMsgBuilder.build())
-                .build());
+          ControlMessage.Message.newBuilder()
+            .setId(RuntimeIdManager.generateMessageId())
+            .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
+            .setType(ControlMessage.MessageType.BlockLocationInfo)
+            .setBlockLocationInfoMsg(infoMsgBuilder.build())
+            .build());
       });
     }
 
     /**
      * @return the future of the block location.
      */
-    @VisibleForTesting
-    public Future<String> getLocationFuture() {
+    public CompletableFuture<String> getLocationFuture() {
       return locationFuture;
     }
   }
 
   /**
    * Return the corresponding {@link BlockState.State} for the specified {@link ControlMessage.BlockStateFromExecutor}.
+   *
    * @param state {@link ControlMessage.BlockStateFromExecutor}
    * @return the corresponding {@link BlockState.State}
    */
@@ -457,6 +459,7 @@ void registerRequest(final long requestId,
 
   /**
    * Return the corresponding {@link ControlMessage.BlockStateFromExecutor} for the specified {@link BlockState.State}.
+   *
    * @param state {@link BlockState.State}
    * @return the corresponding {@link ControlMessage.BlockStateFromExecutor}
    */
@@ -472,5 +475,4 @@ void registerRequest(final long requestId,
         throw new UnknownExecutionStateException(new Exception("This BlockState is unknown: " + state));
     }
   }
-
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricStore.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricStore.java
index 185638973..01e4b3422 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricStore.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricStore.java
@@ -84,7 +84,7 @@ public static MetricStore getStore() {
   /**
    * Fetch metric by its metric class instance and its id.
    * @param metricClass class instance of metric.
-   * @param id metric id, which can be fetched by getId() method.
+   * @param id metric id, which can be fetched by getPlanId() method.
    * @param <T> class of metric
    * @return a metric object.
    */
@@ -114,7 +114,7 @@ public static MetricStore getStore() {
    * Same as getMetricWithId(), but if there is no such metric, it will try to create new metric object
    * using its constructor, which takes an id as a parameter.
    * @param metricClass class of metric.
-   * @param id metric id, which can be fetched by getId() method.
+   * @param id metric id, which can be fetched by getPlanId() method.
    * @param <T> class of metric
    * @return a metric object. If there was no such metric, newly create one.
    */
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanAppender.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanAppender.java
new file mode 100644
index 000000000..da6ddf147
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanAppender.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master;
+
+import edu.snu.nemo.common.Pair;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.dag.DAGBuilder;
+import edu.snu.nemo.common.ir.IdManager;
+import edu.snu.nemo.common.ir.edge.executionproperty.CacheIDProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
+import edu.snu.nemo.common.ir.vertex.CachedSourceVertex;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.IgnoreSchedulingTempDataReceiverProperty;
+import edu.snu.nemo.runtime.common.exception.PlanAppenderException;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * An implementation of a plan appender which appends two plans with regard to caching.
+ */
+public final class PlanAppender {
+
+  /**
+   * Private constructor.
+   */
+  private PlanAppender() {
+    // Private constructor.
+  }
+
+  /**
+   * Append two plans with regard to caching.
+   * For more information about caching part, check {@link IgnoreSchedulingTempDataReceiverProperty}.
+   *
+   * @param originalPlan the original plan.
+   * @param planToAppend the plan to append.
+   * @return the appended plan.
+   */
+  public static PhysicalPlan appendPlan(final PhysicalPlan originalPlan,
+                                        final PhysicalPlan planToAppend) {
+    // Add the stage DAG of the original plan to the builder at first.
+    final DAGBuilder<Stage, StageEdge> physicalDAGBuilder = new DAGBuilder<>(originalPlan.getStageDAG());
+
+    // Scan cached data in the original plan.
+    final Map<UUID, StageEdge> cachedEdges = new HashMap<>();
+    originalPlan.getStageDAG().getVertices().forEach(
+      stage -> originalPlan.getStageDAG().getIncomingEdgesOf(stage).stream()
+        // Cached edge toward a temporary data receiver is a representative edge.
+        .filter(stageEdge ->
+          stageEdge.getDstIRVertex().getPropertyValue(IgnoreSchedulingTempDataReceiverProperty.class).isPresent())
+        .forEach(stageEdge -> stageEdge.getPropertyValue(CacheIDProperty.class)
+          .ifPresent(cacheId -> cachedEdges.put(cacheId, stageEdge))
+        ));
+
+    // Scan CacheID to a pair of cached source vertex and its stage from the plan to append.
+    final Map<UUID, Pair<IRVertex, Stage>> cacheCandidates = new HashMap<>();
+    final DAG<Stage, StageEdge> dagToAppend = planToAppend.getStageDAG();
+    dagToAppend.topologicalDo(stage -> {
+      // Add the stage DAG of the plan to append to the builder.
+      physicalDAGBuilder.addVertex(stage);
+      dagToAppend.getIncomingEdgesOf(stage).
+        forEach(edge -> {
+          physicalDAGBuilder.connectVertices(edge);
+          // Find cached-data requiring stage edges in the submitted plan.
+          if (edge.getSrcIRVertex() instanceof CachedSourceVertex) {
+            final UUID cacheId = edge.getPropertyValue(CacheIDProperty.class)
+              .orElseThrow(() -> new PlanAppenderException("No cache id in the cached edge " + edge.getId()));
+            cacheCandidates.put(cacheId, Pair.of(edge.getSrcIRVertex(), edge.getSrc()));
+          }
+        });
+
+      // Find cached-data requiring ir edges in the submitted plan.
+      final DAG<IRVertex, RuntimeEdge<IRVertex>> stageIRDAG = stage.getIRDAG();
+      stageIRDAG.getVertices().stream()
+        .filter(irVertex -> irVertex instanceof CachedSourceVertex)
+        .forEach(cachedSourceVertex ->
+          stageIRDAG.getOutgoingEdgesOf(cachedSourceVertex).forEach(runtimeEdge -> {
+            final UUID cacheId = runtimeEdge.getPropertyValue(CacheIDProperty.class)
+              .orElseThrow(
+                () -> new PlanAppenderException("No cache id in the cached edge " + runtimeEdge.getId()));
+            cacheCandidates.put(cacheId, Pair.of(runtimeEdge.getSrc(), stage));
+          }));
+    });
+
+    // Link the cached data and the stages that require the data.
+    cacheCandidates.forEach((cacheId, vertexStagePair) -> {
+      final StageEdge cachedEdge = cachedEdges.get(cacheId);
+      if (cachedEdge != null) {
+        final StageEdge newEdge = new StageEdge(
+          IdManager.newEdgeId(),
+          cachedEdge.getExecutionProperties(),
+          cachedEdge.getSrcIRVertex(),
+          vertexStagePair.left(),
+          cachedEdge.getSrc(),
+          vertexStagePair.right(),
+          cachedEdge.isSideInput());
+        physicalDAGBuilder.connectVertices(newEdge);
+        final DuplicateEdgeGroupPropertyValue duplicateEdgeGroupPropertyValue =
+          cachedEdge.getPropertyValue(DuplicateEdgeGroupProperty.class)
+            .orElseThrow(() -> new PlanAppenderException("Cached edge does not have duplicated edge group property."));
+        duplicateEdgeGroupPropertyValue.setGroupSize(duplicateEdgeGroupPropertyValue.getGroupSize() + 1);
+        newEdge.getExecutionProperties().put(DuplicateEdgeGroupProperty.of(duplicateEdgeGroupPropertyValue));
+      } else {
+        throw new PlanAppenderException("Cached edge is not found in the original plan.");
+      }
+    });
+
+    return new PhysicalPlan(originalPlan.getPlanId(), physicalDAGBuilder.build());
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
index 54264dca2..f798ac1cb 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
@@ -20,6 +20,7 @@
 import edu.snu.nemo.common.exception.UnknownExecutionStateException;
 import edu.snu.nemo.common.StateMachine;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
+import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.Stage;
@@ -41,10 +42,12 @@
 import edu.snu.nemo.runtime.common.metric.StageMetric;
 import edu.snu.nemo.runtime.common.metric.TaskMetric;
 import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.tang.annotations.Parameter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
 
 import static edu.snu.nemo.common.dag.DAG.EMPTY_DAG_DIRECTORY;
 
@@ -59,20 +62,22 @@
 @ThreadSafe
 public final class PlanStateManager {
   private static final Logger LOG = LoggerFactory.getLogger(PlanStateManager.class.getName());
-  private final String planId;
-  private final int maxScheduleAttempt;
+  private String planId;
+  private int maxScheduleAttempt;
+  private boolean initialized;
+  private int dagLogFileIndex = 0;
 
   /**
    * The data structures below track the execution states of this plan.
    */
-  private final PlanState planState;
+  private PlanState planState;
   private final Map<String, StageState> stageIdToState;
   private final Map<String, List<List<TaskState>>> stageIdToTaskAttemptStates; // sorted by task idx, and then attempt
 
   /**
    * Represents the plan to manage.
    */
-  private final PhysicalPlan physicalPlan;
+  private PhysicalPlan physicalPlan;
 
   /**
    * A lock and condition to check whether the plan is finished or not.
@@ -80,43 +85,76 @@
   private final Lock finishLock;
   private final Condition planFinishedCondition;
 
+  /**
+   * For metrics.
+   */
+  private final String dagDirectory;
+  private final MetricMessageHandler metricMessageHandler;
   private MetricStore metricStore;
 
-  public PlanStateManager(final PhysicalPlan physicalPlan,
-                          final int maxScheduleAttempt) {
-    this.planId = physicalPlan.getId();
-    this.physicalPlan = physicalPlan;
-    this.maxScheduleAttempt = maxScheduleAttempt;
+  /**
+   * Constructor.
+   * @param dagDirectory         the directory in which JSON dumps of the plan state are stored.
+   * @param metricMessageHandler the metric handler for the plan.
+   */
+  @Inject
+  private PlanStateManager(@Parameter(JobConf.DAGDirectory.class) final String dagDirectory,
+                           final MetricMessageHandler metricMessageHandler) {
+    this.metricMessageHandler = metricMessageHandler;
     this.planState = new PlanState();
     this.stageIdToState = new HashMap<>();
     this.stageIdToTaskAttemptStates = new HashMap<>();
     this.finishLock = new ReentrantLock();
     this.planFinishedCondition = finishLock.newCondition();
+    this.dagDirectory = dagDirectory;
     this.metricStore = MetricStore.getStore();
+    this.initialized = false;
+  }
 
-    metricStore.getOrCreateMetric(JobMetric.class, planId).setStageDAG(physicalPlan.getStageDAG());
-    metricStore.triggerBroadcast(JobMetric.class, planId);
+  /**
+   * Replaces the plan being managed, its ID and its maximum attempt count, and resets the plan state.
+   *
+   * @param physicalPlanToUpdate    the physical plan to manage.
+   * @param maxScheduleAttemptToSet the maximum number of times this plan/sub-part of the plan should be attempted.
+   */
+  public synchronized void updatePlan(final PhysicalPlan physicalPlanToUpdate,
+                                      final int maxScheduleAttemptToSet) {
+    // Setting the flag is idempotent, so it needs no first-call check.
+    this.initialized = true;
+    this.planState = new PlanState();
+    // Assign the plan and its ID first: the JobMetric below is keyed by planId, which is still null on
+    // the first call (and holds the previous plan's ID afterwards) until it is assigned here.
+    this.physicalPlan = physicalPlanToUpdate;
+    this.planId = physicalPlanToUpdate.getPlanId();
+    this.maxScheduleAttempt = maxScheduleAttemptToSet;
+    this.metricStore.getOrCreateMetric(JobMetric.class, planId).setStageDAG(physicalPlanToUpdate.getStageDAG());
+    this.metricStore.triggerBroadcast(JobMetric.class, planId);
     initializeComputationStates();
   }
 
   /**
    * Initializes the states for the plan/stages/tasks for this plan.
+   * TODO #182: Consider reshaping in run-time optimization. For now, we only consider plan appending.
    */
   private void initializeComputationStates() {
     onPlanStateChanged(PlanState.State.EXECUTING);
     physicalPlan.getStageDAG().topologicalDo(stage -> {
-      stageIdToState.put(stage.getId(), new StageState());
-      stageIdToTaskAttemptStates.put(stage.getId(), new ArrayList<>(stage.getParallelism()));
-      for (int taskIndex = 0; taskIndex < stage.getParallelism(); taskIndex++) {
+      if (!stageIdToState.containsKey(stage.getId())) {
+        stageIdToState.put(stage.getId(), new StageState());
+        stageIdToTaskAttemptStates.put(stage.getId(), new ArrayList<>(stage.getParallelism()));
         // for each task idx of this stage
-        stageIdToTaskAttemptStates.get(stage.getId()).add(new ArrayList<>());
-        // task states will be initialized lazily in getTaskAttemptsToSchedule()
+        for (int taskIndex = 0; taskIndex < stage.getParallelism(); taskIndex++) {
+          // for each task idx of this stage
+          stageIdToTaskAttemptStates.get(stage.getId()).add(new ArrayList<>());
+          // task states will be initialized lazily in getTaskAttemptsToSchedule()
+        }
       }
     });
   }
 
   /**
    * Get task attempts that are "READY".
+   *
    * @param stageId to run
    * @return executable task attempts
    */
@@ -131,12 +169,12 @@ private void initializeComputationStates() {
     final Stage stage = physicalPlan.getStageDAG().getVertexById(stageId);
     for (int taskIndex = 0; taskIndex < stage.getParallelism(); taskIndex++) {
       final List<TaskState> attemptStatesForThisTaskIndex =
-          stageIdToTaskAttemptStates.get(stage.getId()).get(taskIndex);
+        stageIdToTaskAttemptStates.get(stage.getId()).get(taskIndex);
 
       // If one of the attempts is COMPLETE, do not schedule
       if (attemptStatesForThisTaskIndex
-          .stream()
-          .noneMatch(state -> state.getStateMachine().getCurrentState().equals(TaskState.State.COMPLETE))) {
+        .stream()
+        .noneMatch(state -> state.getStateMachine().getCurrentState().equals(TaskState.State.COMPLETE))) {
 
         // (Step 1) Create new READY attempts, as many as
         // # of clones - # of 'not-done' attempts)
@@ -149,13 +187,13 @@ private void initializeComputationStates() {
         // (Step 2) Check max attempt
         if (attemptStatesForThisTaskIndex.size() > maxScheduleAttempt) {
           throw new RuntimeException(
-              attemptStatesForThisTaskIndex.size() + " exceeds max attempt " + maxScheduleAttempt);
+            attemptStatesForThisTaskIndex.size() + " exceeds max attempt " + maxScheduleAttempt);
         }
 
         // (Step 3) Return all READY attempts
         for (int attempt = 0; attempt < attemptStatesForThisTaskIndex.size(); attempt++) {
           if (attemptStatesForThisTaskIndex.get(attempt).getStateMachine().getCurrentState()
-              .equals(TaskState.State.READY)) {
+            .equals(TaskState.State.READY)) {
             taskAttemptsToSchedule.add(RuntimeIdManager.generateTaskId(stageId, taskIndex, attempt));
           }
         }
@@ -169,11 +207,16 @@ private void initializeComputationStates() {
   private boolean isTaskNotDone(final TaskState taskState) {
     final TaskState.State state = (TaskState.State) taskState.getStateMachine().getCurrentState();
     return state.equals(TaskState.State.READY)
-        || state.equals(TaskState.State.EXECUTING)
-        || state.equals(TaskState.State.ON_HOLD);
+      || state.equals(TaskState.State.EXECUTING)
+      || state.equals(TaskState.State.ON_HOLD);
   }
 
-
+  /**
+   * Gets the IDs of all task attempts in a stage.
+   *
+   * @param stageId the stage to investigate.
+   * @return the IDs of all task attempts in the stage.
+   */
   public synchronized Set<String> getAllTaskAttemptsOfStage(final String stageId) {
     return getTaskAttemptIdsToItsState(stageId).keySet();
   }
@@ -187,16 +230,16 @@ private boolean isTaskNotDone(final TaskState taskState) {
    * and the call to this method is initiated in {@link edu.snu.nemo.runtime.master.scheduler.BatchScheduler}
    * when the message/event is received.
    *
-   * @param taskId  the ID of the task.
-   * @param newTaskState     the new state of the task.
+   * @param taskId       the ID of the task.
+   * @param newTaskState the new state of the task.
    */
   public synchronized void onTaskStateChanged(final String taskId, final TaskState.State newTaskState) {
     // Change task state
     final StateMachine taskState = getTaskStateHelper(taskId).getStateMachine();
     LOG.debug("Task State Transition: id {}, from {} to {}",
-        new Object[]{taskId, taskState.getCurrentState(), newTaskState});
+      new Object[]{taskId, taskState.getCurrentState(), newTaskState});
     metricStore.getOrCreateMetric(TaskMetric.class, taskId)
-        .addEvent((TaskState.State) taskState.getCurrentState(), newTaskState);
+      .addEvent((TaskState.State) taskState.getCurrentState(), newTaskState);
     metricStore.triggerBroadcast(TaskMetric.class, taskId);
 
     try {
@@ -209,16 +252,16 @@ public synchronized void onTaskStateChanged(final String taskId, final TaskState
     final String stageId = RuntimeIdManager.getStageIdFromTaskId(taskId);
     final List<List<TaskState>> taskStatesOfThisStage = stageIdToTaskAttemptStates.get(stageId);
     final long numOfCompletedTaskIndicesInThisStage = taskStatesOfThisStage.stream()
-        .map(attempts -> attempts.stream()
-            .map(state -> state.getStateMachine().getCurrentState())
-            .allMatch(curState -> curState.equals(TaskState.State.COMPLETE)
-                || curState.equals(TaskState.State.SHOULD_RETRY)
-                || curState.equals(TaskState.State.ON_HOLD)))
-        .filter(bool -> bool.equals(true))
-        .count();
+      .map(attempts -> attempts.stream()
+        .map(state -> state.getStateMachine().getCurrentState())
+        .allMatch(curState -> curState.equals(TaskState.State.COMPLETE)
+          || curState.equals(TaskState.State.SHOULD_RETRY)
+          || curState.equals(TaskState.State.ON_HOLD)))
+      .filter(bool -> bool.equals(true))
+      .count();
     if (newTaskState.equals(TaskState.State.COMPLETE)) {
       LOG.info("{} completed: {} Task(s) out of {} are remaining in this stage",
-          taskId, taskStatesOfThisStage.size() - numOfCompletedTaskIndicesInThisStage, taskStatesOfThisStage.size());
+        taskId, taskStatesOfThisStage.size() - numOfCompletedTaskIndicesInThisStage, taskStatesOfThisStage.size());
     }
 
     // Change stage state, if needed
@@ -226,7 +269,7 @@ public synchronized void onTaskStateChanged(final String taskId, final TaskState
       // INCOMPLETE stage
       case SHOULD_RETRY:
         final boolean isAPeerAttemptCompleted = getPeerAttemptsforTheSameTaskIndex(taskId).stream()
-            .anyMatch(state -> state.equals(TaskState.State.COMPLETE));
+          .anyMatch(state -> state.equals(TaskState.State.COMPLETE));
         if (!isAPeerAttemptCompleted) {
           // None of the peers has completed, hence this stage is incomplete
           onStageStateChanged(stageId, StageState.State.INCOMPLETE);
@@ -237,7 +280,7 @@ public synchronized void onTaskStateChanged(final String taskId, final TaskState
       case COMPLETE:
       case ON_HOLD:
         if (numOfCompletedTaskIndicesInThisStage
-            == physicalPlan.getStageDAG().getVertexById(stageId).getParallelism()) {
+          == physicalPlan.getStageDAG().getVertexById(stageId).getParallelism()) {
           onStageStateChanged(stageId, StageState.State.COMPLETE);
         }
         break;
@@ -258,18 +301,19 @@ public synchronized void onTaskStateChanged(final String taskId, final TaskState
     final int attempt = RuntimeIdManager.getAttemptFromTaskId(taskId);
 
     final List<TaskState> otherAttemptsforTheSameTaskIndex =
-        new ArrayList<>(stageIdToTaskAttemptStates.get(stageId).get(taskIndex));
+      new ArrayList<>(stageIdToTaskAttemptStates.get(stageId).get(taskIndex));
     otherAttemptsforTheSameTaskIndex.remove(attempt);
 
     return otherAttemptsforTheSameTaskIndex.stream()
-        .map(state -> (TaskState.State) state.getStateMachine().getCurrentState())
-        .collect(Collectors.toList());
+      .map(state -> (TaskState.State) state.getStateMachine().getCurrentState())
+      .collect(Collectors.toList());
   }
 
   /**
    * (PRIVATE METHOD)
    * Updates the state of a stage.
-   * @param stageId of the stage.
+   *
+   * @param stageId       of the stage.
    * @param newStageState of the stage.
    */
   private void onStageStateChanged(final String stageId, final StageState.State newStageState) {
@@ -277,11 +321,11 @@ private void onStageStateChanged(final String stageId, final StageState.State ne
     final StateMachine stageStateMachine = stageIdToState.get(stageId).getStateMachine();
 
     metricStore.getOrCreateMetric(StageMetric.class, stageId)
-        .addEvent(getStageState(stageId), newStageState);
+      .addEvent(getStageState(stageId), newStageState);
     metricStore.triggerBroadcast(StageMetric.class, stageId);
 
     LOG.debug("Stage State Transition: id {} from {} to {}",
-        new Object[]{stageId, stageStateMachine.getCurrentState(), newStageState});
+      new Object[]{stageId, stageStateMachine.getCurrentState(), newStageState});
     try {
       stageStateMachine.setState(newStageState);
     } catch (IllegalStateTransitionException e) {
@@ -290,7 +334,7 @@ private void onStageStateChanged(final String stageId, final StageState.State ne
 
     // Change plan state if needed
     final boolean allStagesCompleted = stageIdToState.values().stream().allMatch(state ->
-        state.getStateMachine().getCurrentState().equals(StageState.State.COMPLETE));
+      state.getStateMachine().getCurrentState().equals(StageState.State.COMPLETE));
     if (allStagesCompleted) {
       onPlanStateChanged(PlanState.State.COMPLETE);
     }
@@ -299,11 +343,12 @@ private void onStageStateChanged(final String stageId, final StageState.State ne
   /**
    * (PRIVATE METHOD)
    * Updates the state of the plan.
+   *
    * @param newState of the plan.
    */
   private void onPlanStateChanged(final PlanState.State newState) {
     metricStore.getOrCreateMetric(JobMetric.class, planId)
-        .addEvent((PlanState.State) planState.getStateMachine().getCurrentState(), newState);
+      .addEvent((PlanState.State) planState.getStateMachine().getCurrentState(), newState);
     metricStore.triggerBroadcast(JobMetric.class, planId);
 
 
@@ -316,7 +361,7 @@ private void onPlanStateChanged(final PlanState.State newState) {
     if (newState == PlanState.State.EXECUTING) {
       LOG.debug("Executing Plan ID {}...", this.planId);
     } else if (newState == PlanState.State.COMPLETE || newState == PlanState.State.FAILED) {
-      LOG.debug("Plan ID {} {}!", new Object[]{planId, newState});
+      LOG.debug("Plan ID {} {}!", planId, newState);
 
       // Awake all threads waiting the finish of this plan.
       finishLock.lock();
@@ -334,12 +379,13 @@ private void onPlanStateChanged(final PlanState.State newState) {
 
   /**
    * Wait for this plan to be finished and return the final state.
+   *
    * @return the final state of this plan.
    */
   public PlanState.State waitUntilFinish() {
     finishLock.lock();
     try {
-      if (!isPlanDone()) {
+      while (!isPlanDone()) {
         planFinishedCondition.await();
       }
     } catch (final InterruptedException e) {
@@ -354,8 +400,9 @@ private void onPlanStateChanged(final PlanState.State newState) {
   /**
    * Wait for this plan to be finished and return the final state.
    * It wait for at most the given time.
+   *
    * @param timeout of waiting.
-   * @param unit of the timeout.
+   * @param unit    of the timeout.
    * @return the final state of this plan.
    */
   public PlanState.State waitUntilFinish(final long timeout, final TimeUnit unit) {
@@ -375,56 +422,96 @@ private void onPlanStateChanged(final PlanState.State newState) {
     return getPlanState();
   }
 
+  /**
+   * @return whether the execution for the plan is done or not.
+   */
   public synchronized boolean isPlanDone() {
     return (getPlanState() == PlanState.State.COMPLETE || getPlanState() == PlanState.State.FAILED);
   }
 
+  /**
+   * @return the ID of the plan.
+   */
   public synchronized String getPlanId() {
     return planId;
   }
 
+  /**
+   * @return the state of the plan.
+   */
   public synchronized PlanState.State getPlanState() {
     return (PlanState.State) planState.getStateMachine().getCurrentState();
   }
 
+  /**
+   * @param stageId the stage ID to query.
+   * @return the state of the stage.
+   */
   public synchronized StageState.State getStageState(final String stageId) {
     return (StageState.State) stageIdToState.get(stageId).getStateMachine().getCurrentState();
   }
 
+  /**
+   * @param taskId the ID of the task to query.
+   * @return the state of the task.
+   */
   public synchronized TaskState.State getTaskState(final String taskId) {
     return (TaskState.State) getTaskStateHelper(taskId).getStateMachine().getCurrentState();
   }
 
   private TaskState getTaskStateHelper(final String taskId) {
     return stageIdToTaskAttemptStates
-        .get(RuntimeIdManager.getStageIdFromTaskId(taskId))
-        .get(RuntimeIdManager.getIndexFromTaskId(taskId))
-        .get(RuntimeIdManager.getAttemptFromTaskId(taskId));
+      .get(RuntimeIdManager.getStageIdFromTaskId(taskId))
+      .get(RuntimeIdManager.getIndexFromTaskId(taskId))
+      .get(RuntimeIdManager.getAttemptFromTaskId(taskId));
+  }
+
+  /**
+   * @return the physical plan.
+   */
+  public synchronized PhysicalPlan getPhysicalPlan() {
+    return physicalPlan;
+  }
+
+  /**
+   * @return the maximum number of scheduling attempts allowed per task index.
+   */
+  public synchronized int getMaxScheduleAttempt() {
+    return maxScheduleAttempt; // read under the same monitor that updatePlan() writes it under.
+  }
+
+  /**
+   * @return whether any plan has been submitted and initialized.
+   */
+  public synchronized boolean isInitialized() {
+    return initialized;
   }
 
   /**
    * Stores JSON representation of plan state into a file.
-   * @param directory the directory which JSON representation is saved to
+   *
    * @param suffix suffix for file name
    */
-  public void storeJSON(final String directory, final String suffix) {
-    if (directory.equals(EMPTY_DAG_DIRECTORY)) {
+  public void storeJSON(final String suffix) {
+    if (dagDirectory.equals(EMPTY_DAG_DIRECTORY)) {
       return;
     }
 
-    final File file = new File(directory, planId + "-" + suffix + ".json");
+    final File file = new File(dagDirectory, planId + "-" + dagLogFileIndex + "-" + suffix + ".json");
     file.getParentFile().mkdirs();
     try (final PrintWriter printWriter = new PrintWriter(file)) {
       printWriter.println(toStringWithPhysicalPlan());
       LOG.debug(String.format("JSON representation of plan state for %s(%s) was saved to %s",
-          planId, suffix, file.getPath()));
+        planId, dagLogFileIndex + "-" + suffix, file.getPath()));
     } catch (final IOException e) {
       LOG.warn(String.format("Cannot store JSON representation of plan state for %s(%s) to %s: %s",
-          planId, suffix, file.getPath(), e.toString()));
+        planId, dagLogFileIndex + "-" + suffix, file.getPath(), e.toString()));
+    } finally {
+      dagLogFileIndex++;
     }
   }
 
-  public String toStringWithPhysicalPlan() {
+  private String toStringWithPhysicalPlan() {
     final StringBuilder sb = new StringBuilder("{");
     sb.append("\"dag\": ").append(physicalPlan.getStageDAG().toString()).append(", ");
     sb.append("\"planState\": ").append(toString()).append("}");
@@ -465,10 +552,10 @@ public synchronized String toString() {
   @VisibleForTesting
   public synchronized Map<String, TaskState.State> getAllTaskAttemptIdsToItsState() {
     return physicalPlan.getStageDAG().getVertices()
-        .stream()
-        .map(Stage::getId)
-        .flatMap(stageId -> getTaskAttemptIdsToItsState(stageId).entrySet().stream())
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      .stream()
+      .map(Stage::getId)
+      .flatMap(stageId -> getTaskAttemptIdsToItsState(stageId).entrySet().stream())
+      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   }
 
   private Map<String, TaskState.State> getTaskAttemptIdsToItsState(final String stageId) {
@@ -478,7 +565,7 @@ public synchronized String toString() {
       final List<TaskState> attemptStates = taskStates.get(taskIndex);
       for (int attempt = 0; attempt < attemptStates.size(); attempt++) {
         result.put(RuntimeIdManager.generateTaskId(stageId, taskIndex, attempt),
-            (TaskState.State) attemptStates.get(attempt).getStateMachine().getCurrentState());
+          (TaskState.State) attemptStates.get(attempt).getStateMachine().getCurrentState());
       }
     }
     return result;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index bfc69d2c7..42854698f 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -16,7 +16,6 @@
 package edu.snu.nemo.runtime.master;
 
 import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.exception.*;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
@@ -38,7 +37,6 @@
 import org.apache.reef.driver.evaluator.AllocatedEvaluator;
 import org.apache.reef.driver.evaluator.FailedEvaluator;
 import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.annotations.Parameter;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.ServletHandler;
 import org.slf4j.Logger;
@@ -82,9 +80,9 @@
   private final MessageEnvironment masterMessageEnvironment;
   private final ClientRPC clientRPC;
   private final MetricManagerMaster metricManagerMaster;
+  private final PlanStateManager planStateManager;
   // For converting json data. This is a thread safe.
   private final ObjectMapper objectMapper;
-  private final String dagDirectory;
   private final Set<IRVertex> irVertices;
   private final AtomicInteger resourceRequestCount;
   private CountDownLatch metricCountDownLatch;
@@ -98,7 +96,7 @@ private RuntimeMaster(final Scheduler scheduler,
                         final MessageEnvironment masterMessageEnvironment,
                         final ClientRPC clientRPC,
                         final MetricManagerMaster metricManagerMaster,
-                        @Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
+                        final PlanStateManager planStateManager) {
     // We would like to use a single thread for runtime master operations
     // since the processing logic in master takes a very short amount of time
     // compared to the job completion times of executed jobs
@@ -113,11 +111,11 @@ private RuntimeMaster(final Scheduler scheduler,
         .setupListener(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID, new MasterControlMessageReceiver());
     this.clientRPC = clientRPC;
     this.metricManagerMaster = metricManagerMaster;
-    this.dagDirectory = dagDirectory;
     this.irVertices = new HashSet<>();
     this.resourceRequestCount = new AtomicInteger(0);
     this.objectMapper = new ObjectMapper();
     this.metricServer = startRestMetricServer();
+    this.planStateManager = planStateManager;
   }
 
   private Server startRestMetricServer() {
@@ -143,8 +141,9 @@ private Server startRestMetricServer() {
 
   /**
    * Submits the {@link PhysicalPlan} to Runtime.
+   * For now, we assume that a single job submits multiple plans.
    *
-   * @param plan to execute
+   * @param plan               to execute
    * @param maxScheduleAttempt the max number of times this plan/sub-part of the plan should be attempted.
    */
   public Pair<PlanStateManager, ScheduledExecutorService> execute(final PhysicalPlan plan,
@@ -152,9 +151,8 @@ private Server startRestMetricServer() {
     final Callable<Pair<PlanStateManager, ScheduledExecutorService>> planExecutionCallable = () -> {
       this.irVertices.addAll(plan.getIdToIRVertex().values());
       try {
-        final PlanStateManager planStateManager = new PlanStateManager(plan, maxScheduleAttempt);
-        scheduler.schedulePlan(plan, planStateManager);
-        final ScheduledExecutorService dagLoggingExecutor = scheduleDagLogging(planStateManager);
+        scheduler.schedulePlan(plan, maxScheduleAttempt);
+        final ScheduledExecutorService dagLoggingExecutor = scheduleDagLogging();
         return Pair.of(planStateManager, dagLoggingExecutor);
       } catch (Exception e) {
         throw new RuntimeException(e);
@@ -167,6 +165,9 @@ private Server startRestMetricServer() {
     }
   }
 
+  /**
+   * Terminates the RuntimeMaster.
+   */
   public void terminate() {
     // send metric flush request to all executors
     metricManagerMaster.sendMetricFlushRequest();
@@ -201,6 +202,11 @@ public void terminate() {
     // Do not shutdown runtimeMasterThread. We need it to clean things up.
   }
 
+  /**
+   * Requests a container with resource specification.
+   *
+   * @param resourceSpecificationString the resource specification.
+   */
   public void requestContainer(final String resourceSpecificationString) {
     final Future<?> containerRequestEventResult = runtimeMasterThread.submit(() -> {
       try {
@@ -393,17 +399,16 @@ private void handleControlMessage(final ControlMessage.Message message) {
 
   /**
    * Schedules a periodic DAG logging thread.
-   * @param planStateManager for the plan the DAG should be logged.
    * TODO #20: RESTful APIs to Access Job State and Metric.
+   *
    * @return the scheduled executor service.
    */
-  private ScheduledExecutorService scheduleDagLogging(final PlanStateManager planStateManager) {
+  private ScheduledExecutorService scheduleDagLogging() {
     final ScheduledExecutorService dagLoggingExecutor = Executors.newSingleThreadScheduledExecutor();
     dagLoggingExecutor.scheduleAtFixedRate(new Runnable() {
-      private int dagLogFileIndex = 0;
 
       public void run() {
-        planStateManager.storeJSON(dagDirectory, String.valueOf(dagLogFileIndex++));
+        planStateManager.storeJSON("periodic");
       }
     }, DAG_LOGGING_PERIOD, DAG_LOGGING_PERIOD, TimeUnit.MILLISECONDS);
 
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
index 9872f954f..7ae8a25ab 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
@@ -48,6 +48,6 @@ public void setScheduler(final Scheduler scheduler) {
   public void onNext(final UpdatePhysicalPlanEvent updatePhysicalPlanEvent) {
     final PhysicalPlan newPlan = updatePhysicalPlanEvent.getNewPhysicalPlan();
 
-    this.scheduler.updatePlan(newPlan.getId(), newPlan);
+    this.scheduler.updatePlan(newPlan);
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
index 243f7396c..ac2db8626 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -20,10 +20,12 @@
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.IgnoreSchedulingTempDataReceiverProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
 import edu.snu.nemo.runtime.common.plan.*;
 import edu.snu.nemo.runtime.common.state.TaskState;
+import edu.snu.nemo.runtime.master.PlanAppender;
 import edu.snu.nemo.runtime.master.DataSkewDynOptDataHandler;
 import edu.snu.nemo.runtime.master.DynOptDataHandler;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
@@ -39,6 +41,7 @@
 import javax.annotation.concurrent.NotThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
@@ -60,6 +63,7 @@
   private final TaskDispatcher taskDispatcher;
   private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
   private final ExecutorRegistry executorRegistry;
+  private final PlanStateManager planStateManager;
 
   /**
    * Other necessary components of this {@link edu.snu.nemo.runtime.master.RuntimeMaster}.
@@ -70,8 +74,6 @@
   /**
    * The below variables depend on the submitted plan to execute.
    */
-  private PhysicalPlan physicalPlan;
-  private PlanStateManager planStateManager;
   private List<List<Stage>> sortedScheduleGroups;
   private List<DynOptDataHandler> dynOptDataHandlers;
 
@@ -81,7 +83,8 @@ private BatchScheduler(final TaskDispatcher taskDispatcher,
                          final BlockManagerMaster blockManagerMaster,
                          final PubSubEventHandlerWrapper pubSubEventHandlerWrapper,
                          final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler,
-                         final ExecutorRegistry executorRegistry) {
+                         final ExecutorRegistry executorRegistry,
+                         final PlanStateManager planStateManager) {
     this.taskDispatcher = taskDispatcher;
     this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
     this.blockManagerMaster = blockManagerMaster;
@@ -92,48 +95,75 @@ private BatchScheduler(final TaskDispatcher taskDispatcher,
           .subscribe(updatePhysicalPlanEventHandler.getEventClass(), updatePhysicalPlanEventHandler);
     }
     this.executorRegistry = executorRegistry;
+    this.planStateManager = planStateManager;
     this.dynOptDataHandlers = new ArrayList<>();
     dynOptDataHandlers.add(new DataSkewDynOptDataHandler());
   }
 
   /**
+   * Schedules a given plan.
+   * If multiple physical plans are submitted, they will be appended and handled as a single plan.
+   * TODO #182: Consider reshaping in run-time optimization. For now, we only consider plan appending.
+   *
    * @param submittedPhysicalPlan the physical plan to schedule.
-   * @param submittedPlanStateManager the state manager of the plan.
+   * @param maxScheduleAttempt    the max number of times this plan/sub-part of the plan should be attempted.
    */
   @Override
-  public void schedulePlan(final PhysicalPlan submittedPhysicalPlan, final PlanStateManager submittedPlanStateManager) {
-    this.physicalPlan = submittedPhysicalPlan;
-    this.planStateManager = submittedPlanStateManager;
-
-    taskDispatcher.run(this.planStateManager);
-    LOG.info("Plan to schedule: {}", this.physicalPlan.getId());
-
-    this.sortedScheduleGroups = this.physicalPlan.getStageDAG().getVertices().stream()
-        .collect(Collectors.groupingBy(Stage::getScheduleGroup))
-        .entrySet().stream()
-        .sorted(Map.Entry.comparingByKey())
-        .map(Map.Entry::getValue)
-        .collect(Collectors.toList());
+  public void schedulePlan(final PhysicalPlan submittedPhysicalPlan,
+                           final int maxScheduleAttempt) {
+    LOG.info("Plan to schedule: {}", submittedPhysicalPlan.getPlanId());
+
+    if (!planStateManager.isInitialized()) {
+      // First scheduling.
+      taskDispatcher.run();
+      updatePlan(submittedPhysicalPlan, maxScheduleAttempt);
+      planStateManager.storeJSON("submitted");
+    } else {
+      // Append the submitted plan to the original plan.
+      final PhysicalPlan appendedPlan =
+          PlanAppender.appendPlan(planStateManager.getPhysicalPlan(), submittedPhysicalPlan);
+      updatePlan(appendedPlan, maxScheduleAttempt);
+      planStateManager.storeJSON("appended");
+    }
 
     doSchedule();
   }
 
   @Override
-  public void updatePlan(final String planId, final PhysicalPlan newPhysicalPlan) {
+  public void updatePlan(final PhysicalPlan newPhysicalPlan) {
     // update the physical plan in the scheduler.
     // NOTE: what's already been executed is not modified in the new physical plan.
-    this.physicalPlan = newPhysicalPlan;
+    // TODO #182: Consider reshaping in run-time optimization. For now, we only consider plan appending.
+    updatePlan(newPhysicalPlan, planStateManager.getMaxScheduleAttempt());
+  }
+
+  /**
+   * Update the physical plan in the scheduler.
+   *
+   * @param newPhysicalPlan    the new physical plan to update.
+   * @param maxScheduleAttempt the maximum number of task scheduling attempt.
+   */
+  private void updatePlan(final PhysicalPlan newPhysicalPlan,
+                          final int maxScheduleAttempt) {
+    planStateManager.updatePlan(newPhysicalPlan, maxScheduleAttempt);
+    this.sortedScheduleGroups = newPhysicalPlan.getStageDAG().getVertices().stream()
+        .collect(Collectors.groupingBy(Stage::getScheduleGroup))
+        .entrySet().stream()
+        .sorted(Map.Entry.comparingByKey())
+        .map(Map.Entry::getValue)
+        .collect(Collectors.toList());
   }
 
   /**
    * Handles task state transition notifications sent from executors.
    * Note that we can receive notifications for previous task attempts, due to the nature of asynchronous events.
    * We ignore such late-arriving notifications, and only handle notifications for the current task attempt.
-   * @param executorId the id of the executor where the message was sent from.
-   * @param taskId whose state has changed
+   *
+   * @param executorId       the id of the executor where the message was sent from.
+   * @param taskId           whose state has changed
    * @param taskAttemptIndex of the task whose state has changed
-   * @param newState the state to change to
-   * @param vertexPutOnHold the ID of vertex that is put on hold. It is null otherwise.
+   * @param newState         the state to change to
+   * @param vertexPutOnHold  the ID of vertex that is put on hold. It is null otherwise.
    */
   public void onTaskStateReportFromExecutor(final String executorId,
                                             final String taskId,
@@ -234,7 +264,7 @@ public void terminate() {
   /**
    * The main entry point for task scheduling.
    * This operation can be invoked at any point during job execution, as it is designed to be free of side-effects.
-   *
+   * <p>
    * These are the reasons why.
    * - We 'reset' {@link PendingTaskCollectionPointer}, and not 'add' new tasks to it
    * - We make {@link TaskDispatcher} dispatch only the tasks that are READY.
@@ -281,10 +311,20 @@ private void doSchedule() {
   }
 
   private List<Task> selectSchedulableTasks(final Stage stageToSchedule) {
+    if (stageToSchedule.getPropertyValue(IgnoreSchedulingTempDataReceiverProperty.class).orElse(false)) {
+      // Ignore ghost stage.
+      for (final String taskId : planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId())) {
+        planStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING);
+        planStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE);
+      }
+
+      return Collections.emptyList();
+    }
+
     final List<StageEdge> stageIncomingEdges =
-        physicalPlan.getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
+        planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
     final List<StageEdge> stageOutgoingEdges =
-        physicalPlan.getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
+        planStateManager.getPhysicalPlan().getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
 
     // Create and return tasks.
     final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
@@ -292,7 +332,7 @@ private void doSchedule() {
     final List<String> taskIdsToSchedule = planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId());
     final List<Task> tasks = new ArrayList<>(taskIdsToSchedule.size());
     taskIdsToSchedule.forEach(taskId -> {
-      final Set<String> blockIds = physicalPlan.getStageDAG()
+      final Set<String> blockIds = planStateManager.getPhysicalPlan().getStageDAG()
           .getOutgoingEdgesOf(RuntimeIdManager.getStageIdFromTaskId(taskId))
           .stream()
           .map(stageEdge -> RuntimeIdManager.generateBlockId(stageEdge.getId(), taskId))
@@ -300,7 +340,7 @@ private void doSchedule() {
       blockManagerMaster.onProducerTaskScheduled(taskId, blockIds);
       final int taskIdx = RuntimeIdManager.getIndexFromTaskId(taskId);
       tasks.add(new Task(
-          physicalPlan.getId(),
+          planStateManager.getPhysicalPlan().getPlanId(),
           taskId,
           stageToSchedule.getExecutionProperties(),
           stageToSchedule.getSerializedIRDAG(),
@@ -316,27 +356,32 @@ private void doSchedule() {
   /**
    * Action after task execution has been completed.
    * Note this method should not be invoked when the previous state of the task is ON_HOLD.
+   *
    * @param executorId id of the executor.
-   * @param taskId the ID of the task completed.
+   * @param taskId     the ID of the task completed.
    */
   private void onTaskExecutionComplete(final String executorId,
                                        final String taskId) {
-    LOG.debug("{} completed in {}", new Object[]{taskId, executorId});
+    LOG.debug("{} completed in {}", taskId, executorId);
     executorRegistry.updateExecutor(executorId, (executor, state) -> {
       executor.onTaskExecutionComplete(taskId);
       return Pair.of(executor, state);
     });
   }
 
+  /**
+   * @param taskId the metric collected task ID.
+   * @return the edge to optimize.
+   */
   private StageEdge getEdgeToOptimize(final String taskId) {
     // Get a stage including the given task
-    final Stage stagePutOnHold = physicalPlan.getStageDAG().getVertices().stream()
+    final Stage stagePutOnHold = planStateManager.getPhysicalPlan().getStageDAG().getVertices().stream()
       .filter(stage -> stage.getId().equals(RuntimeIdManager.getStageIdFromTaskId(taskId)))
       .findFirst()
       .orElseThrow(() -> new RuntimeException());
 
     // Get outgoing edges of that stage with MetricCollectionProperty
-    List<StageEdge> stageEdges = physicalPlan.getStageDAG().getOutgoingEdgesOf(stagePutOnHold);
+    List<StageEdge> stageEdges = planStateManager.getPhysicalPlan().getStageDAG().getOutgoingEdgesOf(stagePutOnHold);
     for (StageEdge edge : stageEdges) {
       if (edge.getExecutionProperties().containsKey(MetricCollectionProperty.class)) {
         return edge;
@@ -348,6 +393,7 @@ private StageEdge getEdgeToOptimize(final String taskId) {
 
   /**
    * Action for after task execution is put on hold.
+   *
    * @param executorId       the ID of the executor.
    * @param taskId           the ID of the task.
    */
@@ -373,16 +419,17 @@ private void onTaskExecutionOnHold(final String executorId,
           .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
           .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
       pubSubEventHandlerWrapper.getPubSubEventHandler()
-          .onNext(new DynamicOptimizationEvent(physicalPlan, dynOptDataHandler.getDynOptData(),
+          .onNext(new DynamicOptimizationEvent(planStateManager.getPhysicalPlan(), dynOptDataHandler.getDynOptData(),
               taskId, executorId, targetEdge));
     }
   }
 
   /**
    * Action for after task execution has failed but it's recoverable.
-   * @param executorId    the ID of the executor
-   * @param taskId   the ID of the task
-   * @param failureCause  the cause of failure
+   *
+   * @param executorId   the ID of the executor
+   * @param taskId       the ID of the task
+   * @param failureCause the cause of failure
    */
   private void onTaskExecutionFailedRecoverable(final String executorId,
                                                 final String taskId,
@@ -424,7 +471,11 @@ private void retryTasksAndRequiredParents(final Set<String> tasks) {
 
     final Set<String> parentsWithLostBlocks = children.stream()
         .flatMap(child -> getParentTasks(child).stream())
-        .filter(parent -> blockManagerMaster.getBlockLocationHandler(parent).getLocationFuture().isCancelled())
+        .filter(parent -> {
+          final CompletableFuture<String> locationFuture =
+            blockManagerMaster.getBlockLocationHandler(parent).getLocationFuture();
+          return locationFuture.isCompletedExceptionally() || locationFuture.isCancelled();
+        })
         .collect(Collectors.toSet());
 
     // Recursive call
@@ -432,8 +483,9 @@ private void retryTasksAndRequiredParents(final Set<String> tasks) {
   }
 
   private Set<String> getParentTasks(final String childTaskId) {
+
     final String stageIdOfChildTask = RuntimeIdManager.getStageIdFromTaskId(childTaskId);
-    return physicalPlan.getStageDAG().getIncomingEdgesOf(stageIdOfChildTask)
+    return planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageIdOfChildTask)
         .stream()
         .flatMap(inStageEdge -> {
           final String parentStageId = inStageEdge.getSrc().getId();
@@ -455,6 +507,11 @@ private void retryTasksAndRequiredParents(final Set<String> tasks) {
         .collect(Collectors.toSet());
   }
 
+  /**
+   * Update the data for dynamic optimization.
+   *
+   * @param dynOptData the data to update.
+   */
   public void updateDynOptData(final Object dynOptData) {
     final DynOptDataHandler dynOptDataHandler = dynOptDataHandlers.stream()
         .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index f8413f1a0..12abd56a7 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -17,7 +17,6 @@
 
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.state.TaskState;
-import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
@@ -36,40 +35,43 @@
 
   /**
    * Schedules the given plan.
-   * @param physicalPlan of the job being submitted.
-   * @param planStateManager to manage the states of the submitted plan.
+   *
+   * @param physicalPlan       the plan of the job being submitted.
+   * @param maxScheduleAttempt the max number of times this plan/sub-part of the plan should be attempted.
    */
-  void schedulePlan(PhysicalPlan physicalPlan,
-                    PlanStateManager planStateManager);
+  void schedulePlan(PhysicalPlan physicalPlan, int maxScheduleAttempt);
 
   /**
    * Receives and updates the scheduler with a new physical plan for a job.
-   * @param planId the ID of the physical plan to change.
+   *
    * @param newPhysicalPlan new physical plan for the job.
    */
-  void updatePlan(String planId, PhysicalPlan newPhysicalPlan);
+  void updatePlan(PhysicalPlan newPhysicalPlan);
 
   /**
    * Called when an executor is added to Runtime, so that the extra resource can be used to execute the job.
+   *
    * @param executorRepresenter a representation of the added executor.
    */
   void onExecutorAdded(ExecutorRepresenter executorRepresenter);
 
   /**
    * Called when an executor is removed from Runtime, so that faults related to the removal can be handled.
+   *
    * @param executorId of the executor that has been removed.
    */
   void onExecutorRemoved(String executorId);
 
   /**
    * Called when a Task's execution state changes.
-   * @param executorId of the executor in which the Task is executing.
-   * @param taskId of the Task whose state must be updated.
-   * @param newState for the Task.
-   * @param attemptIdx the number of times this Task has executed.
+   *
+   * @param executorId    of the executor in which the Task is executing.
+   * @param taskId        of the Task whose state must be updated.
+   * @param newState      for the Task.
+   * @param attemptIdx    the number of times this Task has executed.
    *************** the below parameters are only valid for failures *****************
    * @param taskPutOnHold the ID of task that are put on hold. It is null otherwise.
-   * @param failureCause for which the Task failed in the case of a recoverable failure.
+   * @param failureCause  for which the Task failed in the case of a recoverable failure.
    */
   void onTaskStateReportFromExecutor(String executorId,
                                      String taskId,
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
index f5e270ab2..4e37fde2a 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
@@ -17,6 +17,8 @@
 
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
 import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceLocalityProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdManager;
@@ -59,8 +61,12 @@ private SourceLocationAwareSchedulingConstraint(final BlockManagerMaster blockMa
       if (CommunicationPatternProperty.Value.OneToOne.equals(
           physicalStageEdge.getPropertyValue(CommunicationPatternProperty.class)
               .orElseThrow(() -> new RuntimeException("No comm pattern!")))) {
+        final Optional<DuplicateEdgeGroupPropertyValue> dupProp =
+            physicalStageEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
+        final String representativeEdgeId = dupProp.isPresent()
+            ? dupProp.get().getRepresentativeEdgeId() : physicalStageEdge.getId();
         final String blockIdToRead =
-            RuntimeIdManager.generateBlockId(physicalStageEdge.getId(), task.getTaskId());
+            RuntimeIdManager.generateBlockId(representativeEdgeId, task.getTaskId());
         final BlockManagerMaster.BlockRequestHandler locationHandler =
             blockManagerMaster.getBlockLocationHandler(blockIdToRead);
         if (locationHandler.getLocationFuture().isDone()) { // if the location is known.
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
index 0dbee50ab..b40790deb 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
@@ -48,6 +48,7 @@
   private static final Logger LOG = LoggerFactory.getLogger(TaskDispatcher.class.getName());
   private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
   private final ExecutorService schedulerThread;
+  private final PlanStateManager planStateManager;
   private boolean isSchedulerRunning;
   private boolean isTerminated;
 
@@ -56,16 +57,16 @@
   private final SchedulingConstraintRegistry schedulingConstraintRegistry;
   private final SchedulingPolicy schedulingPolicy;
 
-  private PlanStateManager planStateManager;
-
   @Inject
   private TaskDispatcher(final SchedulingConstraintRegistry schedulingConstraintRegistry,
                          final SchedulingPolicy schedulingPolicy,
                          final PendingTaskCollectionPointer pendingTaskCollectionPointer,
-                         final ExecutorRegistry executorRegistry) {
+                         final ExecutorRegistry executorRegistry,
+                         final PlanStateManager planStateManager) {
     this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
     this.schedulerThread = Executors.newSingleThreadExecutor(runnable ->
         new Thread(runnable, "TaskDispatcher thread"));
+    this.planStateManager = planStateManager;
     this.isSchedulerRunning = false;
     this.isTerminated = false;
     this.executorRegistry = executorRegistry;
@@ -161,8 +162,7 @@ void onNewPendingTaskCollectionAvailable() {
   /**
    * Run the dispatcher thread.
    */
-  void run(final PlanStateManager plan) {
-    this.planStateManager = plan;
+  void run() {
     if (!isTerminated && !isSchedulerRunning) {
       schedulerThread.execute(new SchedulerThread());
       schedulerThread.shutdown();
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
index 04437cb63..959b3fc65 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.runtime.master;
 
+import edu.snu.nemo.common.ir.IdManager;
 import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.exception.AbsentBlockException;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
@@ -76,11 +77,12 @@ private static void checkPendingFuture(final Future<String> future) {
 
   /**
    * Test scenario where block becomes committed and then lost.
+   *
    * @throws Exception
    */
   @Test
   public void testLostAfterCommit() throws Exception {
-    final String edgeId = RuntimeIdManager.generateStageEdgeId("Edge0");
+    final String edgeId = IdManager.newEdgeId();
     final int srcTaskIndex = 0;
     final String taskId = RuntimeIdManager.generateTaskId("Stage0", srcTaskIndex, FIRST_ATTEMPT);
     final String executorId = RuntimeIdManager.generateExecutorId();
@@ -108,11 +110,12 @@ public void testLostAfterCommit() throws Exception {
 
   /**
    * Test scenario where producer task fails.
+   *
    * @throws Exception
    */
   @Test
   public void testBeforeAfterCommit() throws Exception {
-    final String edgeId = RuntimeIdManager.generateStageEdgeId("Edge1");
+    final String edgeId = IdManager.newEdgeId();
     final int srcTaskIndex = 0;
 
     // First attempt
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/PlanStateManagerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/PlanStateManagerTest.java
index aeae40c5d..32044177b 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/PlanStateManagerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/PlanStateManagerTest.java
@@ -46,12 +46,17 @@
 @RunWith(PowerMockRunner.class)
 public final class PlanStateManagerTest {
   private static final int MAX_SCHEDULE_ATTEMPT = 2;
+  private MetricMessageHandler metricMessageHandler;
+  private PlanStateManager planStateManager;
 
   @Before
   public void setUp() throws Exception {
     final Injector injector = LocalMessageEnvironment.forkInjector(LocalMessageDispatcher.getInjector(),
         MessageEnvironment.MASTER_COMMUNICATION_ID);
+    metricMessageHandler = mock(MetricMessageHandler.class);
+    injector.bindVolatileInstance(MetricMessageHandler.class, metricMessageHandler);
     injector.bindVolatileParameter(JobConf.DAGDirectory.class, "");
+    planStateManager = injector.getInstance(PlanStateManager.class);
   }
 
   /**
@@ -62,7 +67,7 @@ public void setUp() throws Exception {
   public void testPhysicalPlanStateChanges() throws Exception {
     final PhysicalPlan physicalPlan =
         TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
-    final PlanStateManager planStateManager = new PlanStateManager(physicalPlan, MAX_SCHEDULE_ATTEMPT);
+    planStateManager.updatePlan(physicalPlan, MAX_SCHEDULE_ATTEMPT);
 
     assertEquals(planStateManager.getPlanId(), "TestPlan");
 
@@ -93,7 +98,7 @@ public void testPhysicalPlanStateChanges() throws Exception {
   public void testWaitUntilFinish() throws Exception {
     final PhysicalPlan physicalPlan =
         TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
-    final PlanStateManager planStateManager = new PlanStateManager(physicalPlan, MAX_SCHEDULE_ATTEMPT);
+    planStateManager.updatePlan(physicalPlan, MAX_SCHEDULE_ATTEMPT);
 
     assertFalse(planStateManager.isPlanDone());
 
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSchedulerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSchedulerTest.java
index d98bb0d43..301ce2181 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSchedulerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSchedulerTest.java
@@ -62,6 +62,7 @@
 public final class BatchSchedulerTest {
   private static final Logger LOG = LoggerFactory.getLogger(BatchSchedulerTest.class.getName());
   private Scheduler scheduler;
+  private PlanStateManager planStateManager;
   private ExecutorRegistry executorRegistry;
   private final MessageSender<ControlMessage.Message> mockMsgSender = mock(MessageSender.class);
 
@@ -79,6 +80,8 @@ public void setUp() throws Exception {
     injector.bindVolatileInstance(BlockManagerMaster.class, mock(BlockManagerMaster.class));
     injector.bindVolatileInstance(PubSubEventHandlerWrapper.class, mock(PubSubEventHandlerWrapper.class));
     injector.bindVolatileInstance(UpdatePhysicalPlanEventHandler.class, mock(UpdatePhysicalPlanEventHandler.class));
+    injector.bindVolatileInstance(MetricMessageHandler.class, mock(MetricMessageHandler.class));
+    planStateManager = injector.getInstance(PlanStateManager.class);
     scheduler = injector.getInstance(BatchScheduler.class);
 
     final ActiveContext activeContext = mock(ActiveContext.class);
@@ -133,8 +136,7 @@ public void testPush() throws Exception {
   }
 
   private void scheduleAndCheckPlanTermination(final PhysicalPlan plan) throws InjectionException {
-    final PlanStateManager planStateManager = new PlanStateManager(plan, 1);
-    scheduler.schedulePlan(plan, planStateManager);
+    scheduler.schedulePlan(plan, 1);
 
     // For each ScheduleGroup, test if the tasks of the next ScheduleGroup are scheduled
     // after the stages of each ScheduleGroup are made "complete".
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
index 6ebc7bc87..c2bbd12a0 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -21,11 +21,13 @@
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageSender;
+import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.state.PlanState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
+import edu.snu.nemo.runtime.master.MetricMessageHandler;
 import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
@@ -33,7 +35,6 @@
 import edu.snu.nemo.runtime.common.plan.TestPlanGenerator;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -78,7 +79,8 @@
   public void setUp() throws Exception {
     // To understand which part of the log belongs to which test
     LOG.info("===== Testing {} =====", testName.getMethodName());
-    final Injector injector = Tang.Factory.getTang().newInjector();
+    final Injector injector = LocalMessageEnvironment.forkInjector(LocalMessageDispatcher.getInjector(),
+      MessageEnvironment.MASTER_COMMUNICATION_ID);
 
     // Get random
     random = new Random(0); // Fixed seed for reproducing test results.
@@ -86,15 +88,8 @@ public void setUp() throws Exception {
     // Get executorRegistry
     executorRegistry = injector.getInstance(ExecutorRegistry.class);
 
-    // Get scheduler
-    injector.bindVolatileInstance(PubSubEventHandlerWrapper.class, mock(PubSubEventHandlerWrapper.class));
-    injector.bindVolatileInstance(UpdatePhysicalPlanEventHandler.class, mock(UpdatePhysicalPlanEventHandler.class));
-    injector.bindVolatileInstance(SchedulingConstraintRegistry.class, mock(SchedulingConstraintRegistry.class));
-    injector.bindVolatileInstance(MessageEnvironment.class, mock(MessageEnvironment.class));
-    scheduler = injector.getInstance(Scheduler.class);
-
     // Get PlanStateManager
-    planStateManager = runPhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined);
+    runPhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, injector);
   }
 
   @Test(timeout=7000)
@@ -214,10 +209,19 @@ private void taskOutputWriteFailed(final double chance) {
         .collect(Collectors.toList());
   }
 
-  private PlanStateManager runPhysicalPlan(final TestPlanGenerator.PlanType planType) throws Exception {
+  private void runPhysicalPlan(final TestPlanGenerator.PlanType planType,
+                               final Injector injector) throws Exception {
+    final MetricMessageHandler metricMessageHandler = mock(MetricMessageHandler.class);
     final PhysicalPlan plan = TestPlanGenerator.generatePhysicalPlan(planType, false);
-    final PlanStateManager planStateManager = new PlanStateManager(plan, MAX_SCHEDULE_ATTEMPT);
-    scheduler.schedulePlan(plan, planStateManager);
-    return planStateManager;
+
+    // Get scheduler
+    injector.bindVolatileInstance(MetricMessageHandler.class, metricMessageHandler);
+    injector.bindVolatileInstance(PubSubEventHandlerWrapper.class, mock(PubSubEventHandlerWrapper.class));
+    injector.bindVolatileInstance(UpdatePhysicalPlanEventHandler.class, mock(UpdatePhysicalPlanEventHandler.class));
+    injector.bindVolatileInstance(SchedulingConstraintRegistry.class, mock(SchedulingConstraintRegistry.class));
+    planStateManager = injector.getInstance(PlanStateManager.class);
+    scheduler = injector.getInstance(Scheduler.class);
+
+    scheduler.schedulePlan(plan, MAX_SCHEDULE_ATTEMPT);
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services