You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/01/22 05:22:12 UTC
[incubator-nemo] branch reshaping updated: return an irdag
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch reshaping
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/reshaping by this push:
new 290be9f return an irdag
290be9f is described below
commit 290be9fa977104746037021a694d1be2472c8191
Author: John Yang <jo...@apache.org>
AuthorDate: Tue Jan 22 14:22:01 2019 +0900
return an irdag
---
.../optimizer/pass/compiletime/CompileTimePass.java | 2 +-
.../annotating/AggressiveSpeculativeCloningPass.java | 3 ++-
.../pass/compiletime/annotating/CompressionPass.java | 3 ++-
.../compiletime/annotating/DecompressionPass.java | 3 ++-
.../annotating/DefaultDataPersistencePass.java | 3 ++-
.../compiletime/annotating/DefaultDataStorePass.java | 3 ++-
.../annotating/DefaultEdgeDecoderPass.java | 3 ++-
.../annotating/DefaultEdgeEncoderPass.java | 3 ++-
.../compiletime/annotating/DefaultMetricPass.java | 3 ++-
.../annotating/DefaultParallelismPass.java | 3 ++-
.../annotating/DefaultScheduleGroupPass.java | 3 ++-
.../annotating/DisaggregationEdgeDataStorePass.java | 3 ++-
.../annotating/DuplicateEdgeGroupSizePass.java | 3 ++-
.../annotating/LargeShuffleAnnotatingPass.java | 3 ++-
.../annotating/PipeTransferForAllEdgesPass.java | 3 ++-
.../compiletime/annotating/ResourceLocalityPass.java | 3 ++-
.../pass/compiletime/annotating/ResourceSitePass.java | 3 ++-
.../pass/compiletime/annotating/ResourceSlotPass.java | 3 ++-
.../compiletime/annotating/ShuffleEdgePushPass.java | 3 ++-
.../compiletime/annotating/SkewPartitionerPass.java | 3 ++-
.../annotating/SkewResourceSkewedDataPass.java | 3 ++-
.../annotating/TransientResourceDataFlowPass.java | 3 ++-
.../annotating/TransientResourceDataStorePass.java | 3 ++-
.../annotating/TransientResourcePriorityPass.java | 3 ++-
.../compiletime/annotating/UpfrontCloningPass.java | 3 ++-
.../pass/compiletime/composite/CompositePass.java | 19 +++++++++++++++++--
.../reshaping/CommonSubexpressionEliminationPass.java | 4 +++-
.../reshaping/LargeShuffleReshapingPass.java | 3 ++-
.../compiletime/reshaping/LoopExtractionPass.java | 3 ++-
.../pass/compiletime/reshaping/LoopOptimizations.java | 7 +++++--
.../pass/compiletime/reshaping/LoopUnrollingPass.java | 3 ++-
.../pass/compiletime/reshaping/SkewReshapingPass.java | 3 ++-
32 files changed, 82 insertions(+), 34 deletions(-)
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/CompileTimePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/CompileTimePass.java
index 2577d95..d12c0b2 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/CompileTimePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/CompileTimePass.java
@@ -36,5 +36,5 @@ public abstract class CompileTimePass extends Pass {
public abstract Set<Class<? extends ExecutionProperty>> getPrerequisiteExecutionProperties();
- public abstract void optimize(final IRDAG dag);
+ public abstract IRDAG optimize(final IRDAG dag);
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/AggressiveSpeculativeCloningPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/AggressiveSpeculativeCloningPass.java
index b32ebc3..d4395a7 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/AggressiveSpeculativeCloningPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/AggressiveSpeculativeCloningPass.java
@@ -35,7 +35,7 @@ public final class AggressiveSpeculativeCloningPass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
// Speculative execution policy.
final double fractionToWaitFor = 0.00000001; // Aggressive
final double medianTimeMultiplier = 1.00000001; // Aggressive
@@ -43,5 +43,6 @@ public final class AggressiveSpeculativeCloningPass extends AnnotatingPass {
// Apply the policy to ALL vertices
dag.getVertices().forEach(vertex -> vertex.setProperty(ClonedSchedulingProperty.of(
new ClonedSchedulingProperty.CloneConf(fractionToWaitFor, medianTimeMultiplier))));
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
index 6f6c7ab..36edbf0 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
@@ -46,9 +46,10 @@ public final class CompressionPass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex).stream()
.filter(edge -> !edge.getPropertyValue(CompressionProperty.class).isPresent())
.forEach(edge -> edge.setProperty(CompressionProperty.of(compression))));
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
index dac14a7..a78fe01 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
@@ -40,12 +40,13 @@ public final class DecompressionPass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex).stream()
// Find edges which have a compression property but not decompression property.
.filter(edge -> edge.getPropertyValue(CompressionProperty.class).isPresent()
&& !edge.getPropertyValue(DecompressionProperty.class).isPresent())
.forEach(edge -> edge.setProperty(DecompressionProperty.of(
edge.getPropertyValue(CompressionProperty.class).get()))));
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataPersistencePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataPersistencePass.java
index ffdc12a..0bd3c8c 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataPersistencePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataPersistencePass.java
@@ -38,7 +38,7 @@ public final class DefaultDataPersistencePass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.topologicalDo(irVertex ->
dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
if (!irEdge.getPropertyValue(DataPersistenceProperty.class).isPresent()) {
@@ -52,5 +52,6 @@ public final class DefaultDataPersistencePass extends AnnotatingPass {
}
}
}));
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java
index 518b511..82ca540 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java
@@ -34,12 +34,13 @@ public final class DefaultDataStorePass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.getVertices().forEach(vertex -> {
dag.getIncomingEdgesOf(vertex).stream()
.filter(edge -> !edge.getPropertyValue(DataStoreProperty.class).isPresent())
.forEach(edge -> edge.setProperty(
DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore)));
});
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeDecoderPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeDecoderPass.java
index 34fc175..3f886d4 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeDecoderPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeDecoderPass.java
@@ -39,12 +39,13 @@ public final class DefaultEdgeDecoderPass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.topologicalDo(irVertex ->
dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
if (!irEdge.getPropertyValue(DecoderProperty.class).isPresent()) {
irEdge.setProperty(DEFAULT_DECODER_PROPERTY);
}
}));
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeEncoderPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeEncoderPass.java
index a9273c7..6f2fe3a 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeEncoderPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeEncoderPass.java
@@ -39,12 +39,13 @@ public final class DefaultEdgeEncoderPass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.topologicalDo(irVertex ->
dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
if (!irEdge.getPropertyValue(EncoderProperty.class).isPresent()) {
irEdge.setProperty(DEFAULT_DECODER_PROPERTY);
}
}));
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java
index 507691d..af281cf 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java
@@ -42,7 +42,7 @@ public final class DefaultMetricPass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.topologicalDo(dst ->
dag.getIncomingEdgesOf(dst).forEach(edge -> {
if (CommunicationPatternProperty.Value.Shuffle
@@ -55,5 +55,6 @@ public final class DefaultMetricPass extends AnnotatingPass {
edge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(metric)));
}
}));
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java
index bcc1bab..6f6ff91 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java
@@ -59,7 +59,7 @@ public final class DefaultParallelismPass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
// Propagate forward source parallelism
dag.topologicalDo(vertex -> {
try {
@@ -102,6 +102,7 @@ public final class DefaultParallelismPass extends AnnotatingPass {
throw new RuntimeException(e);
}
});
+ return dag;
}
/**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
index 95e892e..6054c64 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
@@ -83,7 +83,7 @@ public final class DefaultScheduleGroupPass extends AnnotatingPass {
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
final Map<IRVertex, ScheduleGroup> irVertexToScheduleGroupMap = new HashMap<>();
final Set<ScheduleGroup> scheduleGroups = new HashSet<>();
dag.topologicalDo(irVertex -> {
@@ -222,6 +222,7 @@ public final class DefaultScheduleGroupPass extends AnnotatingPass {
currentScheduleGroup.increment();
}
});
+ return dag;
}
/**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
index 744d232..0bd8532 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
@@ -40,11 +40,12 @@ public final class DisaggregationEdgeDataStorePass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.getVertices().forEach(vertex -> { // Initialize the DataStore of the DAG with GlusterFileStore.
final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
inEdges.forEach(edge ->
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.GlusterFileStore)));
});
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DuplicateEdgeGroupSizePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DuplicateEdgeGroupSizePass.java
index 4559a6a..56dc72e 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DuplicateEdgeGroupSizePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DuplicateEdgeGroupSizePass.java
@@ -39,7 +39,7 @@ public final class DuplicateEdgeGroupSizePass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
final HashMap<String, Integer> groupIdToGroupSize = new HashMap<>();
dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex)
.forEach(e -> {
@@ -63,5 +63,6 @@ public final class DuplicateEdgeGroupSizePass extends AnnotatingPass {
}
}
}));
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleAnnotatingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleAnnotatingPass.java
index 41490bf..9bfe277 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleAnnotatingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleAnnotatingPass.java
@@ -55,7 +55,7 @@ public final class LargeShuffleAnnotatingPass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.topologicalDo(irVertex ->
dag.getIncomingEdgesOf(irVertex).forEach(edge -> {
if (edge.getDst().getClass().equals(StreamVertex.class)) {
@@ -91,5 +91,6 @@ public final class LargeShuffleAnnotatingPass extends AnnotatingPass {
edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Pull));
}
}));
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java
index 83daa5e..55dc4b7 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java
@@ -34,11 +34,12 @@ public final class PipeTransferForAllEdgesPass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.getVertices().forEach(vertex -> {
dag.getIncomingEdgesOf(vertex).stream()
.forEach(edge -> edge.setPropertyPermanently(
DataStoreProperty.of(DataStoreProperty.Value.Pipe)));
});
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java
index cf7838a..5815c09 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java
@@ -35,10 +35,11 @@ public final class ResourceLocalityPass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
// On every vertex, if ResourceLocalityProperty is not set, put it as true.
dag.getVertices().stream()
.filter(v -> !v.getPropertyValue(ResourceLocalityProperty.class).isPresent())
.forEach(v -> v.setProperty(ResourceLocalityProperty.of(true)));
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java
index 8410570..7ed522e 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java
@@ -74,12 +74,13 @@ public final class ResourceSitePass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
if (bandwidthSpecificationString.isEmpty()) {
dag.topologicalDo(irVertex -> irVertex.setProperty(ResourceSiteProperty.of(EMPTY_MAP)));
} else {
assignNodeShares(dag, BandwidthSpecification.fromJsonString(bandwidthSpecificationString));
}
+ return dag;
}
/**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java
index 78d3086..a21be03 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java
@@ -35,10 +35,11 @@ public final class ResourceSlotPass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
// On every vertex, if ResourceSlotProperty is not set, put it as true.
dag.getVertices().stream()
.filter(v -> !v.getPropertyValue(ResourceSlotProperty.class).isPresent())
.forEach(v -> v.setProperty(ResourceSlotProperty.of(true)));
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ShuffleEdgePushPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ShuffleEdgePushPass.java
index 80256e1..0e4da08 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ShuffleEdgePushPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ShuffleEdgePushPass.java
@@ -41,7 +41,7 @@ public final class ShuffleEdgePushPass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.getVertices().forEach(vertex -> {
final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
if (!inEdges.isEmpty()) {
@@ -53,5 +53,6 @@ public final class ShuffleEdgePushPass extends AnnotatingPass {
});
}
});
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
index 31a4c78..3621aff 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
@@ -37,7 +37,7 @@ public final class SkewPartitionerPass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.getVertices()
.forEach(v -> dag.getOutgoingEdgesOf(v).stream()
.filter(edge -> edge.getPropertyValue(MetricCollectionProperty.class).isPresent())
@@ -45,5 +45,6 @@ public final class SkewPartitionerPass extends AnnotatingPass {
.setPropertyPermanently(PartitionerProperty.of(PartitionerProperty.Value.DataSkewHashPartitioner))
)
);
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
index 8f0babf..5d57327 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
@@ -42,7 +42,7 @@ public final class SkewResourceSkewedDataPass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.getVertices()
.forEach(v -> dag.getOutgoingEdgesOf(v).stream()
.filter(edge -> edge.getPropertyValue(MetricCollectionProperty.class).isPresent())
@@ -54,5 +54,6 @@ public final class SkewResourceSkewedDataPass extends AnnotatingPass {
});
})
);
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
index 4d943ae..66230b3 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
@@ -42,7 +42,7 @@ public final class TransientResourceDataFlowPass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.getVertices().forEach(vertex -> {
final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
if (!inEdges.isEmpty()) {
@@ -55,5 +55,6 @@ public final class TransientResourceDataFlowPass extends AnnotatingPass {
});
}
});
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
index 234881a..c527758 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
@@ -41,7 +41,7 @@ public final class TransientResourceDataStorePass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.getVertices().forEach(vertex -> {
final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
if (!inEdges.isEmpty()) {
@@ -57,6 +57,7 @@ public final class TransientResourceDataStorePass extends AnnotatingPass {
});
}
});
+ return dag;
}
/**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java
index 862e763..4aae650 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java
@@ -40,7 +40,7 @@ public final class TransientResourcePriorityPass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.topologicalDo(vertex -> {
final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
if (inEdges.isEmpty()) {
@@ -53,6 +53,7 @@ public final class TransientResourcePriorityPass extends AnnotatingPass {
}
}
});
+ return dag;
}
/**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
index 718cc3a..c3df132 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
@@ -37,7 +37,7 @@ public final class UpfrontCloningPass extends AnnotatingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.getVertices().stream()
.filter(vertex -> dag.getIncomingEdgesOf(vertex.getId())
.stream()
@@ -50,5 +50,6 @@ public final class UpfrontCloningPass extends AnnotatingPass {
)
.forEach(vertex -> vertex.setProperty(
ClonedSchedulingProperty.of(new ClonedSchedulingProperty.CloneConf()))); // clone upfront, always
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/CompositePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/CompositePass.java
index 6f5341a..576c892 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/CompositePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/CompositePass.java
@@ -24,6 +24,7 @@ import org.apache.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -58,8 +59,22 @@ public abstract class CompositePass extends CompileTimePass {
}
@Override
- public final void optimize(final IRDAG irVertexIREdgeDAG) {
- getPassList().forEach(pass -> pass.optimize(irVertexIREdgeDAG));
+ public final IRDAG optimize(final IRDAG irVertexIREdgeDAG) {
+ return recursivelyApply(irVertexIREdgeDAG, getPassList().iterator());
+ }
+
+ /**
+ * Recursively apply the given list of passes.
+ * @param dag dag.
+ * @param passIterator pass iterator.
+ * @return dag.
+ */
+ private IRDAG recursivelyApply(final IRDAG dag, final Iterator<CompileTimePass> passIterator) {
+ if (passIterator.hasNext()) {
+ return recursivelyApply(passIterator.next().optimize(dag), passIterator);
+ } else {
+ return dag;
+ }
}
/**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
index 5f49638..6e29ba3 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
@@ -51,7 +51,7 @@ public final class CommonSubexpressionEliminationPass extends ReshapingPass {
}
@Override
- public void optimize(final IRDAG inputDAG) {
+ public IRDAG optimize(final IRDAG inputDAG) {
// find and collect vertices with equivalent transforms
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
final Map<Transform, List<OperatorVertex>> operatorVerticesToBeMerged = new HashMap<>();
@@ -129,6 +129,8 @@ public final class CommonSubexpressionEliminationPass extends ReshapingPass {
return builder.build();
});
+
+ return inputDAG;
}
/**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleReshapingPass.java
index 6aa8245..f3a3cb7 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleReshapingPass.java
@@ -38,7 +38,7 @@ public final class LargeShuffleReshapingPass extends ReshapingPass {
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.topologicalDo(vertex -> {
dag.getIncomingEdgesOf(vertex).forEach(edge -> {
if (CommunicationPatternProperty.Value.Shuffle
@@ -47,5 +47,6 @@ public final class LargeShuffleReshapingPass extends ReshapingPass {
}
});
});
+ return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
index 637036a..1bdf5aa 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
@@ -50,11 +50,12 @@ public final class LoopExtractionPass extends ReshapingPass {
}
@Override
- public void optimize(final IRDAG inputDAG) {
+ public IRDAG optimize(final IRDAG inputDAG) {
inputDAG.unSafeDirectReshaping(dag -> {
final Integer maxStackDepth = this.findMaxLoopVertexStackDepth(dag);
return groupLoops(dag, maxStackDepth);
});
+ return inputDAG;
}
/**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
index 27afa4a..fa94290 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
@@ -113,7 +113,7 @@ public final class LoopOptimizations {
}
@Override
- public void optimize(final IRDAG inputDAG) {
+ public IRDAG optimize(final IRDAG inputDAG) {
inputDAG.unSafeDirectReshaping(dag -> {
final List<LoopVertex> loopVertices = new ArrayList<>();
final Map<LoopVertex, List<IREdge>> inEdges = new HashMap<>();
@@ -202,6 +202,8 @@ public final class LoopOptimizations {
return builder.build();
});
+
+ return inputDAG;
}
/**
@@ -243,10 +245,11 @@ public final class LoopOptimizations {
}
@Override
- public void optimize(final IRDAG inputDAG) {
+ public IRDAG optimize(final IRDAG inputDAG) {
inputDAG.unSafeDirectReshaping(dag -> {
return recursivelyOptimize(dag);
});
+ return inputDAG;
}
DAG<IRVertex, IREdge> recursivelyOptimize(final DAG<IRVertex, IREdge> dag) {
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopUnrollingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopUnrollingPass.java
index 7d2219d..43f4885 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopUnrollingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopUnrollingPass.java
@@ -39,10 +39,11 @@ public final class LoopUnrollingPass extends ReshapingPass {
}
@Override
- public void optimize(final IRDAG inputDAG) {
+ public IRDAG optimize(final IRDAG inputDAG) {
inputDAG.unSafeDirectReshaping(dag -> {
return recursivelyUnroll(dag);
});
+ return inputDAG;
}
/**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
index 680b725..55edfeb 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
@@ -52,7 +52,7 @@ public final class SkewReshapingPass extends ReshapingPass {
}
@Override
- public void optimize(final IRDAG dag) {
+ public IRDAG optimize(final IRDAG dag) {
dag.topologicalDo(v -> {
// We care about OperatorVertices that have shuffle incoming edges with main output.
// TODO #210: Data-aware dynamic optimization at run-time
@@ -98,5 +98,6 @@ public final class SkewReshapingPass extends ReshapingPass {
}
}
});
+ return dag;
}
}