You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/01/21 08:57:21 UTC

[incubator-nemo] branch reshaping updated: largeshuffle done

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch reshaping
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/reshaping by this push:
     new 1257851  largeshuffle done
1257851 is described below

commit 12578518a2f89e0f26eb6b35aed99331252fa6f1
Author: John Yang <jo...@apache.org>
AuthorDate: Mon Jan 21 17:57:09 2019 +0900

    largeshuffle done
---
 .../main/java/org/apache/nemo/common/ir/IRDAG.java | 105 ++++++++++++++++-----
 .../nemo/compiler/optimizer/NemoOptimizer.java     |  71 +++++++-------
 .../LargeShuffleDataPersistencePass.java           |  12 +--
 .../reshaping/LargeShuffleRelayReshapingPass.java  |  21 ++++-
 4 files changed, 138 insertions(+), 71 deletions(-)

diff --git a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
index 0462f37..d91d674 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
@@ -19,68 +19,123 @@
 package org.apache.nemo.common.ir;
 
 import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.dag.DAGBuilder;
 import org.apache.nemo.common.ir.edge.IREdge;
 import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import org.apache.nemo.common.ir.vertex.IRVertex;
-import org.apache.nemo.common.ir.vertex.LoopVertex;
 import org.apache.nemo.common.ir.vertex.system.MessageBarrierVertex;
 import org.apache.nemo.common.ir.vertex.system.StreamVertex;
 import org.apache.nemo.common.ir.vertex.system.SystemIRVertex;
 
-import java.util.Map;
-import java.util.Set;
+import java.util.List;
+import java.util.function.Consumer;
 
 /**
  * An IRDAG object captures a high-level data processing application (e.g., Spark/Beam application).
  * - IRVertex: A data-parallel operation. (e.g., map)
  * - IREdge: A data dependency between two operations. (e.g., shuffle)
  *
- * Largely two types of IRDAG optimization methods are provided.
+ * Largely two types of IRDAG optimization(modification) methods are provided.
  * All of these methods preserve application semantics.
- * - Reshaping: insert(), delete() on an IRDAG
- * - Annotation: setProperty(), getPropertyValue() on IRVertex/IREdge
+ * - Annotation: setProperty(), getPropertyValue() on each IRVertex/IREdge
+ * - Reshaping: insert(), delete() on the IRDAG
  */
