You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/01/23 00:59:19 UTC

[incubator-nemo] branch reshaping updated: checkstyle

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch reshaping
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/reshaping by this push:
     new 66cc885  checkstyle
66cc885 is described below

commit 66cc885590f9184f693d9dae9769a0aa04ec4c57
Author: John Yang <jo...@apache.org>
AuthorDate: Wed Jan 23 09:59:07 2019 +0900

    checkstyle
---
 .../org/apache/nemo/common/dag/DAGBuilder.java     | 12 ++-
 .../main/java/org/apache/nemo/common/ir/IRDAG.java | 85 ++++++++++++----------
 .../apache/nemo/common/ir/vertex/LoopVertex.java   |  5 ++
 .../nemo/common/ir/vertex/OperatorVertex.java      |  6 +-
 .../ir/vertex/system/MessageAggregationVertex.java |  6 ++
 .../ir/vertex/system/MessageBarrierVertex.java     |  6 ++
 .../org/apache/nemo/compiler/backend/Backend.java  |  3 -
 .../nemo/compiler/backend/nemo/NemoBackend.java    |  2 -
 .../nemo/compiler/optimizer/NemoOptimizer.java     | 10 ++-
 .../annotating/SkewResourceSkewedDataPass.java     |  2 +-
 .../compiletime/annotating/UpfrontCloningPass.java |  2 +-
 .../compiletime/reshaping/LoopOptimizations.java   |  3 +-
 .../nemo/compiler/optimizer/policy/PolicyImpl.java |  2 +-
 .../frontend/beam/BeamFrontendALSTest.java         |  4 +-
 .../frontend/beam/BeamFrontendMLRTest.java         |  4 +-
 .../annotating/DefaultScheduleGroupPassTest.java   |  3 +-
 .../composite/SkewCompositePassTest.java           |  4 +-
 .../compiletime/reshaping/LoopFusionPassTest.java  |  4 +-
 .../DedicatedKeyPerElementPartitioner.java         |  4 +-
 .../nemo/runtime/common/plan/StagePartitioner.java |  1 -
 20 files changed, 97 insertions(+), 71 deletions(-)

diff --git a/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java b/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
index cfdfd1f..57552ed 100644
--- a/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
+++ b/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
@@ -45,8 +45,6 @@ public final class DAGBuilder<V extends Vertex, E extends Edge<V>> implements Se
   private final Map<V, LoopVertex> assignedLoopVertexMap;
   private final Map<V, Integer> loopStackDepthMap;
 
-  private String dagDirectory;
-
   /**
    * Constructor of DAGBuilder: it initializes everything.
    */
@@ -333,10 +331,16 @@ public final class DAGBuilder<V extends Vertex, E extends Edge<V>> implements Se
     return new DAG<>(vertices, incomingEdges, outgoingEdges, assignedLoopVertexMap, loopStackDepthMap);
   }
 
+  /**
+   * Generates a user-friendly exception message.
+   * @param reason of the exception.
+   * @param problematicObjects that caused the exception.
+   * @return exception object.
+   */
   private CompileTimeOptimizationException getException(final String reason, final String problematicObjects) {
     final DAG erroredDAG = new DAG<>(vertices, incomingEdges, outgoingEdges, assignedLoopVertexMap, loopStackDepthMap);
     erroredDAG.storeJSON("debug", "errored_ir", "Errored IR");
-    return new CompileTimeOptimizationException(
-      reason + " /// Problematic objects are: " + problematicObjects + " /// see the debug directory for the errored_ir");
+    return new CompileTimeOptimizationException(reason + " /// Problematic objects are: "
+      + problematicObjects + " /// see the debug directory for the errored_ir");
   }
 }
diff --git a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
index 485948c..15efe73 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
@@ -35,7 +35,6 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.function.Predicate;
 
 /**
  * An IRDAG object captures a high-level data processing application (e.g., Spark/Beam application).
@@ -53,70 +52,66 @@ public final class IRDAG {
 
   private DAG<IRVertex, IREdge> dag; // internal DAG, can be updated by reshaping methods.
 
+  /**
+   * @param dag underlying DAG.
+   */
   public IRDAG(final DAG<IRVertex, IREdge> dag) {
     this.dag = dag;
     this.metricCollectionId = new AtomicInteger(0);
   }
 
