You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/01/22 04:51:05 UTC

[incubator-nemo] branch reshaping updated: src code compiles

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch reshaping
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/reshaping by this push:
     new 17d8202  src code compiles
17d8202 is described below

commit 17d820221e1e6115945954b1f934840654abe7a8
Author: John Yang <jo...@apache.org>
AuthorDate: Tue Jan 22 13:50:54 2019 +0900

    src code compiles
---
 .../main/java/org/apache/nemo/common/ir/IRDAG.java | 64 +++++++++++++++-------
 .../java/org/apache/nemo/common/pass/Pass.java     | 12 ++--
 .../nemo/compiler/optimizer/NemoOptimizer.java     |  2 +-
 .../MapReduceDisaggregationOptimization.java       |  4 +-
 .../nemo/compiler/optimizer/policy/PolicyImpl.java | 42 +++++++-------
 5 files changed, 73 insertions(+), 51 deletions(-)

diff --git a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
index c5de9cb..3f87be4 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
@@ -18,6 +18,8 @@
  */
 package org.apache.nemo.common.ir;
 
+import org.apache.nemo.common.KeyExtractor;
+import org.apache.nemo.common.Pair;
 import org.apache.nemo.common.coder.*;
 import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.common.dag.DAGBuilder;
@@ -28,7 +30,6 @@ import org.apache.nemo.common.ir.vertex.OperatorVertex;
 import org.apache.nemo.common.ir.vertex.system.MessageAggregationVertex;
 import org.apache.nemo.common.ir.vertex.system.MessageBarrierVertex;
 import org.apache.nemo.common.ir.vertex.system.StreamVertex;
-import org.apache.nemo.common.ir.vertex.system.SystemIRVertex;
 
 import java.util.List;
 import java.util.function.Consumer;
@@ -51,7 +52,7 @@ public class IRDAG {
     this.dag = dag;
   }
 
