You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/01/23 00:59:19 UTC
[incubator-nemo] branch reshaping updated: checkstyle
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch reshaping
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/reshaping by this push:
new 66cc885 checkstyle
66cc885 is described below
commit 66cc885590f9184f693d9dae9769a0aa04ec4c57
Author: John Yang <jo...@apache.org>
AuthorDate: Wed Jan 23 09:59:07 2019 +0900
checkstyle
---
.../org/apache/nemo/common/dag/DAGBuilder.java | 12 ++-
.../main/java/org/apache/nemo/common/ir/IRDAG.java | 85 ++++++++++++----------
.../apache/nemo/common/ir/vertex/LoopVertex.java | 5 ++
.../nemo/common/ir/vertex/OperatorVertex.java | 6 +-
.../ir/vertex/system/MessageAggregationVertex.java | 6 ++
.../ir/vertex/system/MessageBarrierVertex.java | 6 ++
.../org/apache/nemo/compiler/backend/Backend.java | 3 -
.../nemo/compiler/backend/nemo/NemoBackend.java | 2 -
.../nemo/compiler/optimizer/NemoOptimizer.java | 10 ++-
.../annotating/SkewResourceSkewedDataPass.java | 2 +-
.../compiletime/annotating/UpfrontCloningPass.java | 2 +-
.../compiletime/reshaping/LoopOptimizations.java | 3 +-
.../nemo/compiler/optimizer/policy/PolicyImpl.java | 2 +-
.../frontend/beam/BeamFrontendALSTest.java | 4 +-
.../frontend/beam/BeamFrontendMLRTest.java | 4 +-
.../annotating/DefaultScheduleGroupPassTest.java | 3 +-
.../composite/SkewCompositePassTest.java | 4 +-
.../compiletime/reshaping/LoopFusionPassTest.java | 4 +-
.../DedicatedKeyPerElementPartitioner.java | 4 +-
.../nemo/runtime/common/plan/StagePartitioner.java | 1 -
20 files changed, 97 insertions(+), 71 deletions(-)
diff --git a/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java b/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
index cfdfd1f..57552ed 100644
--- a/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
+++ b/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
@@ -45,8 +45,6 @@ public final class DAGBuilder<V extends Vertex, E extends Edge<V>> implements Se
private final Map<V, LoopVertex> assignedLoopVertexMap;
private final Map<V, Integer> loopStackDepthMap;
- private String dagDirectory;
-
/**
* Constructor of DAGBuilder: it initializes everything.
*/
@@ -333,10 +331,16 @@ public final class DAGBuilder<V extends Vertex, E extends Edge<V>> implements Se
return new DAG<>(vertices, incomingEdges, outgoingEdges, assignedLoopVertexMap, loopStackDepthMap);
}
+ /**
+ * Generates a user-friendly exception message.
+ * @param reason of the exception.
+ * @param problematicObjects that caused the exception.
+ * @return exception object.
+ */
private CompileTimeOptimizationException getException(final String reason, final String problematicObjects) {
final DAG erroredDAG = new DAG<>(vertices, incomingEdges, outgoingEdges, assignedLoopVertexMap, loopStackDepthMap);
erroredDAG.storeJSON("debug", "errored_ir", "Errored IR");
- return new CompileTimeOptimizationException(
- reason + " /// Problematic objects are: " + problematicObjects + " /// see the debug directory for the errored_ir");
+ return new CompileTimeOptimizationException(reason + " /// Problematic objects are: "
+ + problematicObjects + " /// see the debug directory for the errored_ir");
}
}
diff --git a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
index 485948c..15efe73 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
@@ -35,7 +35,6 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
-import java.util.function.Predicate;
/**
* An IRDAG object captures a high-level data processing application (e.g., Spark/Beam application).
@@ -53,70 +52,66 @@ public final class IRDAG {
private DAG<IRVertex, IREdge> dag; // internal DAG, can be updated by reshaping methods.
+ /**
+ * @param dag underlying DAG.
+ */
public IRDAG(final DAG<IRVertex, IREdge> dag) {
this.dag = dag;
this.metricCollectionId = new AtomicInteger(0);
}
- ////////////////////////////////////////////////// Read-only traversal methods.
+ ////////////////////////////////////////////////// Methods for querying the DAG topology.
+ /**
+ * Visits the current DAG snapshot in a topologically sorted order.
+ * @param function that visits each vertex.
+ */
public void topologicalDo(final Consumer<IRVertex> function) {
dag.topologicalDo(function);
}
+ /**
+ * Get vertices of the current DAG snapshot.
+ * @return vertices.
+ */
public List<IRVertex> getVertices() {
return dag.getVertices();
}
- public List<IREdge> getIncomingEdgesOf(final String vertexId) {
- return dag.getIncomingEdgesOf(vertexId);
- }
-
+ /**
+ * Get incoming edges in the current DAG snapshot.
+ * @param v to query.
+ * @return incoming edges.
+ */
public List<IREdge> getIncomingEdgesOf(final IRVertex v) {
return dag.getIncomingEdgesOf(v);
}
+ /**
+ * Get outgoing edges in the current DAG snapshot.
+ * @param v to query.
+ * @return outgoing edges.
+ */
public List<IREdge> getOutgoingEdgesOf(final IRVertex v) {
return dag.getOutgoingEdgesOf(v);
}
- public List<IREdge> getOutgoingEdgesOf(final String vertexId) {
- return dag.getOutgoingEdgesOf(vertexId);
- }
-
- public void storeJSON(final String directory, final String name, final String description) {
- dag.storeJSON(directory, name, description);
- }
-
+ /**
+ * Get vertices in the current DAG snapshot. (sorted)
+ * @return vertices in a topologically sorted order.
+ */
public List<IRVertex> getTopologicalSort() {
return dag.getTopologicalSort();
}
- public List<IRVertex> getDescendants(final String vertexId) {
- return dag.getDescendants(vertexId);
- }
-
- public IRVertex getVertexById(final String id) {
- return dag.getVertexById(id);
- }
-
- public List<IRVertex> filterVertices(final Predicate<IRVertex> condition) {
- return dag.filterVertices(condition);
- }
-
- public List<IRVertex> getParents(final String vertexId) {
- return dag.getParents(vertexId);
- }
-
- public List<IRVertex> getChildren(final String vertexId) {
- return dag.getChildren(vertexId);
- }
-
- public DAG<IRVertex, IREdge> getUnderlyingDAG() {
+ /**
+ * @return the current underlying DAG for direct access.
+ */
+ public DAG<IRVertex, IREdge> getCurrentDAGSnapshot() {
return dag;
}
- ////////////////////////////////////////////////// Reshaping methods.
+ ////////////////////////////////////////////////// Methods for reshaping the DAG topology.
/**
* Inserts a new vertex that streams data.
@@ -178,7 +173,9 @@ public final class IRDAG {
*
* @param messageBarrierVertex to insert.
* @param messageAggregationVertex to insert.
- * @param edgeToGetStatisticsOf to clone and examine.
+ * @param mbvOutputEncoder to use.
+ * @param mbvOutputDecoder to use.
+ * @param edgeToGetStatisticsOf to examine.
*/
public void insert(final MessageBarrierVertex messageBarrierVertex,
final MessageAggregationVertex messageAggregationVertex,
@@ -243,14 +240,26 @@ public final class IRDAG {
dag = builder.build(); // update the DAG.
}
- ////////////////////////////////////////////////// "Un-safe" direct reshaping (semantic-preserving is not guaranteed).
+ ////////////////////////////////////////////////// "Unsafe" direct reshaping (semantic-preserving is not guaranteed).
+ /**
+ * "Unsafe" direct reshaping (semantic-preserving is not guaranteed).
+ * @param unsafeReshaping a function that directly reshapes the underlying DAG.
+ */
public void unSafeDirectReshaping(final Function<DAG<IRVertex, IREdge>, DAG<IRVertex, IREdge>> unsafeReshaping) {
this.dag = unsafeReshaping.apply(dag);
}
////////////////////////////////////////////////// Private helper methods.
+ /**
+ * @param mbv src.
+ * @param mav dst.
+ * @param encoder src-dst encoder.
+ * @param decoder src-dst decoder.
+ * @param currentMetricCollectionId of the edge.
+ * @return the edge.
+ */
private IREdge edgeBetweenMessageVertices(final MessageBarrierVertex mbv,
final MessageAggregationVertex mav,
final EncoderProperty encoder,
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
index dd12406..f279783 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
@@ -310,6 +310,11 @@ public final class LoopVertex extends IRVertex {
this.maxNumberOfIterations--;
}
+ /**
+ * Check termination condition.
+ * @param that another vertex.
+ * @return true if equals.
+ */
public boolean terminationConditionEquals(final LoopVertex that) {
if (this.maxNumberOfIterations.equals(that.getMaxNumberOfIterations()) && Util
.checkEqualityOfIntPredicates(this.terminationCondition, that.getTerminationCondition(),
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/OperatorVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/OperatorVertex.java
index 542999f..7e2b953 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/OperatorVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/OperatorVertex.java
@@ -47,19 +47,19 @@ public class OperatorVertex extends IRVertex {
}
@Override
- public OperatorVertex getClone() {
+ public final OperatorVertex getClone() {
return new OperatorVertex(this);
}
/**
* @return the transform in the OperatorVertex.
*/
- public Transform getTransform() {
+ public final Transform getTransform() {
return transform;
}
@Override
- public ObjectNode getPropertiesAsJsonNode() {
+ public final ObjectNode getPropertiesAsJsonNode() {
final ObjectNode node = getIRVertexPropertiesAsJsonNode();
node.put("transform", transform.toString());
return node;
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/system/MessageAggregationVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/system/MessageAggregationVertex.java
index 2cd145d..e47c1db 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/system/MessageAggregationVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/system/MessageAggregationVertex.java
@@ -24,6 +24,12 @@ import org.apache.nemo.common.ir.vertex.transform.MessageAggregateTransform;
import java.util.function.BiFunction;
+/**
+ * Aggregates upstream messages.
+ * @param <K> of the input pair.
+ * @param <V> of the input pair.
+ * @param <O> of the output aggregated message.
+ */
public class MessageAggregationVertex<K, V, O> extends OperatorVertex {
/**
* @param initialState to use.
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/system/MessageBarrierVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/system/MessageBarrierVertex.java
index 19bf2ec..3835a94 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/system/MessageBarrierVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/system/MessageBarrierVertex.java
@@ -24,6 +24,12 @@ import org.apache.nemo.common.ir.vertex.transform.MessageBarrierTransform;
import java.util.Map;
import java.util.function.BiFunction;
+/**
+ * Generates messages.
+ * @param <I> input type
+ * @param <K> of the output pair.
+ * @param <V> of the output pair.
+ */
public class MessageBarrierVertex<I, K, V> extends OperatorVertex {
/**
* @param messageFunction for producing a message.
diff --git a/compiler/backend/src/main/java/org/apache/nemo/compiler/backend/Backend.java b/compiler/backend/src/main/java/org/apache/nemo/compiler/backend/Backend.java
index 54769da..bc3906f 100644
--- a/compiler/backend/src/main/java/org/apache/nemo/compiler/backend/Backend.java
+++ b/compiler/backend/src/main/java/org/apache/nemo/compiler/backend/Backend.java
@@ -19,9 +19,6 @@
package org.apache.nemo.compiler.backend;
import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.vertex.IRVertex;
-import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.compiler.backend.nemo.NemoBackend;
import org.apache.reef.tang.annotations.DefaultImplementation;
diff --git a/compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/NemoBackend.java b/compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/NemoBackend.java
index 2a4700d..e0722ee 100644
--- a/compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/NemoBackend.java
+++ b/compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/NemoBackend.java
@@ -21,8 +21,6 @@ package org.apache.nemo.compiler.backend.nemo;
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.ir.IRDAG;
import org.apache.nemo.compiler.backend.Backend;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.plan.PhysicalPlan;
import org.apache.nemo.runtime.common.plan.PhysicalPlanGenerator;
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
index 631d4f3..014dc74 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
@@ -80,7 +80,7 @@ public final class NemoOptimizer implements Optimizer {
@Override
public IRDAG optimizeDag(final IRDAG dag) {
final String irDagId = "ir-" + irDagCount++ + "-";
- dag.storeJSON(dagDirectory, irDagId, "IR before optimization");
+ dag.getCurrentDAGSnapshot().storeJSON(dagDirectory, irDagId, "IR before optimization");
final IRDAG optimizedDAG;
final Policy optimizationPolicy;
@@ -90,7 +90,7 @@ public final class NemoOptimizer implements Optimizer {
// Handle caching first.
final IRDAG cacheFilteredDag = handleCaching(dag, cacheIdToEdge);
if (!cacheIdToEdge.isEmpty()) {
- cacheFilteredDag.storeJSON(dagDirectory, irDagId + "FilterCache",
+ cacheFilteredDag.getCurrentDAGSnapshot().storeJSON(dagDirectory, irDagId + "FilterCache",
"IR after cache filtering");
}
@@ -102,7 +102,8 @@ public final class NemoOptimizer implements Optimizer {
}
optimizedDAG = optimizationPolicy.runCompileTimeOptimization(cacheFilteredDag, dagDirectory);
- optimizedDAG.storeJSON(dagDirectory, irDagId + optimizationPolicy.getClass().getSimpleName(),
+ optimizedDAG.getCurrentDAGSnapshot()
+ .storeJSON(dagDirectory, irDagId + optimizationPolicy.getClass().getSimpleName(),
"IR optimized for " + optimizationPolicy.getClass().getSimpleName());
} catch (final Exception e) {
throw new CompileTimeOptimizationException(e);
@@ -121,7 +122,8 @@ public final class NemoOptimizer implements Optimizer {
cacheIdToEdge.forEach((cacheId, edge) -> {
if (!cacheIdToParallelism.containsKey(cacheId)) {
cacheIdToParallelism.put(
- cacheId, optimizedDAG.getVertexById(edge.getDst().getId()).getPropertyValue(ParallelismProperty.class)
+ cacheId, optimizedDAG.getCurrentDAGSnapshot()
+ .getVertexById(edge.getDst().getId()).getPropertyValue(ParallelismProperty.class)
.orElseThrow(() -> new RuntimeException("No parallelism on an IR vertex.")));
}
});
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
index 5d57327..bc8a838 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
@@ -49,7 +49,7 @@ public final class SkewResourceSkewedDataPass extends AnnotatingPass {
.forEach(skewEdge -> {
final IRVertex dstV = skewEdge.getDst();
dstV.setProperty(ResourceSkewedDataProperty.of(true));
- dag.getDescendants(dstV.getId()).forEach(descendentV -> {
+ dag.getCurrentDAGSnapshot().getDescendants(dstV.getId()).forEach(descendentV -> {
descendentV.getExecutionProperties().put(ResourceSkewedDataProperty.of(true));
});
})
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
index c3df132..34a32c0 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
@@ -39,7 +39,7 @@ public final class UpfrontCloningPass extends AnnotatingPass {
@Override
public IRDAG optimize(final IRDAG dag) {
dag.getVertices().stream()
- .filter(vertex -> dag.getIncomingEdgesOf(vertex.getId())
+ .filter(vertex -> dag.getCurrentDAGSnapshot().getIncomingEdgesOf(vertex.getId())
.stream()
// TODO #198: Handle Un-cloneable Beam Sink Operators
// only shuffle receivers (for now... as particular Beam sink operators fail when cloned)
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
index fa94290..113b032 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
@@ -127,7 +127,8 @@ public final class LoopOptimizations {
loopVertices.forEach(loopVertex -> {
final IntPredicate terminationCondition = loopVertex.getTerminationCondition();
final Integer numberOfIterations = loopVertex.getMaxNumberOfIterations();
- // We want loopVertices that are not dependent on each other or the list that is potentially going to be merged.
+ // We want loopVertices that are not dependent on each other
+ // or the list that is potentially going to be merged.
final List<LoopVertex> independentLoops = loopVertices.stream().filter(loop ->
setOfLoopsToBeFused.stream().anyMatch(list -> list.contains(loop))
? setOfLoopsToBeFused.stream().filter(list -> list.contains(loop))
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyImpl.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyImpl.java
index 31a56cc..4fe4923 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyImpl.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyImpl.java
@@ -85,7 +85,7 @@ public final class PolicyImpl implements Policy {
+ "Modify it or use a general CompileTimePass");
}
// Save the processed JSON DAG.
- dag.storeJSON(dagDirectory, "ir-after-" + passToApply.getClass().getSimpleName(),
+ dag.getCurrentDAGSnapshot().storeJSON(dagDirectory, "ir-after-" + passToApply.getClass().getSimpleName(),
"DAG after optimization");
} else {
LOG.info("Condition unmet for applying {} to the DAG", passToApply.getClass().getSimpleName());
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
index 5be2bcb..233df82 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
@@ -47,12 +47,12 @@ public final class BeamFrontendALSTest {
// producedDAG.getTopologicalSort().forEach(v -> System.out.println(v.getId()));
final IRVertex vertexX = producedDAG.getTopologicalSort().get(5);
assertEquals(1, producedDAG.getIncomingEdgesOf(vertexX).size());
- assertEquals(1, producedDAG.getIncomingEdgesOf(vertexX.getId()).size());
+ assertEquals(1, producedDAG.getCurrentDAGSnapshot().getIncomingEdgesOf(vertexX.getId()).size());
assertEquals(1, producedDAG.getOutgoingEdgesOf(vertexX).size());
final IRVertex vertexY = producedDAG.getTopologicalSort().get(10);
assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY).size());
- assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY.getId()).size());
+ assertEquals(1, producedDAG.getCurrentDAGSnapshot().getIncomingEdgesOf(vertexY.getId()).size());
assertEquals(1, producedDAG.getOutgoingEdgesOf(vertexY).size());
}
}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
index 231d9ea..3858704 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
@@ -46,12 +46,12 @@ public class BeamFrontendMLRTest {
final IRVertex vertexX = producedDAG.getTopologicalSort().get(5);
assertEquals(1, producedDAG.getIncomingEdgesOf(vertexX).size());
- assertEquals(1, producedDAG.getIncomingEdgesOf(vertexX.getId()).size());
+ assertEquals(1, producedDAG.getCurrentDAGSnapshot().getIncomingEdgesOf(vertexX.getId()).size());
assertEquals(1, producedDAG.getOutgoingEdgesOf(vertexX).size());
final IRVertex vertexY = producedDAG.getTopologicalSort().get(13);
assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY).size());
- assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY.getId()).size());
+ assertEquals(1, producedDAG.getCurrentDAGSnapshot().getIncomingEdgesOf(vertexY.getId()).size());
assertEquals(1, producedDAG.getOutgoingEdgesOf(vertexY).size());
}
}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
index b3e9082..461362b 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
@@ -66,7 +66,8 @@ public final class DefaultScheduleGroupPassTest {
for (final IRVertex irVertex : processedDAG.getTopologicalSort()) {
final Integer currentScheduleGroup = irVertex.getPropertyValue(ScheduleGroupProperty.class).get();
- final Integer largestScheduleGroupOfParent = processedDAG.getParents(irVertex.getId()).stream()
+ final Integer largestScheduleGroupOfParent =
+ processedDAG.getCurrentDAGSnapshot().getParents(irVertex.getId()).stream()
.mapToInt(v -> v.getPropertyValue(ScheduleGroupProperty.class).get())
.max().orElse(0);
assertTrue(currentScheduleGroup >= largestScheduleGroupOfParent);
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java
index d1ce9bf..1135625 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java
@@ -98,10 +98,10 @@ public class SkewCompositePassTest {
final IRDAG processedDAG = new SkewCompositePass().optimize(mrDAG);
assertEquals(originalVerticesNum + numOfShuffleEdges * 2, processedDAG.getVertices().size());
- processedDAG.filterVertices(v -> v instanceof OperatorVertex
+ processedDAG.getCurrentDAGSnapshot().filterVertices(v -> v instanceof OperatorVertex
&& ((OperatorVertex) v).getTransform() instanceof MessageBarrierTransform)
.forEach(metricV -> {
- final List<IRVertex> reducerV = processedDAG.getChildren(metricV.getId());
+ final List<IRVertex> reducerV = processedDAG.getCurrentDAGSnapshot().getChildren(metricV.getId());
reducerV.forEach(rV -> {
if (rV instanceof OperatorVertex &&
!(((OperatorVertex) rV).getTransform() instanceof MessageAggregateTransform)) {
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
index 96c0c74..cfe6b78 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
@@ -62,10 +62,10 @@ public class LoopFusionPassTest {
groupedDAG = new LoopExtractionPass().optimize(originalALSDAG);
groupedDAG.topologicalDo(v -> {
- dagToBeFusedBuilder.addVertex(v, groupedDAG.getUnderlyingDAG());
+ dagToBeFusedBuilder.addVertex(v, groupedDAG.getCurrentDAGSnapshot());
groupedDAG.getIncomingEdgesOf(v).forEach(dagToBeFusedBuilder::connectVertices);
- dagNotToBeFusedBuilder.addVertex(v, groupedDAG.getUnderlyingDAG());
+ dagNotToBeFusedBuilder.addVertex(v, groupedDAG.getCurrentDAGSnapshot());
groupedDAG.getIncomingEdgesOf(v).forEach(dagNotToBeFusedBuilder::connectVertices);
});
final Optional<LoopVertex> loopInDAG = groupedDAG.getTopologicalSort().stream()
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java
index b87f819..63fa8f9 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java
@@ -18,13 +18,11 @@
*/
package org.apache.nemo.runtime.common.partitioner;
-import org.apache.nemo.common.ir.vertex.transform.StreamTransform;
-
/**
* An implementation of {@link Partitioner} which assigns a dedicated key per an output data from a task.
* WARNING: Because this partitioner assigns a dedicated key per element, it should be used under specific circumstances
* that the number of output element is not that many. For example, every output element of
- * {@link StreamTransform} inserted by large shuffle optimization is always
+ * StreamTransform inserted by large shuffle optimization is always
* a partition. In this case, assigning a key for each element can be useful.
*/
@DedicatedKeyPerElement
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/StagePartitioner.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/StagePartitioner.java
index 99e17c7..a4d1ca5 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/StagePartitioner.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/StagePartitioner.java
@@ -18,7 +18,6 @@
*/
package org.apache.nemo.runtime.common.plan;
-import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.ir.IRDAG;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;