-  ////////////////////////////////////////////////// Read-only traversal methods.
+  ////////////////////////////////////////////////// Methods for querying the DAG topology.
 
+  /**
+   * Visits the current DAG snapshot in a topologically sorted order.
+   * @param function that visits each vertex.
+   */
   public void topologicalDo(final Consumer<IRVertex> function) {
     dag.topologicalDo(function);
   }
 
+  /**
+   * Get vertices of the current DAG snapshot.
+   * @return vertices.
+   */
   public List<IRVertex> getVertices() {
     return dag.getVertices();
   }
 
-  public List<IREdge> getIncomingEdgesOf(final String vertexId) {
-    return dag.getIncomingEdgesOf(vertexId);
-  }
-
+  /**
+   * Get incoming edges in the current DAG snapshot.
+   * @param v to query.
+   * @return incoming edges.
+   */
   public List<IREdge> getIncomingEdgesOf(final IRVertex v) {
     return dag.getIncomingEdgesOf(v);
   }
 
+  /**
+   * Get outgoing edges in the current DAG snapshot.
+   * @param v to query.
+   * @return outgoing edges.
+   */
   public List<IREdge> getOutgoingEdgesOf(final IRVertex v) {
     return dag.getOutgoingEdgesOf(v);
   }
 
-  public List<IREdge> getOutgoingEdgesOf(final String vertexId) {
-    return dag.getOutgoingEdgesOf(vertexId);
-  }
-
-  public void storeJSON(final String directory, final String name, final String description) {
-    dag.storeJSON(directory, name, description);
-  }
-
+  /**
+   * Get the vertices of the current DAG snapshot in a topologically sorted order.
+   * @return vertices in a topologically sorted order.
+   */
   public List<IRVertex> getTopologicalSort() {
     return dag.getTopologicalSort();
   }
 
