You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/01/21 08:57:21 UTC
[incubator-nemo] branch reshaping updated: largeshuffle done
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch reshaping
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/reshaping by this push:
new 1257851 largeshuffle done
1257851 is described below
commit 12578518a2f89e0f26eb6b35aed99331252fa6f1
Author: John Yang <jo...@apache.org>
AuthorDate: Mon Jan 21 17:57:09 2019 +0900
largeshuffle done
---
.../main/java/org/apache/nemo/common/ir/IRDAG.java | 105 ++++++++++++++++-----
.../nemo/compiler/optimizer/NemoOptimizer.java | 71 +++++++-------
.../LargeShuffleDataPersistencePass.java | 12 +--
.../reshaping/LargeShuffleRelayReshapingPass.java | 21 ++++-
4 files changed, 138 insertions(+), 71 deletions(-)
diff --git a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
index 0462f37..d91d674 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
@@ -19,68 +19,123 @@
package org.apache.nemo.common.ir;
import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.dag.DAGBuilder;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty;
import org.apache.nemo.common.ir.edge.executionproperty.EncoderProperty;
import org.apache.nemo.common.ir.vertex.IRVertex;
-import org.apache.nemo.common.ir.vertex.LoopVertex;
import org.apache.nemo.common.ir.vertex.system.MessageBarrierVertex;
import org.apache.nemo.common.ir.vertex.system.StreamVertex;
import org.apache.nemo.common.ir.vertex.system.SystemIRVertex;
-import java.util.Map;
-import java.util.Set;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Function;
/**
* An IRDAG object captures a high-level data processing application (e.g., Spark/Beam application).
* - IRVertex: A data-parallel operation. (e.g., map)
* - IREdge: A data dependency between two operations. (e.g., shuffle)
*
- * Largely two types of IRDAG optimization methods are provided.
+ * Largely two types of IRDAG optimization (modification) methods are provided.
* All of these methods preserve application semantics.
- * - Reshaping: insert(), delete() on an IRDAG
- * - Annotation: setProperty(), getPropertyValue() on IRVertex/IREdge
+ * - Annotation: setProperty(), getPropertyValue() on each IRVertex/IREdge
+ * - Reshaping: insert(), delete() on the IRDAG
*/
-public class IRDAG extends DAG<IRVertex, IREdge> {
- public IRDAG(final Set<IRVertex> vertices,
- final Map<IRVertex, Set<IREdge>> incomingEdges,
- final Map<IRVertex, Set<IREdge>> outgoingEdges,
- final Map<IRVertex, LoopVertex> assignedLoopVertexMap,
- final Map<IRVertex, Integer> loopStackDepthMap) {
- super(vertices, incomingEdges, outgoingEdges, assignedLoopVertexMap, loopStackDepthMap);
+public class IRDAG {
+ private DAG<IRVertex, IREdge> dag; // internal DAG, can be updated by reshaping methods.
+
+ public IRDAG(final DAG<IRVertex, IREdge> dag) {
+ this.dag = dag;
+ }
+
+ ////////////////////////////////////////////////// Read-only traversal methods for annotations.
+
+ public void topologicalDo(final Consumer<IRVertex> function) {
+ dag.topologicalDo(function);
+ }
+
+ public List<IRVertex> getVertices() {
+ return dag.getVertices();
}
+ public List<IREdge> getIncomingEdgesOf(final String vertexId) {
+ return dag.getIncomingEdgesOf(vertexId);
+ }
+
+ public List<IREdge> getIncomingEdgesOf(final IRVertex v) {
+ return dag.getIncomingEdgesOf(v);
+ }
+
+ ////////////////////////////////////////////////// Reshaping methods.
+
/**
* Before: src > edgeToStreamize > dst
- * After: src > edgeToStreamize > streamVertex > oneToOneEdge > dst
+ * After: src > edgeToStreamizeWithNewDestination > streamVertex > oneToOneEdge > dst
* (replaces the "Before" relationships)
*
* @param streamVertex to insert.
* @param edgeToStreamize for inserting.
*/
- void insert(final StreamVertex streamVertex, final IREdge edgeToStreamize) {
+ public void insert(final StreamVertex streamVertex, final IREdge edgeToStreamize) {
+ // Create a completely new DAG with the vertex inserted.
+ final DAGBuilder builder = new DAGBuilder();
+
+ // Insert the vertex.
builder.addVertex(streamVertex);
- edge.copyExecutionPropertiesTo(edgeToStreamize);
- final IREdge newEdgeFromMerger = new IREdge(CommunicationPatternProperty.Value.OneToOne, streamVertex, v);
- newEdgeFromMerger.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
- newEdgeFromMerger.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
- builder.connectVertices(edgeToStreamize);
- builder.connectVertices(newEdgeFromMerger);
+
+ // Build the new DAG to reflect the new topology.
+ dag.topologicalDo(v -> {
+ // None of the existing vertices are deleted.
+ builder.addVertex(v);
+
+ if (edgeToStreamize.getDst().equals(v)
+ && dag.getIncomingEdgesOf(v).stream().anyMatch(e -> e.equals(edgeToStreamize))) {
+ // Edge to the streamVertex.
+ final IREdge edgeToStreamizeWithNewDestination = new IREdge(
+ edgeToStreamize.getPropertyValue(CommunicationPatternProperty.class).get(),
+ edgeToStreamize.getSrc(),
+ streamVertex);
+ edgeToStreamize.copyExecutionPropertiesTo(edgeToStreamizeWithNewDestination);
+
+ // Edge from the streamVertex.
+ final IREdge oneToOneEdge = new IREdge(CommunicationPatternProperty.Value.OneToOne, streamVertex, v);
+ oneToOneEdge.setProperty(EncoderProperty.of(edgeToStreamize.getPropertyValue(EncoderProperty.class).get()));
+ oneToOneEdge.setProperty(DecoderProperty.of(edgeToStreamize.getPropertyValue(DecoderProperty.class).get()));
+
+      // Track the new edges. Connect the redirected edge (src > streamVertex),
+      // not the original edgeToStreamize, so the old src > dst edge is dropped.
+      builder.connectVertices(edgeToStreamizeWithNewDestination);
+      builder.connectVertices(oneToOneEdge);
+
+      // Preserve the other incoming edges of v (only edgeToStreamize is redirected).
+      dag.getIncomingEdgesOf(v).stream()
+        .filter(e -> !e.equals(edgeToStreamize))
+        .forEach(builder::connectVertices);
+    } else {
+      // Not the destination of the edgeToStreamize, so simply connect vertices as before.
+      dag.getIncomingEdgesOf(v).forEach(builder::connectVertices);
+    }
+ });
+
+ dag = builder.build(); // update the DAG.
}
/**
* Before: src > edgeToGetStatisticsOf > dst
- * After: src > oneToOneEdge(clone of edgeToGetStatisticsOf) > messageBarrierVertex
- * (leaves the "Before" relationships unmodified)
+ * After: src > oneToOneEdge(a clone of edgeToGetStatisticsOf) > messageBarrierVertex
+ * (the "Before" relationships are unmodified)
*
* @param messageBarrierVertex to insert.
* @param edgeToGetStatisticsOf for inserting.
*/
- void insert(final MessageBarrierVertex messageBarrierVertex, final IREdge edgeToGetStatisticsOf) {
+ public void insert(final MessageBarrierVertex messageBarrierVertex, final IREdge edgeToGetStatisticsOf) {
+ }
+
+ /**
+ * @param systemIRVertexToDelete to delete.
+ */
+ public void delete(final SystemIRVertex systemIRVertexToDelete) {
+ // TODO: recursively delete backwards
}
+ ////////////////////////////////////////////////// "Un-safe" direct reshaping (semantic-preserving not guaranteed)
- void delete(final SystemIRVertex systemIRVertexToDelete) {
+  public void unSafeDirectReshaping(final Function<DAG<IRVertex, IREdge>, DAG<IRVertex, IREdge>> unsafeReshaping) {
+    this.dag = unsafeReshaping.apply(dag);
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
index 4a254a1..11d19b9 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/NemoOptimizer.java
@@ -91,7 +91,7 @@ public final class NemoOptimizer implements Optimizer {
final IRDAG cacheFilteredDag = handleCaching(dag, cacheIdToEdge);
if (!cacheIdToEdge.isEmpty()) {
cacheFilteredDag.storeJSON(dagDirectory, irDagId + "FilterCache",
- "IR after cache filtering");
+ "IR after cache filtering");
}
// Conduct compile-time optimization.
@@ -103,7 +103,7 @@ public final class NemoOptimizer implements Optimizer {
optimizedDAG = optimizationPolicy.runCompileTimeOptimization(cacheFilteredDag, dagDirectory);
optimizedDAG.storeJSON(dagDirectory, irDagId + optimizationPolicy.getClass().getSimpleName(),
- "IR optimized for " + optimizationPolicy.getClass().getSimpleName());
+ "IR optimized for " + optimizationPolicy.getClass().getSimpleName());
} catch (final Exception e) {
throw new CompileTimeOptimizationException(e);
}
@@ -121,8 +121,8 @@ public final class NemoOptimizer implements Optimizer {
cacheIdToEdge.forEach((cacheId, edge) -> {
if (!cacheIdToParallelism.containsKey(cacheId)) {
cacheIdToParallelism.put(
- cacheId, optimizedDAG.getVertexById(edge.getDst().getId()).getPropertyValue(ParallelismProperty.class)
- .orElseThrow(() -> new RuntimeException("No parallelism on an IR vertex.")));
+ cacheId, optimizedDAG.getVertexById(edge.getDst().getId()).getPropertyValue(ParallelismProperty.class)
+ .orElseThrow(() -> new RuntimeException("No parallelism on an IR vertex.")));
}
});
@@ -142,21 +142,20 @@ public final class NemoOptimizer implements Optimizer {
* @param cacheIdToEdge the map from cache ID to edge to update.
* @return the cropped dag regarding to caching.
*/
- private IRDAG handleCaching(final IRDAG dag,
- final Map<UUID, IREdge> cacheIdToEdge) {
+ private IRDAG handleCaching(final IRDAG dag, final Map<UUID, IREdge> cacheIdToEdge) {
dag.topologicalDo(irVertex ->
- dag.getIncomingEdgesOf(irVertex).forEach(
- edge -> edge.getPropertyValue(CacheIDProperty.class).
- ifPresent(cacheId -> cacheIdToEdge.put(cacheId, edge))
- ));
+ dag.getIncomingEdgesOf(irVertex).forEach(
+ edge -> edge.getPropertyValue(CacheIDProperty.class).
+ ifPresent(cacheId -> cacheIdToEdge.put(cacheId, edge))
+ ));
if (cacheIdToEdge.isEmpty()) {
return dag;
} else {
final DAGBuilder<IRVertex, IREdge> filteredDagBuilder = new DAGBuilder<>();
final List<IRVertex> sinkVertices = dag.getVertices().stream()
- .filter(irVertex -> dag.getOutgoingEdgesOf(irVertex).isEmpty())
- .collect(Collectors.toList());
+ .filter(irVertex -> dag.getOutgoingEdgesOf(irVertex).isEmpty())
+ .collect(Collectors.toList());
sinkVertices.forEach(filteredDagBuilder::addVertex); // Sink vertex cannot be cached already.
sinkVertices.forEach(sinkVtx -> addNonCachedVerticesAndEdges(dag, sinkVtx, filteredDagBuilder));
@@ -176,41 +175,41 @@ public final class NemoOptimizer implements Optimizer {
final IRVertex irVertex,
final DAGBuilder<IRVertex, IREdge> builder) {
if (irVertex.getPropertyValue(IgnoreSchedulingTempDataReceiverProperty.class).orElse(false)
- && dag.getIncomingEdgesOf(irVertex).stream()
- .filter(irEdge -> irEdge.getPropertyValue(CacheIDProperty.class).isPresent())
- .anyMatch(irEdge -> cacheIdToParallelism
- .containsKey(irEdge.getPropertyValue(CacheIDProperty.class).get()))) {
+ && dag.getIncomingEdgesOf(irVertex).stream()
+ .filter(irEdge -> irEdge.getPropertyValue(CacheIDProperty.class).isPresent())
+ .anyMatch(irEdge -> cacheIdToParallelism
+ .containsKey(irEdge.getPropertyValue(CacheIDProperty.class).get()))) {
builder.removeVertex(irVertex); // Ignore ghost vertex which was cached once.
return;
}
dag.getIncomingEdgesOf(irVertex).stream()
- .forEach(edge -> {
- final Optional<UUID> cacheId = dag.getOutgoingEdgesOf(edge.getSrc()).stream()
+ .forEach(edge -> {
+ final Optional<UUID> cacheId = dag.getOutgoingEdgesOf(edge.getSrc()).stream()
.filter(edgeToFilter -> edgeToFilter.getPropertyValue(CacheIDProperty.class).isPresent())
.map(edgeToMap -> edgeToMap.getPropertyValue(CacheIDProperty.class).get())
.findFirst();
- if (cacheId.isPresent() && cacheIdToParallelism.get(cacheId.get()) != null) { // Cached already.
- // Replace the vertex emitting cached edge with a cached source vertex.
- final IRVertex cachedDataRelayVertex = new CachedSourceVertex(cacheIdToParallelism.get(cacheId.get()));
- cachedDataRelayVertex.setPropertyPermanently(ParallelismProperty.of(cacheIdToParallelism.get(cacheId.get())));
+ if (cacheId.isPresent() && cacheIdToParallelism.get(cacheId.get()) != null) { // Cached already.
+ // Replace the vertex emitting cached edge with a cached source vertex.
+ final IRVertex cachedDataRelayVertex = new CachedSourceVertex(cacheIdToParallelism.get(cacheId.get()));
+ cachedDataRelayVertex.setPropertyPermanently(ParallelismProperty.of(cacheIdToParallelism.get(cacheId.get())));
- builder.addVertex(cachedDataRelayVertex);
- final IREdge newEdge = new IREdge(
+ builder.addVertex(cachedDataRelayVertex);
+ final IREdge newEdge = new IREdge(
edge.getPropertyValue(CommunicationPatternProperty.class)
- .orElseThrow(() -> new RuntimeException("No communication pattern on an ir edge")),
+ .orElseThrow(() -> new RuntimeException("No communication pattern on an ir edge")),
cachedDataRelayVertex,
irVertex);
- edge.copyExecutionPropertiesTo(newEdge);
- newEdge.setProperty(CacheIDProperty.of(cacheId.get()));
- builder.connectVertices(newEdge);
- // Stop the recursion for this vertex.
- } else {
- final IRVertex srcVtx = edge.getSrc();
- builder.addVertex(srcVtx);
- builder.connectVertices(edge);
- addNonCachedVerticesAndEdges(dag, srcVtx, builder);
- }
- });
+ edge.copyExecutionPropertiesTo(newEdge);
+ newEdge.setProperty(CacheIDProperty.of(cacheId.get()));
+ builder.connectVertices(newEdge);
+ // Stop the recursion for this vertex.
+ } else {
+ final IRVertex srcVtx = edge.getSrc();
+ builder.addVertex(srcVtx);
+ builder.connectVertices(edge);
+ addNonCachedVerticesAndEdges(dag, srcVtx, builder);
+ }
+ });
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
index 212aab8..31c606b 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
@@ -41,11 +41,11 @@ public final class LargeShuffleDataPersistencePass extends AnnotatingPass {
@Override
public void optimize(final IRDAG dag) {
dag.topologicalDo(irVertex ->
- dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
- final DataFlowProperty.Value dataFlowModel = irEdge.getPropertyValue(DataFlowProperty.class).get();
- if (DataFlowProperty.Value.Push.equals(dataFlowModel)) {
- irEdge.setPropertyPermanently(DataPersistenceProperty.of(DataPersistenceProperty.Value.Discard));
- }
- }));
+ dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
+ final DataFlowProperty.Value dataFlowModel = irEdge.getPropertyValue(DataFlowProperty.class).get();
+ if (DataFlowProperty.Value.Push.equals(dataFlowModel)) {
+ irEdge.setPropertyPermanently(DataPersistenceProperty.of(DataPersistenceProperty.Value.Discard));
+ }
+ }));
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
index c47f509..e9ce8e2 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
@@ -24,14 +24,12 @@ import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProp
import org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty;
import org.apache.nemo.common.ir.edge.executionproperty.EncoderProperty;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.system.StreamVertex;
import org.apache.nemo.common.ir.vertex.transform.RelayTransform;
import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
/**
- * Pass to modify the DAG for a job to batch the disk seek.
- * It adds a {@link OperatorVertex} with {@link RelayTransform} before the vertices
- * receiving shuffle edges,
- * to merge the shuffled data in memory and write to the disk at once.
+ * Inserts the StreamVertex for each shuffle edge.
*/
@Requires(CommunicationPatternProperty.class)
public final class LargeShuffleRelayReshapingPass extends ReshapingPass {
@@ -43,6 +41,20 @@ public final class LargeShuffleRelayReshapingPass extends ReshapingPass {
super(LargeShuffleRelayReshapingPass.class);
}
+
+ @Override
+ public void optimize(final IRDAG dag) {
+ dag.topologicalDo(vertex -> {
+ dag.getIncomingEdgesOf(vertex).forEach(edge -> {
+ if (CommunicationPatternProperty.Value.Shuffle
+ .equals(edge.getPropertyValue(CommunicationPatternProperty.class).get())) {
+ dag.insert(new StreamVertex(), edge);
+ }
+ });
+ });
+ }
+
+ /*
@Override
public void optimize(final IRDAG dag) {
dag.topologicalDo(v -> {
@@ -79,4 +91,5 @@ public final class LargeShuffleRelayReshapingPass extends ReshapingPass {
});
return builder.build();
}
+ */
}