You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/01/22 04:51:05 UTC
[incubator-nemo] branch reshaping updated: src code compiles
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch reshaping
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/reshaping by this push:
new 17d8202 src code compiles
17d8202 is described below
commit 17d820221e1e6115945954b1f934840654abe7a8
Author: John Yang <jo...@apache.org>
AuthorDate: Tue Jan 22 13:50:54 2019 +0900
src code compiles
---
.../main/java/org/apache/nemo/common/ir/IRDAG.java | 64 +++++++++++++++-------
.../java/org/apache/nemo/common/pass/Pass.java | 12 ++--
.../nemo/compiler/optimizer/NemoOptimizer.java | 2 +-
.../MapReduceDisaggregationOptimization.java | 4 +-
.../nemo/compiler/optimizer/policy/PolicyImpl.java | 42 +++++++-------
5 files changed, 73 insertions(+), 51 deletions(-)
diff --git a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
index c5de9cb..3f87be4 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
@@ -18,6 +18,8 @@
*/
package org.apache.nemo.common.ir;
+import org.apache.nemo.common.KeyExtractor;
+import org.apache.nemo.common.Pair;
import org.apache.nemo.common.coder.*;
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.dag.DAGBuilder;
@@ -28,7 +30,6 @@ import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.ir.vertex.system.MessageAggregationVertex;
import org.apache.nemo.common.ir.vertex.system.MessageBarrierVertex;
import org.apache.nemo.common.ir.vertex.system.StreamVertex;
-import org.apache.nemo.common.ir.vertex.system.SystemIRVertex;
import java.util.List;
import java.util.function.Consumer;
@@ -51,7 +52,7 @@ public class IRDAG {
this.dag = dag;
}
- ////////////////////////////////////////////////// Read-only traversal methods for annotations.
+ ////////////////////////////////////////////////// Read-only traversal methods.
public void topologicalDo(final Consumer<IRVertex> function) {
dag.topologicalDo(function);
@@ -69,6 +70,30 @@ public class IRDAG {
return dag.getIncomingEdgesOf(v);
}
+ public List<IREdge> getOutgoingEdgesOf(final IRVertex v) {
+ return dag.getOutgoingEdgesOf(v);
+ }
+
+ public List<IREdge> getOutgoingEdgesOf(final String vertexId) {
+ return dag.getOutgoingEdgesOf(vertexId);
+ }
+
+ public void storeJSON(final String directory, final String name, final String description) {
+ dag.storeJSON(directory, name, description);
+ }
+
+ public List<IRVertex> getTopologicalSort() {
+ return dag.getTopologicalSort();
+ }
+
+ public List<IRVertex> getDescendants(final String vertexId) {
+ return dag.getDescendants(vertexId);
+ }
+
+ public IRVertex getVertexById(final String id) {
+ return dag.getVertexById(id);
+ }
+
////////////////////////////////////////////////// Reshaping methods.
/**
@@ -126,7 +151,8 @@ public class IRDAG {
* Inserts a new vertex that analyzes intermediate data, and triggers a dynamic optimization.
*
* Before: src > edgeToGetStatisticsOf > dst
- * After: src > oneToOneEdge(a clone of edgeToGetStatisticsOf) > messageBarrierVertex > messageAggregationVertex > dst
+ * After: src > oneToOneEdge(a clone of edgeToGetStatisticsOf) > messageBarrierVertex >
+ * shuffleEdge > messageAggregationVertex > broadcastEdge > dst
* (the "Before" relationships are unmodified)
*
* @param messageBarrierVertex to insert.
@@ -160,7 +186,7 @@ public class IRDAG {
builder.connectVertices(clone);
// messageBarrierVertex to the messageAggregationVertex
- final IREdge edgeToABV = generateEdgeToABV(edge, messageBarrierVertex, messageAggregationVertex);
+ final IREdge edgeToABV = edgeBetweenMessageVertices(edge, messageBarrierVertex, messageAggregationVertex);
builder.connectVertices(edgeToABV);
// Connection vertex
@@ -172,12 +198,12 @@ public class IRDAG {
// The original edge
// We then insert the vertex with MessageBarrierTransform and vertex with MessageAggregateTransform
// between the vertex and incoming vertices.
- final IREdge edgeToOriginalDstV =
+ final IREdge edgeToOriginalDst =
new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(), edge.getSrc(), v);
- edge.copyExecutionPropertiesTo(edgeToOriginalDstV);
- edgeToOriginalDstV.setPropertyPermanently(
+ edge.copyExecutionPropertiesTo(edgeToOriginalDst);
+ edgeToOriginalDst.setPropertyPermanently(
MetricCollectionProperty.of(MetricCollectionProperty.Value.DataSkewRuntimePass));
- builder.connectVertices(edgeToOriginalDstV);
+ builder.connectVertices(edgeToOriginalDst);
} else {
// NO MATCH, so simply connect vertices as before.
builder.connectVertices(edge);
@@ -186,13 +212,6 @@ public class IRDAG {
});
}
- /**
- * @param systemIRVertexToDelete to delete.
- */
- public void delete(final SystemIRVertex systemIRVertexToDelete) {
- // TODO: recursively delete backwards
- }
-
////////////////////////////////////////////////// "Un-safe" direct reshaping (semantic-preserving is not guaranteed).
public void unSafeDirectReshaping(final Function<DAG<IRVertex, IREdge>, DAG<IRVertex, IREdge>> unsafeReshaping) {
@@ -207,14 +226,21 @@ public class IRDAG {
* @param abv the vertex with MessageAggregateTransform.
* @return the generated edge from {@code mcv} to {@code abv}.
*/
- private IREdge generateEdgeToABV(final IREdge edge,
- final OperatorVertex mcv,
- final OperatorVertex abv) {
+ private IREdge edgeBetweenMessageVertices(final IREdge edge,
+ final OperatorVertex mcv,
+ final OperatorVertex abv) {
final IREdge newEdge = new IREdge(CommunicationPatternProperty.Value.Shuffle, mcv, abv);
newEdge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
newEdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Push));
- newEdge.setProperty(KeyExtractorProperty.of(new PairKeyExtractor()));
+ final KeyExtractor pairKeyExtractor = (element) -> {
+ if (element instanceof Pair) {
+ return ((Pair) element).left();
+ } else {
+ throw new IllegalStateException(element.toString());
+ }
+ };
+ newEdge.setProperty(KeyExtractorProperty.of(pairKeyExtractor));
// Dynamic optimization handles statistics on key-value data by default.
// We need to get coders for encoding/decoding the keys to send data to
diff --git a/common/src/main/java/org/apache/nemo/common/pass/Pass.java b/common/src/main/java/org/apache/nemo/common/pass/Pass.java
index a2297e1..7a540c9 100644
--- a/common/src/main/java/org/apache/nemo/common/pass/Pass.java
+++ b/common/src/main/java/org/apache/nemo/common/pass/Pass.java
@@ -18,9 +18,7 @@
*/
package org.apache.nemo.common.pass;
-import org.apache.nemo.common.dag.DAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.IRDAG;
import java.io.Serializable;
import java.util.function.Predicate;
@@ -29,7 +27,7 @@ import java.util.function.Predicate;
* Abstract class for optimization passes. All passes basically extend this class.
*/
public abstract class Pass implements Serializable {
- private Predicate<DAG<IRVertex, IREdge>> condition;
+ private Predicate<IRDAG> condition;
/**
* Default constructor.
@@ -42,7 +40,7 @@ public abstract class Pass implements Serializable {
* Constructor.
* @param condition condition under which to run the pass.
*/
- private Pass(final Predicate<DAG<IRVertex, IREdge>> condition) {
+ private Pass(final Predicate<IRDAG> condition) {
this.condition = condition;
}
@@ -50,7 +48,7 @@ public abstract class Pass implements Serializable {
* Getter for the condition under which to apply the pass.
* @return the condition under which to apply the pass.
*/
- public final Predicate<DAG<IRVertex, IREdge>> getCondition() {
+ public final Predicate<IRDAG> getCondition() {
return this.condition;
}
@@ -59,7 +57,7 @@ public abstract class Pass implements Serializable {
* @param newCondition the new condition to add to the existing condition.
* @return the condition with the new condition added.
*/
- public final Pass addCondition(final Predicate<DAG<IRVertex, IREdge>> newCondition) {
+ public final Pass addCondition(final Predicate<IRDAG> newCondition) {
this.condition = this.condition.and(newCondition);
return this;
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
index 11d19b9..631d4f3 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
@@ -160,7 +160,7 @@ public final class NemoOptimizer implements Optimizer {
sinkVertices.forEach(sinkVtx -> addNonCachedVerticesAndEdges(dag, sinkVtx, filteredDagBuilder));
- return filteredDagBuilder.buildWithoutSourceCheck();
+ return new IRDAG(filteredDagBuilder.buildWithoutSourceCheck());
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java
index 97989dc..ec533ad 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java
@@ -65,12 +65,12 @@ public final class MapReduceDisaggregationOptimization {
final IREdge edge2 = new IREdge(CommunicationPatternProperty.Value.Shuffle, map, reduce);
builder.connectVertices(edge2);
- final IRDAG dag = builder.build();
+ final IRDAG dag = new IRDAG(builder.build());
LOG.info("Before Optimization");
LOG.info(dag.toString());
// Optimize
- final DAG optimizedDAG = new DisaggregationPolicy().runCompileTimeOptimization(dag, EMPTY_DAG_DIRECTORY);
+ final IRDAG optimizedDAG = new DisaggregationPolicy().runCompileTimeOptimization(dag, EMPTY_DAG_DIRECTORY);
// After
LOG.info("After Optimization");
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyImpl.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyImpl.java
index 26577bd..31a56cc 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyImpl.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyImpl.java
@@ -68,32 +68,30 @@ public final class PolicyImpl implements Policy {
* @throws Exception Exceptions on the way.
*/
private static IRDAG process(final IRDAG dag,
- final Iterator<CompileTimePass> passes,
- final String dagDirectory) {
+ final Iterator<CompileTimePass> passes,
+ final String dagDirectory) {
if (passes.hasNext()) {
final CompileTimePass passToApply = passes.next();
- final IRDAG processedDAG;
if (passToApply.getCondition().test(dag)) {
LOG.info("Apply {} to the DAG", passToApply.getClass().getSimpleName());
// Apply the pass to the DAG.
- processedDAG = passToApply.apply(dag);
+ passToApply.optimize(dag);
// Ensure AnnotatingPass and ReshapingPass functions as intended.
- if ((passToApply instanceof AnnotatingPass && !checkAnnotatingPass(dag, processedDAG))
- || (passToApply instanceof ReshapingPass && !checkReshapingPass(dag, processedDAG))) {
+ if ((passToApply instanceof AnnotatingPass && !checkAnnotatingPass(dag, dag))
+ || (passToApply instanceof ReshapingPass && !checkReshapingPass(dag, dag))) {
throw new CompileTimeOptimizationException(passToApply.getClass().getSimpleName()
- + " is implemented in a way that doesn't follow its original intention of annotating or reshaping. "
- + "Modify it or use a general CompileTimePass");
+ + " is implemented in a way that doesn't follow its original intention of annotating or reshaping. "
+ + "Modify it or use a general CompileTimePass");
}
// Save the processed JSON DAG.
- processedDAG.storeJSON(dagDirectory, "ir-after-" + passToApply.getClass().getSimpleName(),
- "DAG after optimization");
+ dag.storeJSON(dagDirectory, "ir-after-" + passToApply.getClass().getSimpleName(),
+ "DAG after optimization");
} else {
LOG.info("Condition unmet for applying {} to the DAG", passToApply.getClass().getSimpleName());
- processedDAG = dag;
}
// recursively apply the following passes.
- return process(processedDAG, passes, dagDirectory);
+ return process(dag, passes, dagDirectory);
} else {
return dag;
}
@@ -134,7 +132,7 @@ public final class PolicyImpl implements Policy {
}
// number of edges should match.
if (beforeVertexIncomingEdges.hasNext() || afterVertexIncomingEdges.hasNext()
- || beforeVertexOutgoingEdges.hasNext() || afterVertexOutgoingEdges.hasNext()) {
+ || beforeVertexOutgoingEdges.hasNext() || afterVertexOutgoingEdges.hasNext()) {
return false;
}
}
@@ -176,14 +174,14 @@ public final class PolicyImpl implements Policy {
public void registerRunTimeOptimizations(final Injector injector, final PubSubEventHandlerWrapper pubSubWrapper) {
LOG.info("Register run-time optimizations to the PubSubHandler");
runtimePasses.forEach(runtimePass ->
- runtimePass.getEventHandlerClasses().forEach(runtimeEventHandlerClass -> {
- try {
- final RuntimeEventHandler runtimeEventHandler = injector.getInstance(runtimeEventHandlerClass);
- pubSubWrapper.getPubSubEventHandler()
- .subscribe(runtimeEventHandler.getEventClass(), runtimeEventHandler);
- } catch (final Exception e) {
- throw new RuntimeException(e);
- }
- }));
+ runtimePass.getEventHandlerClasses().forEach(runtimeEventHandlerClass -> {
+ try {
+ final RuntimeEventHandler runtimeEventHandler = injector.getInstance(runtimeEventHandlerClass);
+ pubSubWrapper.getPubSubEventHandler()
+ .subscribe(runtimeEventHandler.getEventClass(), runtimeEventHandler);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }));
}
}