-public class IRDAG extends DAG<IRVertex, IREdge> {
-  public IRDAG(final Set<IRVertex> vertices,
-               final Map<IRVertex, Set<IREdge>> incomingEdges,
-               final Map<IRVertex, Set<IREdge>> outgoingEdges,
-               final Map<IRVertex, LoopVertex> assignedLoopVertexMap,
-               final Map<IRVertex, Integer> loopStackDepthMap) {
-    super(vertices, incomingEdges, outgoingEdges, assignedLoopVertexMap, loopStackDepthMap);
+public class IRDAG {
+  private DAG<IRVertex, IREdge> dag; // internal DAG, can be updated by reshaping methods.
+
+  public IRDAG(final DAG<IRVertex, IREdge> dag) {
+    this.dag = dag; // kept by reference (no copy); reshaping methods later replace this field.
+  }
+
+  ////////////////////////////////////////////////// Read-only traversal methods for annotations.
+
+  public void topologicalDo(final Consumer<IRVertex> function) {
+    dag.topologicalDo(function); // delegates to the current internal DAG.
+  }
+
+  public List<IRVertex> getVertices() {
+    return dag.getVertices(); // delegates to the current internal DAG.
   }
 
+  public List<IREdge> getIncomingEdgesOf(final String vertexId) {
+    return dag.getIncomingEdgesOf(vertexId); // lookup by vertex id, delegated to the current internal DAG.
+  }
+
+  public List<IREdge> getIncomingEdgesOf(final IRVertex v) {
+    return dag.getIncomingEdgesOf(v); // lookup by vertex object, delegated to the current internal DAG.
+  }
+
+  ////////////////////////////////////////////////// Reshaping methods.
+
   /**
    * Before: src > edgeToStreamize > dst
-   * After: src > edgeToStreamize > streamVertex > oneToOneEdge > dst
+   * After: src > edgeToStreamizeWithNewDestination > streamVertex > oneToOneEdge > dst
    * (replaces the "Before" relationships)
    *
    * @param streamVertex to insert.
    * @param edgeToStreamize for inserting.
    */
-  void insert(final StreamVertex streamVertex, final IREdge edgeToStreamize) {
+  public void insert(final StreamVertex streamVertex, final IREdge edgeToStreamize) {
+    // Create a completely new DAG with the vertex inserted; the current DAG is only read while rebuilding.
+    final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
+
+    // Insert the vertex first, so that the edges touching it can be connected during the traversal below.
     builder.addVertex(streamVertex);
-    edge.copyExecutionPropertiesTo(edgeToStreamize);
-    final IREdge newEdgeFromMerger = new IREdge(CommunicationPatternProperty.Value.OneToOne, streamVertex, v);
-    newEdgeFromMerger.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
-    newEdgeFromMerger.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
-    builder.connectVertices(edgeToStreamize);
-    builder.connectVertices(newEdgeFromMerger);
+
+    // Build the new DAG to reflect the new topology.
+    dag.topologicalDo(v -> {
+      // None of the existing vertices are deleted.
+      builder.addVertex(v);
+
+      for (final IREdge incomingEdge : dag.getIncomingEdgesOf(v)) {
+        if (incomingEdge.equals(edgeToStreamize)) {
+          // Edge to the streamVertex: same src and communication pattern as the original edge.
+          final IREdge edgeToStreamizeWithNewDestination = new IREdge(
+            edgeToStreamize.getPropertyValue(CommunicationPatternProperty.class).get(),
+            edgeToStreamize.getSrc(),
+            streamVertex);
+          edgeToStreamize.copyExecutionPropertiesTo(edgeToStreamizeWithNewDestination);
+
+          // Edge from the streamVertex: one-to-one, reusing the encoder/decoder of the original edge.
+          final IREdge oneToOneEdge = new IREdge(CommunicationPatternProperty.Value.OneToOne, streamVertex, v);
+          oneToOneEdge.setProperty(EncoderProperty.of(edgeToStreamize.getPropertyValue(EncoderProperty.class).get()));
+          oneToOneEdge.setProperty(DecoderProperty.of(edgeToStreamize.getPropertyValue(DecoderProperty.class).get()));
+          // Track the new edges: this pair replaces edgeToStreamize, which is NOT re-added to the new DAG.
+          builder.connectVertices(edgeToStreamizeWithNewDestination);
+          builder.connectVertices(oneToOneEdge);
+        } else {
+          // Any other edge (including the other incoming edges of dst) is connected exactly as before.
+          builder.connectVertices(incomingEdge);
+        }
+      }
+    });
+
+    dag = builder.build(); // update the DAG.
   }
 
   /**
    * Before: src > edgeToGetStatisticsOf > dst
-   * After: src > oneToOneEdge(clone of edgeToGetStatisticsOf) > messageBarrierVertex
-   * (leaves the "Before" relationships unmodified)
+   * After: src > oneToOneEdge(a clone of edgeToGetStatisticsOf) > messageBarrierVertex
+   * (the "Before" relationships are unmodified)
    *
    * @param messageBarrierVertex
    * @param edgeToGetStatisticsOf
    */
-  void insert(final MessageBarrierVertex messageBarrierVertex, final IREdge edgeToGetStatisticsOf) {
+  public void insert(final MessageBarrierVertex messageBarrierVertex, final IREdge edgeToGetStatisticsOf) {
+  } // TODO: not implemented yet -- the body is empty, so calling this leaves the DAG untouched.
+
+  /**
+   * @param systemIRVertexToDelete to delete.
+   */
+  public void delete(final SystemIRVertex systemIRVertexToDelete) {
+    // TODO: recursively delete backwards (not implemented yet -- currently a no-op that leaves the DAG untouched).
   }
 
+  ////////////////////////////////////////////////// "Un-safe" direct reshaping (semantic-preserving not guaranteed)
 
-  void delete(final SystemIRVertex systemIRVertexToDelete) {
+  public void unSafeDirectReshaping(final java.util.function.Function<DAG<IRVertex, IREdge>, DAG<IRVertex, IREdge>> unsafeReshaping) {
+    this.dag = unsafeReshaping.apply(dag); // the function's result replaces the internal DAG, unchecked.
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
index 4a254a1..11d19b9 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
@@ -91,7 +91,7 @@ public final class NemoOptimizer implements Optimizer {
       final IRDAG cacheFilteredDag = handleCaching(dag, cacheIdToEdge);
       if (!cacheIdToEdge.isEmpty()) {
         cacheFilteredDag.storeJSON(dagDirectory, irDagId + "FilterCache",
-            "IR after cache filtering");
+          "IR after cache filtering");
       }
 
       // Conduct compile-time optimization.
@@ -103,7 +103,7 @@ public final class NemoOptimizer implements Optimizer {
 
       optimizedDAG = optimizationPolicy.runCompileTimeOptimization(cacheFilteredDag, dagDirectory);
       optimizedDAG.storeJSON(dagDirectory, irDagId + optimizationPolicy.getClass().getSimpleName(),
-          "IR optimized for " + optimizationPolicy.getClass().getSimpleName());
+        "IR optimized for " + optimizationPolicy.getClass().getSimpleName());
     } catch (final Exception e) {
       throw new CompileTimeOptimizationException(e);
     }
@@ -121,8 +121,8 @@ public final class NemoOptimizer implements Optimizer {
     cacheIdToEdge.forEach((cacheId, edge) -> {
       if (!cacheIdToParallelism.containsKey(cacheId)) {
         cacheIdToParallelism.put(
-            cacheId, optimizedDAG.getVertexById(edge.getDst().getId()).getPropertyValue(ParallelismProperty.class)
-                .orElseThrow(() -> new RuntimeException("No parallelism on an IR vertex.")));
+          cacheId, optimizedDAG.getVertexById(edge.getDst().getId()).getPropertyValue(ParallelismProperty.class)
+            .orElseThrow(() -> new RuntimeException("No parallelism on an IR vertex.")));
       }
     });
 
@@ -142,21 +142,20 @@ public final class NemoOptimizer implements Optimizer {
    * @param cacheIdToEdge the map from cache ID to edge to update.
    * @return the cropped dag regarding to caching.
    */
-  private IRDAG handleCaching(final IRDAG dag,
-                                              final Map<UUID, IREdge> cacheIdToEdge) {
+  private IRDAG handleCaching(final IRDAG dag, final Map<UUID, IREdge> cacheIdToEdge) {
     dag.topologicalDo(irVertex ->
-        dag.getIncomingEdgesOf(irVertex).forEach(
-            edge -> edge.getPropertyValue(CacheIDProperty.class).
-                ifPresent(cacheId -> cacheIdToEdge.put(cacheId, edge))
-        ));
+      dag.getIncomingEdgesOf(irVertex).forEach(
+        edge -> edge.getPropertyValue(CacheIDProperty.class).
+          ifPresent(cacheId -> cacheIdToEdge.put(cacheId, edge))
+      ));
 
     if (cacheIdToEdge.isEmpty()) {
       return dag;
     } else {
       final DAGBuilder<IRVertex, IREdge> filteredDagBuilder = new DAGBuilder<>();
       final List<IRVertex> sinkVertices = dag.getVertices().stream()
-          .filter(irVertex -> dag.getOutgoingEdgesOf(irVertex).isEmpty())
-          .collect(Collectors.toList());
+        .filter(irVertex -> dag.getOutgoingEdgesOf(irVertex).isEmpty())
+        .collect(Collectors.toList());
       sinkVertices.forEach(filteredDagBuilder::addVertex); // Sink vertex cannot be cached already.
 
       sinkVertices.forEach(sinkVtx -> addNonCachedVerticesAndEdges(dag, sinkVtx, filteredDagBuilder));
@@ -176,41 +175,41 @@ public final class NemoOptimizer implements Optimizer {
                                             final IRVertex irVertex,
                                             final DAGBuilder<IRVertex, IREdge> builder) {
     if (irVertex.getPropertyValue(IgnoreSchedulingTempDataReceiverProperty.class).orElse(false)
-        && dag.getIncomingEdgesOf(irVertex).stream()
-        .filter(irEdge -> irEdge.getPropertyValue(CacheIDProperty.class).isPresent())
-        .anyMatch(irEdge -> cacheIdToParallelism
-            .containsKey(irEdge.getPropertyValue(CacheIDProperty.class).get()))) {
+      && dag.getIncomingEdgesOf(irVertex).stream()
+      .filter(irEdge -> irEdge.getPropertyValue(CacheIDProperty.class).isPresent())
+      .anyMatch(irEdge -> cacheIdToParallelism
+        .containsKey(irEdge.getPropertyValue(CacheIDProperty.class).get()))) {
       builder.removeVertex(irVertex); // Ignore ghost vertex which was cached once.
       return;
     }
 
     dag.getIncomingEdgesOf(irVertex).stream()
-        .forEach(edge -> {
-      final Optional<UUID> cacheId = dag.getOutgoingEdgesOf(edge.getSrc()).stream()
+      .forEach(edge -> {
+        final Optional<UUID> cacheId = dag.getOutgoingEdgesOf(edge.getSrc()).stream()
           .filter(edgeToFilter -> edgeToFilter.getPropertyValue(CacheIDProperty.class).isPresent())
           .map(edgeToMap -> edgeToMap.getPropertyValue(CacheIDProperty.class).get())
           .findFirst();
-      if (cacheId.isPresent() && cacheIdToParallelism.get(cacheId.get()) != null) { // Cached already.
-        // Replace the vertex emitting cached edge with a cached source vertex.
-        final IRVertex cachedDataRelayVertex = new CachedSourceVertex(cacheIdToParallelism.get(cacheId.get()));
-        cachedDataRelayVertex.setPropertyPermanently(ParallelismProperty.of(cacheIdToParallelism.get(cacheId.get())));
+        if (cacheId.isPresent() && cacheIdToParallelism.get(cacheId.get()) != null) { // Cached already.
+          // Replace the vertex emitting cached edge with a cached source vertex.
+          final IRVertex cachedDataRelayVertex = new CachedSourceVertex(cacheIdToParallelism.get(cacheId.get()));
+          cachedDataRelayVertex.setPropertyPermanently(ParallelismProperty.of(cacheIdToParallelism.get(cacheId.get())));
 
-        builder.addVertex(cachedDataRelayVertex);
-        final IREdge newEdge = new IREdge(
+          builder.addVertex(cachedDataRelayVertex);
+          final IREdge newEdge = new IREdge(
             edge.getPropertyValue(CommunicationPatternProperty.class)
-                .orElseThrow(() -> new RuntimeException("No communication pattern on an ir edge")),
+              .orElseThrow(() -> new RuntimeException("No communication pattern on an ir edge")),
             cachedDataRelayVertex,
             irVertex);
-        edge.copyExecutionPropertiesTo(newEdge);
-        newEdge.setProperty(CacheIDProperty.of(cacheId.get()));
-        builder.connectVertices(newEdge);
-        // Stop the recursion for this vertex.
-      } else {
-        final IRVertex srcVtx = edge.getSrc();
-        builder.addVertex(srcVtx);
-        builder.connectVertices(edge);
-        addNonCachedVerticesAndEdges(dag, srcVtx, builder);
-      }
-    });
+          edge.copyExecutionPropertiesTo(newEdge);
+          newEdge.setProperty(CacheIDProperty.of(cacheId.get()));
+          builder.connectVertices(newEdge);
+          // Stop the recursion for this vertex.
+        } else {
+          final IRVertex srcVtx = edge.getSrc();
+          builder.addVertex(srcVtx);
+          builder.connectVertices(edge);
+          addNonCachedVerticesAndEdges(dag, srcVtx, builder);
+        }
+      });
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
index 212aab8..31c606b 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
@@ -41,11 +41,11 @@ public final class LargeShuffleDataPersistencePass extends AnnotatingPass {
   @Override
   public void optimize(final IRDAG dag) {
     dag.topologicalDo(irVertex ->
-        dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
-          final DataFlowProperty.Value dataFlowModel = irEdge.getPropertyValue(DataFlowProperty.class).get();
-          if (DataFlowProperty.Value.Push.equals(dataFlowModel)) {
-            irEdge.setPropertyPermanently(DataPersistenceProperty.of(DataPersistenceProperty.Value.Discard));
-          }
-        }));
+      dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> { // every incoming edge of every vertex is inspected.
+        final DataFlowProperty.Value dataFlowModel = irEdge.getPropertyValue(DataFlowProperty.class).get();
+        if (DataFlowProperty.Value.Push.equals(dataFlowModel)) { // only Push edges are annotated.
+          irEdge.setPropertyPermanently(DataPersistenceProperty.of(DataPersistenceProperty.Value.Discard));
+        }
+      }));
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
index c47f509..e9ce8e2 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
@@ -24,14 +24,12 @@ import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProp
 import org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.system.StreamVertex;
 import org.apache.nemo.common.ir.vertex.transform.RelayTransform;
 import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
 
 /**
- * Pass to modify the DAG for a job to batch the disk seek.
- * It adds a {@link OperatorVertex} with {@link RelayTransform} before the vertices
- * receiving shuffle edges,
- * to merge the shuffled data in memory and write to the disk at once.
+ * Inserts the StreamVertex for each shuffle edge.
  */
 @Requires(CommunicationPatternProperty.class)
 public final class LargeShuffleRelayReshapingPass extends ReshapingPass {
@@ -43,6 +41,20 @@ public final class LargeShuffleRelayReshapingPass extends ReshapingPass {
     super(LargeShuffleRelayReshapingPass.class);
   }
 
+
+  @Override
+  public void optimize(final IRDAG dag) {
+    // Visit the vertices in topological order and relay each incoming shuffle edge through a StreamVertex.
+    dag.topologicalDo(vertex ->
+      dag.getIncomingEdgesOf(vertex).stream()
+        // Unchecked Optional#get(), as before: an edge without a communication pattern still throws here.
+        .filter(edge -> CommunicationPatternProperty.Value.Shuffle
+          .equals(edge.getPropertyValue(CommunicationPatternProperty.class).get()))
+        // A fresh StreamVertex is created for every shuffle edge.
+        .forEach(edge -> dag.insert(new StreamVertex(), edge)));
+  }
+
+  /*
   @Override
   public void optimize(final IRDAG dag) {
     dag.topologicalDo(v -> {
@@ -79,4 +91,5 @@ public final class LargeShuffleRelayReshapingPass extends ReshapingPass {
     });
     return builder.build();
   }
+  */
 }