-  ////////////////////////////////////////////////// Read-only traversal methods for annotations.
+  ////////////////////////////////////////////////// Read-only traversal methods.
 
   public void topologicalDo(final Consumer<IRVertex> function) {
     dag.topologicalDo(function);
@@ -69,6 +70,30 @@ public class IRDAG {
     return dag.getIncomingEdgesOf(v);
   }
 
+  public List<IREdge> getOutgoingEdgesOf(final IRVertex v) {
+    return dag.getOutgoingEdgesOf(v);
+  }
+
+  public List<IREdge> getOutgoingEdgesOf(final String vertexId) {
+    return dag.getOutgoingEdgesOf(vertexId);
+  }
+
+  public void storeJSON(final String directory, final String name, final String description) {
+    dag.storeJSON(directory, name, description);
+  }
+
+  public List<IRVertex> getTopologicalSort() {
+    return dag.getTopologicalSort();
+  }
+
+  public List<IRVertex> getDescendants(final String vertexId) {
+    return dag.getDescendants(vertexId);
+  }
+
+  public IRVertex getVertexById(final String id) {
+    return dag.getVertexById(id);
+  }
+
   ////////////////////////////////////////////////// Reshaping methods.
 
   /**
@@ -126,7 +151,8 @@ public class IRDAG {
    * Inserts a new vertex that analyzes intermediate data, and triggers a dynamic optimization.
    *
    * Before: src > edgeToGetStatisticsOf > dst
-   * After: src > oneToOneEdge(a clone of edgeToGetStatisticsOf) > messageBarrierVertex > messageAggregationVertex > dst
+   * After: src > oneToOneEdge(a clone of edgeToGetStatisticsOf) > messageBarrierVertex >
+   *        shuffleEdge > messageAggregationVertex > broadcastEdge > dst
    * (the "Before" relationships are unmodified)
    *
    * @param messageBarrierVertex to insert.
@@ -160,7 +186,7 @@ public class IRDAG {
           builder.connectVertices(clone);
 
           // messageBarrierVertex to the messageAggregationVertex
-          final IREdge edgeToABV = generateEdgeToABV(edge, messageBarrierVertex, messageAggregationVertex);
+          final IREdge edgeToABV = edgeBetweenMessageVertices(edge, messageBarrierVertex, messageAggregationVertex);
           builder.connectVertices(edgeToABV);
 
           // Connection vertex
@@ -172,12 +198,12 @@ public class IRDAG {
           // The original edge
           // We then insert the vertex with MessageBarrierTransform and vertex with MessageAggregateTransform
           // between the vertex and incoming vertices.
-          final IREdge edgeToOriginalDstV =
+          final IREdge edgeToOriginalDst =
             new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(), edge.getSrc(), v);
-          edge.copyExecutionPropertiesTo(edgeToOriginalDstV);
-          edgeToOriginalDstV.setPropertyPermanently(
+          edge.copyExecutionPropertiesTo(edgeToOriginalDst);
+          edgeToOriginalDst.setPropertyPermanently(
             MetricCollectionProperty.of(MetricCollectionProperty.Value.DataSkewRuntimePass));
-          builder.connectVertices(edgeToOriginalDstV);
+          builder.connectVertices(edgeToOriginalDst);
         } else {
           // NO MATCH, so simply connect vertices as before.
           builder.connectVertices(edge);
@@ -186,13 +212,6 @@ public class IRDAG {
     });
   }
 
-  /**
-   * @param systemIRVertexToDelete to delete.
-   */
-  public void delete(final SystemIRVertex systemIRVertexToDelete) {
-    // TODO: recursively delete backwards
-  }
-
   ////////////////////////////////////////////////// "Un-safe" direct reshaping (semantic-preserving is not guaranteed).
 
   public void unSafeDirectReshaping(final Function<DAG<IRVertex, IREdge>, DAG<IRVertex, IREdge>> unsafeReshaping) {
@@ -207,14 +226,21 @@ public class IRDAG {
    * @param abv the vertex with MessageAggregateTransform.
    * @return the generated egde from {@code mcv} to {@code abv}.
    */
-  private IREdge generateEdgeToABV(final IREdge edge,
-                                   final OperatorVertex mcv,
-                                   final OperatorVertex abv) {
+  private IREdge edgeBetweenMessageVertices(final IREdge edge,
+                                            final OperatorVertex mcv,
+                                            final OperatorVertex abv) {
     final IREdge newEdge = new IREdge(CommunicationPatternProperty.Value.Shuffle, mcv, abv);
     newEdge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
     newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
     newEdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Push));
-    newEdge.setProperty(KeyExtractorProperty.of(new PairKeyExtractor()));
+    final KeyExtractor pairKeyExtractor = (element) -> {
+      if (element instanceof Pair) {
+        return ((Pair) element).left();
+      } else {
+        throw new IllegalStateException(element.toString());
+      }
+    };
+    newEdge.setProperty(KeyExtractorProperty.of(pairKeyExtractor));
 
     // Dynamic optimization handles statistics on key-value data by default.
     // We need to get coders for encoding/decoding the keys to send data to
diff --git a/common/src/main/java/org/apache/nemo/common/pass/Pass.java b/common/src/main/java/org/apache/nemo/common/pass/Pass.java
index a2297e1..7a540c9 100644
--- a/common/src/main/java/org/apache/nemo/common/pass/Pass.java
+++ b/common/src/main/java/org/apache/nemo/common/pass/Pass.java
@@ -18,9 +18,7 @@
  */
 package org.apache.nemo.common.pass;
 
-import org.apache.nemo.common.dag.DAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.IRDAG;
 
 import java.io.Serializable;
 import java.util.function.Predicate;
@@ -29,7 +27,7 @@ import java.util.function.Predicate;
  * Abstract class for optimization passes. All passes basically extends this class.
  */
 public abstract class Pass implements Serializable {
-  private Predicate<DAG<IRVertex, IREdge>> condition;
+  private Predicate<IRDAG> condition;
 
   /**
    * Default constructor.
@@ -42,7 +40,7 @@ public abstract class Pass implements Serializable {
    * Constructor.
    * @param condition condition under which to run the pass.
    */
-  private Pass(final Predicate<DAG<IRVertex, IREdge>> condition) {
+  private Pass(final Predicate<IRDAG> condition) {
     this.condition = condition;
   }
 
@@ -50,7 +48,7 @@ public abstract class Pass implements Serializable {
    * Getter for the condition under which to apply the pass.
    * @return the condition under which to apply the pass.
    */
-  public final Predicate<DAG<IRVertex, IREdge>> getCondition() {
+  public final Predicate<IRDAG> getCondition() {
     return this.condition;
   }
 
@@ -59,7 +57,7 @@ public abstract class Pass implements Serializable {
    * @param newCondition the new condition to add to the existing condition.
    * @return the condition with the new condition added.
    */
-  public final Pass addCondition(final Predicate<DAG<IRVertex, IREdge>> newCondition) {
+  public final Pass addCondition(final Predicate<IRDAG> newCondition) {
     this.condition = this.condition.and(newCondition);
     return this;
   }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
index 11d19b9..631d4f3 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
@@ -160,7 +160,7 @@ public final class NemoOptimizer implements Optimizer {
 
       sinkVertices.forEach(sinkVtx -> addNonCachedVerticesAndEdges(dag, sinkVtx, filteredDagBuilder));
 
-      return filteredDagBuilder.buildWithoutSourceCheck();
+      return new IRDAG(filteredDagBuilder.buildWithoutSourceCheck());
     }
   }
 
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java
index 97989dc..ec533ad 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java
@@ -65,12 +65,12 @@ public final class MapReduceDisaggregationOptimization {
     final IREdge edge2 = new IREdge(CommunicationPatternProperty.Value.Shuffle, map, reduce);
     builder.connectVertices(edge2);
 
-    final IRDAG dag = builder.build();
+    final IRDAG dag = new IRDAG(builder.build());
     LOG.info("Before Optimization");
     LOG.info(dag.toString());
 
     // Optimize
-    final DAG optimizedDAG = new DisaggregationPolicy().runCompileTimeOptimization(dag, EMPTY_DAG_DIRECTORY);
+    final IRDAG optimizedDAG = new DisaggregationPolicy().runCompileTimeOptimization(dag, EMPTY_DAG_DIRECTORY);
 
     // After
     LOG.info("After Optimization");
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyImpl.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyImpl.java
index 26577bd..31a56cc 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyImpl.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyImpl.java
@@ -68,32 +68,30 @@ public final class PolicyImpl implements Policy {
    * @throws Exception Exceptions on the way.
    */
   private static IRDAG process(final IRDAG dag,
-                                               final Iterator<CompileTimePass> passes,
-                                               final String dagDirectory) {
+                               final Iterator<CompileTimePass> passes,
+                               final String dagDirectory) {
     if (passes.hasNext()) {
       final CompileTimePass passToApply = passes.next();
-      final IRDAG processedDAG;
 
       if (passToApply.getCondition().test(dag)) {
         LOG.info("Apply {} to the DAG", passToApply.getClass().getSimpleName());
         // Apply the pass to the DAG.
-        processedDAG = passToApply.apply(dag);
+        passToApply.optimize(dag);
         // Ensure AnnotatingPass and ReshapingPass functions as intended.
-        if ((passToApply instanceof AnnotatingPass && !checkAnnotatingPass(dag, processedDAG))
-            || (passToApply instanceof ReshapingPass && !checkReshapingPass(dag, processedDAG))) {
+        if ((passToApply instanceof AnnotatingPass && !checkAnnotatingPass(dag, dag))
+          || (passToApply instanceof ReshapingPass && !checkReshapingPass(dag, dag))) {
           throw new CompileTimeOptimizationException(passToApply.getClass().getSimpleName()
-              + " is implemented in a way that doesn't follow its original intention of annotating or reshaping. "
-              + "Modify it or use a general CompileTimePass");
+            + " is implemented in a way that doesn't follow its original intention of annotating or reshaping. "
+            + "Modify it or use a general CompileTimePass");
         }
         // Save the processed JSON DAG.
-        processedDAG.storeJSON(dagDirectory, "ir-after-" + passToApply.getClass().getSimpleName(),
-            "DAG after optimization");
+        dag.storeJSON(dagDirectory, "ir-after-" + passToApply.getClass().getSimpleName(),
+          "DAG after optimization");
       } else {
         LOG.info("Condition unmet for applying {} to the DAG", passToApply.getClass().getSimpleName());
-        processedDAG = dag;
       }
       // recursively apply the following passes.
-      return process(processedDAG, passes, dagDirectory);
+      return process(dag, passes, dagDirectory);
     } else {
       return dag;
     }
@@ -134,7 +132,7 @@ public final class PolicyImpl implements Policy {
       }
       // number of edges should match.
       if (beforeVertexIncomingEdges.hasNext() || afterVertexIncomingEdges.hasNext()
-          || beforeVertexOutgoingEdges.hasNext() || afterVertexOutgoingEdges.hasNext()) {
+        || beforeVertexOutgoingEdges.hasNext() || afterVertexOutgoingEdges.hasNext()) {
         return false;
       }
     }
@@ -176,14 +174,14 @@ public final class PolicyImpl implements Policy {
   public void registerRunTimeOptimizations(final Injector injector, final PubSubEventHandlerWrapper pubSubWrapper) {
     LOG.info("Register run-time optimizations to the PubSubHandler");
     runtimePasses.forEach(runtimePass ->
-        runtimePass.getEventHandlerClasses().forEach(runtimeEventHandlerClass -> {
-          try {
-            final RuntimeEventHandler runtimeEventHandler = injector.getInstance(runtimeEventHandlerClass);
-            pubSubWrapper.getPubSubEventHandler()
-                .subscribe(runtimeEventHandler.getEventClass(), runtimeEventHandler);
-          } catch (final Exception e) {
-            throw new RuntimeException(e);
-          }
-        }));
+      runtimePass.getEventHandlerClasses().forEach(runtimeEventHandlerClass -> {
+        try {
+          final RuntimeEventHandler runtimeEventHandler = injector.getInstance(runtimeEventHandlerClass);
+          pubSubWrapper.getPubSubEventHandler()
+            .subscribe(runtimeEventHandler.getEventClass(), runtimeEventHandler);
+        } catch (final Exception e) {
+          throw new RuntimeException(e);
+        }
+      }));
   }
 }