-  public List<IRVertex> getDescendants(final String vertexId) {
-    return dag.getDescendants(vertexId);
-  }
-
-  public IRVertex getVertexById(final String id) {
-    return dag.getVertexById(id);
-  }
-
-  public List<IRVertex> filterVertices(final Predicate<IRVertex> condition) {
-    return dag.filterVertices(condition);
-  }
-
-  public List<IRVertex> getParents(final String vertexId) {
-    return dag.getParents(vertexId);
-  }
-
-  public List<IRVertex> getChildren(final String vertexId) {
-    return dag.getChildren(vertexId);
-  }
-
-  public DAG<IRVertex, IREdge> getUnderlyingDAG() {
+  /**
+   * @return the current underlying DAG for direct access.
+   */
+  public DAG<IRVertex, IREdge> getCurrentDAGSnapshot() {
     return dag;
   }
 
-  ////////////////////////////////////////////////// Reshaping methods.
+  ////////////////////////////////////////////////// Methods for reshaping the DAG topology.
 
   /**
    * Inserts a new vertex that streams data.
@@ -178,7 +173,9 @@ public final class IRDAG {
    *
    * @param messageBarrierVertex to insert.
    * @param messageAggregationVertex to insert.
-   * @param edgeToGetStatisticsOf to clone and examine.
+   * @param mbvOutputEncoder to use.
+   * @param mbvOutputDecoder to use.
+   * @param edgeToGetStatisticsOf to examine.
    */
   public void insert(final MessageBarrierVertex messageBarrierVertex,
                      final MessageAggregationVertex messageAggregationVertex,
@@ -243,14 +240,26 @@ public final class IRDAG {
     dag = builder.build(); // update the DAG.
   }
 
-  ////////////////////////////////////////////////// "Un-safe" direct reshaping (semantic-preserving is not guaranteed).
+  ////////////////////////////////////////////////// "Unsafe" direct reshaping (semantic-preserving is not guaranteed).
 
+  /**
+   * "Unsafe" direct reshaping (semantic-preserving is not guaranteed).
+   * @param unsafeReshaping a function that directly reshapes the underlying DAG.
+   */
   public void unSafeDirectReshaping(final Function<DAG<IRVertex, IREdge>, DAG<IRVertex, IREdge>> unsafeReshaping) {
     this.dag = unsafeReshaping.apply(dag);
   }
 
   ////////////////////////////////////////////////// Private helper methods.
 
+  /**
+   * @param mbv src.
+   * @param mav dst.
+   * @param encoder src-dst encoder.
+   * @param decoder src-dst decoder.
+   * @param currentMetricCollectionId of the edge.
+   * @return the edge.
+   */
   private IREdge edgeBetweenMessageVertices(final MessageBarrierVertex mbv,
                                             final MessageAggregationVertex mav,
                                             final EncoderProperty encoder,
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
index dd12406..f279783 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
@@ -310,6 +310,11 @@ public final class LoopVertex extends IRVertex {
     this.maxNumberOfIterations--;
   }
 
+  /**
+   * Check termination condition.
+   * @param that another vertex.
+   * @return true if the termination conditions of the two loop vertices are equal.
+   */
   public boolean terminationConditionEquals(final LoopVertex that) {
     if (this.maxNumberOfIterations.equals(that.getMaxNumberOfIterations()) && Util
         .checkEqualityOfIntPredicates(this.terminationCondition, that.getTerminationCondition(),
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/OperatorVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/OperatorVertex.java
index 542999f..7e2b953 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/OperatorVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/OperatorVertex.java
@@ -47,19 +47,19 @@ public class OperatorVertex extends IRVertex {
   }
 
   @Override
-  public OperatorVertex getClone() {
+  public final OperatorVertex getClone() {
     return new OperatorVertex(this);
   }
 
   /**
    * @return the transform in the OperatorVertex.
    */
-  public Transform getTransform() {
+  public final Transform getTransform() {
     return transform;
   }
 
   @Override
-  public ObjectNode getPropertiesAsJsonNode() {
+  public final ObjectNode getPropertiesAsJsonNode() {
     final ObjectNode node = getIRVertexPropertiesAsJsonNode();
     node.put("transform", transform.toString());
     return node;
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/system/MessageAggregationVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/system/MessageAggregationVertex.java
index 2cd145d..e47c1db 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/system/MessageAggregationVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/system/MessageAggregationVertex.java
@@ -24,6 +24,12 @@ import org.apache.nemo.common.ir.vertex.transform.MessageAggregateTransform;
 
 import java.util.function.BiFunction;
 
+/**
+ * Aggregates upstream messages.
+ * @param <K> of the input pair.
+ * @param <V> of the input pair.
+ * @param <O> of the output aggregated message.
+ */
 public class MessageAggregationVertex<K, V, O> extends OperatorVertex {
   /**
    * @param initialState to use.
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/system/MessageBarrierVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/system/MessageBarrierVertex.java
index 19bf2ec..3835a94 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/system/MessageBarrierVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/system/MessageBarrierVertex.java
@@ -24,6 +24,12 @@ import org.apache.nemo.common.ir.vertex.transform.MessageBarrierTransform;
 import java.util.Map;
 import java.util.function.BiFunction;
 
+/**
+ * Generates messages.
+ * @param <I> of the input.
+ * @param <K> of the output pair.
+ * @param <V> of the output pair.
+ */
 public class MessageBarrierVertex<I, K, V> extends OperatorVertex {
   /**
    * @param messageFunction for producing a message.
diff --git a/compiler/backend/src/main/java/org/apache/nemo/compiler/backend/Backend.java b/compiler/backend/src/main/java/org/apache/nemo/compiler/backend/Backend.java
index 54769da..bc3906f 100644
--- a/compiler/backend/src/main/java/org/apache/nemo/compiler/backend/Backend.java
+++ b/compiler/backend/src/main/java/org/apache/nemo/compiler/backend/Backend.java
@@ -19,9 +19,6 @@
 package org.apache.nemo.compiler.backend;
 
 import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.vertex.IRVertex;
-import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.compiler.backend.nemo.NemoBackend;
 import org.apache.reef.tang.annotations.DefaultImplementation;
 
diff --git a/compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/NemoBackend.java b/compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/NemoBackend.java
index 2a4700d..e0722ee 100644
--- a/compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/NemoBackend.java
+++ b/compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/NemoBackend.java
@@ -21,8 +21,6 @@ package org.apache.nemo.compiler.backend.nemo;
 import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.common.ir.IRDAG;
 import org.apache.nemo.compiler.backend.Backend;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.plan.PhysicalPlan;
 import org.apache.nemo.runtime.common.plan.PhysicalPlanGenerator;
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
index 631d4f3..014dc74 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
@@ -80,7 +80,7 @@ public final class NemoOptimizer implements Optimizer {
   @Override
   public IRDAG optimizeDag(final IRDAG dag) {
     final String irDagId = "ir-" + irDagCount++ + "-";
-    dag.storeJSON(dagDirectory, irDagId, "IR before optimization");
+    dag.getCurrentDAGSnapshot().storeJSON(dagDirectory, irDagId, "IR before optimization");
 
     final IRDAG optimizedDAG;
     final Policy optimizationPolicy;
@@ -90,7 +90,7 @@ public final class NemoOptimizer implements Optimizer {
       // Handle caching first.
       final IRDAG cacheFilteredDag = handleCaching(dag, cacheIdToEdge);
       if (!cacheIdToEdge.isEmpty()) {
-        cacheFilteredDag.storeJSON(dagDirectory, irDagId + "FilterCache",
+        cacheFilteredDag.getCurrentDAGSnapshot().storeJSON(dagDirectory, irDagId + "FilterCache",
           "IR after cache filtering");
       }
 
@@ -102,7 +102,8 @@ public final class NemoOptimizer implements Optimizer {
       }
 
       optimizedDAG = optimizationPolicy.runCompileTimeOptimization(cacheFilteredDag, dagDirectory);
-      optimizedDAG.storeJSON(dagDirectory, irDagId + optimizationPolicy.getClass().getSimpleName(),
+      optimizedDAG.getCurrentDAGSnapshot()
+        .storeJSON(dagDirectory, irDagId + optimizationPolicy.getClass().getSimpleName(),
         "IR optimized for " + optimizationPolicy.getClass().getSimpleName());
     } catch (final Exception e) {
       throw new CompileTimeOptimizationException(e);
@@ -121,7 +122,8 @@ public final class NemoOptimizer implements Optimizer {
     cacheIdToEdge.forEach((cacheId, edge) -> {
       if (!cacheIdToParallelism.containsKey(cacheId)) {
         cacheIdToParallelism.put(
-          cacheId, optimizedDAG.getVertexById(edge.getDst().getId()).getPropertyValue(ParallelismProperty.class)
+          cacheId, optimizedDAG.getCurrentDAGSnapshot()
+            .getVertexById(edge.getDst().getId()).getPropertyValue(ParallelismProperty.class)
             .orElseThrow(() -> new RuntimeException("No parallelism on an IR vertex.")));
       }
     });
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
index 5d57327..bc8a838 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
@@ -49,7 +49,7 @@ public final class SkewResourceSkewedDataPass extends AnnotatingPass {
         .forEach(skewEdge -> {
           final IRVertex dstV = skewEdge.getDst();
           dstV.setProperty(ResourceSkewedDataProperty.of(true));
-          dag.getDescendants(dstV.getId()).forEach(descendentV -> {
+          dag.getCurrentDAGSnapshot().getDescendants(dstV.getId()).forEach(descendentV -> {
             descendentV.getExecutionProperties().put(ResourceSkewedDataProperty.of(true));
           });
         })
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
index c3df132..34a32c0 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
@@ -39,7 +39,7 @@ public final class UpfrontCloningPass extends AnnotatingPass {
   @Override
   public IRDAG optimize(final IRDAG dag) {
     dag.getVertices().stream()
-        .filter(vertex -> dag.getIncomingEdgesOf(vertex.getId())
+        .filter(vertex -> dag.getCurrentDAGSnapshot().getIncomingEdgesOf(vertex.getId())
           .stream()
           // TODO #198: Handle Un-cloneable Beam Sink Operators
           // only shuffle receivers (for now... as particular Beam sink operators fail when cloned)
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
index fa94290..113b032 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
@@ -127,7 +127,8 @@ public final class LoopOptimizations {
         loopVertices.forEach(loopVertex -> {
           final IntPredicate terminationCondition = loopVertex.getTerminationCondition();
           final Integer numberOfIterations = loopVertex.getMaxNumberOfIterations();
-          // We want loopVertices that are not dependent on each other or the list that is potentially going to be merged.
+          // We want loopVertices that are not dependent on each other
+          // or the list that is potentially going to be merged.
           final List<LoopVertex> independentLoops = loopVertices.stream().filter(loop ->
             setOfLoopsToBeFused.stream().anyMatch(list -> list.contains(loop))
               ? setOfLoopsToBeFused.stream().filter(list -> list.contains(loop))
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyImpl.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyImpl.java
index 31a56cc..4fe4923 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyImpl.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyImpl.java
@@ -85,7 +85,7 @@ public final class PolicyImpl implements Policy {
             + "Modify it or use a general CompileTimePass");
         }
         // Save the processed JSON DAG.
-        dag.storeJSON(dagDirectory, "ir-after-" + passToApply.getClass().getSimpleName(),
+        dag.getCurrentDAGSnapshot().storeJSON(dagDirectory, "ir-after-" + passToApply.getClass().getSimpleName(),
           "DAG after optimization");
       } else {
         LOG.info("Condition unmet for applying {} to the DAG", passToApply.getClass().getSimpleName());
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
index 5be2bcb..233df82 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
@@ -47,12 +47,12 @@ public final class BeamFrontendALSTest {
 //    producedDAG.getTopologicalSort().forEach(v -> System.out.println(v.getId()));
     final IRVertex vertexX = producedDAG.getTopologicalSort().get(5);
     assertEquals(1, producedDAG.getIncomingEdgesOf(vertexX).size());
-    assertEquals(1, producedDAG.getIncomingEdgesOf(vertexX.getId()).size());
+    assertEquals(1, producedDAG.getCurrentDAGSnapshot().getIncomingEdgesOf(vertexX.getId()).size());
     assertEquals(1, producedDAG.getOutgoingEdgesOf(vertexX).size());
 
     final IRVertex vertexY = producedDAG.getTopologicalSort().get(10);
     assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY).size());
-    assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY.getId()).size());
+    assertEquals(1, producedDAG.getCurrentDAGSnapshot().getIncomingEdgesOf(vertexY.getId()).size());
     assertEquals(1, producedDAG.getOutgoingEdgesOf(vertexY).size());
   }
 }
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
index 231d9ea..3858704 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
@@ -46,12 +46,12 @@ public class BeamFrontendMLRTest {
 
     final IRVertex vertexX = producedDAG.getTopologicalSort().get(5);
     assertEquals(1, producedDAG.getIncomingEdgesOf(vertexX).size());
-    assertEquals(1, producedDAG.getIncomingEdgesOf(vertexX.getId()).size());
+    assertEquals(1, producedDAG.getCurrentDAGSnapshot().getIncomingEdgesOf(vertexX.getId()).size());
     assertEquals(1, producedDAG.getOutgoingEdgesOf(vertexX).size());
 
     final IRVertex vertexY = producedDAG.getTopologicalSort().get(13);
     assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY).size());
-    assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY.getId()).size());
+    assertEquals(1, producedDAG.getCurrentDAGSnapshot().getIncomingEdgesOf(vertexY.getId()).size());
     assertEquals(1, producedDAG.getOutgoingEdgesOf(vertexY).size());
   }
 }
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
index b3e9082..461362b 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
@@ -66,7 +66,8 @@ public final class DefaultScheduleGroupPassTest {
 
     for (final IRVertex irVertex : processedDAG.getTopologicalSort()) {
       final Integer currentScheduleGroup = irVertex.getPropertyValue(ScheduleGroupProperty.class).get();
-      final Integer largestScheduleGroupOfParent = processedDAG.getParents(irVertex.getId()).stream()
+      final Integer largestScheduleGroupOfParent =
+        processedDAG.getCurrentDAGSnapshot().getParents(irVertex.getId()).stream()
           .mapToInt(v -> v.getPropertyValue(ScheduleGroupProperty.class).get())
           .max().orElse(0);
       assertTrue(currentScheduleGroup >= largestScheduleGroupOfParent);
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java
index d1ce9bf..1135625 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java
@@ -98,10 +98,10 @@ public class SkewCompositePassTest {
     final IRDAG processedDAG = new SkewCompositePass().optimize(mrDAG);
     assertEquals(originalVerticesNum + numOfShuffleEdges * 2, processedDAG.getVertices().size());
 
-    processedDAG.filterVertices(v -> v instanceof OperatorVertex
+    processedDAG.getCurrentDAGSnapshot().filterVertices(v -> v instanceof OperatorVertex
       && ((OperatorVertex) v).getTransform() instanceof MessageBarrierTransform)
       .forEach(metricV -> {
-          final List<IRVertex> reducerV = processedDAG.getChildren(metricV.getId());
+          final List<IRVertex> reducerV = processedDAG.getCurrentDAGSnapshot().getChildren(metricV.getId());
           reducerV.forEach(rV -> {
             if (rV instanceof OperatorVertex &&
               !(((OperatorVertex) rV).getTransform() instanceof MessageAggregateTransform)) {
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
index 96c0c74..cfe6b78 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
@@ -62,10 +62,10 @@ public class LoopFusionPassTest {
     groupedDAG = new LoopExtractionPass().optimize(originalALSDAG);
 
     groupedDAG.topologicalDo(v -> {
-      dagToBeFusedBuilder.addVertex(v, groupedDAG.getUnderlyingDAG());
+      dagToBeFusedBuilder.addVertex(v, groupedDAG.getCurrentDAGSnapshot());
       groupedDAG.getIncomingEdgesOf(v).forEach(dagToBeFusedBuilder::connectVertices);
 
-      dagNotToBeFusedBuilder.addVertex(v, groupedDAG.getUnderlyingDAG());
+      dagNotToBeFusedBuilder.addVertex(v, groupedDAG.getCurrentDAGSnapshot());
       groupedDAG.getIncomingEdgesOf(v).forEach(dagNotToBeFusedBuilder::connectVertices);
     });
     final Optional<LoopVertex> loopInDAG = groupedDAG.getTopologicalSort().stream()
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java
index b87f819..63fa8f9 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java
@@ -18,13 +18,11 @@
  */
 package org.apache.nemo.runtime.common.partitioner;
 
-import org.apache.nemo.common.ir.vertex.transform.StreamTransform;
-
 /**
  * An implementation of {@link Partitioner} which assigns a dedicated key per an output data from a task.
  * WARNING: Because this partitioner assigns a dedicated key per element, it should be used under specific circumstances
  * that the number of output element is not that many. For example, every output element of
- * {@link StreamTransform} inserted by large shuffle optimization is always
+ * StreamTransform inserted by large shuffle optimization is always
  * a partition. In this case, assigning a key for each element can be useful.
  */
 @DedicatedKeyPerElement
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/StagePartitioner.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/StagePartitioner.java
index 99e17c7..a4d1ca5 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/StagePartitioner.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/StagePartitioner.java
@@ -18,7 +18,6 @@
  */
 package org.apache.nemo.runtime.common.plan;
 
-import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.common.ir.IRDAG;
 import org.apache.nemo.common.ir.edge.IREdge;
 import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;