You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/01/22 05:22:12 UTC

[incubator-nemo] branch reshaping updated: return an irdag

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch reshaping
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/reshaping by this push:
     new 290be9f  return an irdag
290be9f is described below

commit 290be9fa977104746037021a694d1be2472c8191
Author: John Yang <jo...@apache.org>
AuthorDate: Tue Jan 22 14:22:01 2019 +0900

    return an irdag
---
 .../optimizer/pass/compiletime/CompileTimePass.java   |  2 +-
 .../annotating/AggressiveSpeculativeCloningPass.java  |  3 ++-
 .../pass/compiletime/annotating/CompressionPass.java  |  3 ++-
 .../compiletime/annotating/DecompressionPass.java     |  3 ++-
 .../annotating/DefaultDataPersistencePass.java        |  3 ++-
 .../compiletime/annotating/DefaultDataStorePass.java  |  3 ++-
 .../annotating/DefaultEdgeDecoderPass.java            |  3 ++-
 .../annotating/DefaultEdgeEncoderPass.java            |  3 ++-
 .../compiletime/annotating/DefaultMetricPass.java     |  3 ++-
 .../annotating/DefaultParallelismPass.java            |  3 ++-
 .../annotating/DefaultScheduleGroupPass.java          |  3 ++-
 .../annotating/DisaggregationEdgeDataStorePass.java   |  3 ++-
 .../annotating/DuplicateEdgeGroupSizePass.java        |  3 ++-
 .../annotating/LargeShuffleAnnotatingPass.java        |  3 ++-
 .../annotating/PipeTransferForAllEdgesPass.java       |  3 ++-
 .../compiletime/annotating/ResourceLocalityPass.java  |  3 ++-
 .../pass/compiletime/annotating/ResourceSitePass.java |  3 ++-
 .../pass/compiletime/annotating/ResourceSlotPass.java |  3 ++-
 .../compiletime/annotating/ShuffleEdgePushPass.java   |  3 ++-
 .../compiletime/annotating/SkewPartitionerPass.java   |  3 ++-
 .../annotating/SkewResourceSkewedDataPass.java        |  3 ++-
 .../annotating/TransientResourceDataFlowPass.java     |  3 ++-
 .../annotating/TransientResourceDataStorePass.java    |  3 ++-
 .../annotating/TransientResourcePriorityPass.java     |  3 ++-
 .../compiletime/annotating/UpfrontCloningPass.java    |  3 ++-
 .../pass/compiletime/composite/CompositePass.java     | 19 +++++++++++++++++--
 .../reshaping/CommonSubexpressionEliminationPass.java |  4 +++-
 .../reshaping/LargeShuffleReshapingPass.java          |  3 ++-
 .../compiletime/reshaping/LoopExtractionPass.java     |  3 ++-
 .../pass/compiletime/reshaping/LoopOptimizations.java |  7 +++++--
 .../pass/compiletime/reshaping/LoopUnrollingPass.java |  3 ++-
 .../pass/compiletime/reshaping/SkewReshapingPass.java |  3 ++-
 32 files changed, 82 insertions(+), 34 deletions(-)

diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/CompileTimePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/CompileTimePass.java
index 2577d95..d12c0b2 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/CompileTimePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/CompileTimePass.java
@@ -36,5 +36,5 @@ public abstract class CompileTimePass extends Pass {
   public abstract Set<Class<? extends ExecutionProperty>> getPrerequisiteExecutionProperties();
 
 
-  public abstract void optimize(final IRDAG dag);
+  public abstract IRDAG optimize(final IRDAG dag);
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/AggressiveSpeculativeCloningPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/AggressiveSpeculativeCloningPass.java
index b32ebc3..d4395a7 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/AggressiveSpeculativeCloningPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/AggressiveSpeculativeCloningPass.java
@@ -35,7 +35,7 @@ public final class AggressiveSpeculativeCloningPass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     // Speculative execution policy.
     final double fractionToWaitFor = 0.00000001; // Aggressive
     final double medianTimeMultiplier = 1.00000001; // Aggressive
@@ -43,5 +43,6 @@ public final class AggressiveSpeculativeCloningPass extends AnnotatingPass {
     // Apply the policy to ALL vertices
     dag.getVertices().forEach(vertex -> vertex.setProperty(ClonedSchedulingProperty.of(
       new ClonedSchedulingProperty.CloneConf(fractionToWaitFor, medianTimeMultiplier))));
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
index 6f6c7ab..36edbf0 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
@@ -46,9 +46,10 @@ public final class CompressionPass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex).stream()
         .filter(edge -> !edge.getPropertyValue(CompressionProperty.class).isPresent())
         .forEach(edge -> edge.setProperty(CompressionProperty.of(compression))));
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
index dac14a7..a78fe01 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
@@ -40,12 +40,13 @@ public final class DecompressionPass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex).stream()
         // Find edges which have a compression property but not decompression property.
         .filter(edge -> edge.getPropertyValue(CompressionProperty.class).isPresent()
             && !edge.getPropertyValue(DecompressionProperty.class).isPresent())
         .forEach(edge -> edge.setProperty(DecompressionProperty.of(
             edge.getPropertyValue(CompressionProperty.class).get()))));
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataPersistencePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataPersistencePass.java
index ffdc12a..0bd3c8c 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataPersistencePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataPersistencePass.java
@@ -38,7 +38,7 @@ public final class DefaultDataPersistencePass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.topologicalDo(irVertex ->
         dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
           if (!irEdge.getPropertyValue(DataPersistenceProperty.class).isPresent()) {
@@ -52,5 +52,6 @@ public final class DefaultDataPersistencePass extends AnnotatingPass {
             }
           }
         }));
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java
index 518b511..82ca540 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java
@@ -34,12 +34,13 @@ public final class DefaultDataStorePass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.getVertices().forEach(vertex -> {
       dag.getIncomingEdgesOf(vertex).stream()
           .filter(edge -> !edge.getPropertyValue(DataStoreProperty.class).isPresent())
           .forEach(edge -> edge.setProperty(
               DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore)));
     });
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeDecoderPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeDecoderPass.java
index 34fc175..3f886d4 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeDecoderPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeDecoderPass.java
@@ -39,12 +39,13 @@ public final class DefaultEdgeDecoderPass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.topologicalDo(irVertex ->
         dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
           if (!irEdge.getPropertyValue(DecoderProperty.class).isPresent()) {
             irEdge.setProperty(DEFAULT_DECODER_PROPERTY);
           }
         }));
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeEncoderPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeEncoderPass.java
index a9273c7..6f2fe3a 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeEncoderPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeEncoderPass.java
@@ -39,12 +39,13 @@ public final class DefaultEdgeEncoderPass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.topologicalDo(irVertex ->
         dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
           if (!irEdge.getPropertyValue(EncoderProperty.class).isPresent()) {
             irEdge.setProperty(DEFAULT_DECODER_PROPERTY);
           }
         }));
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java
index 507691d..af281cf 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java
@@ -42,7 +42,7 @@ public final class DefaultMetricPass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.topologicalDo(dst ->
       dag.getIncomingEdgesOf(dst).forEach(edge -> {
         if (CommunicationPatternProperty.Value.Shuffle
@@ -55,5 +55,6 @@ public final class DefaultMetricPass extends AnnotatingPass {
           edge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(metric)));
         }
       }));
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java
index bcc1bab..6f6ff91 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java
@@ -59,7 +59,7 @@ public final class DefaultParallelismPass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     // Propagate forward source parallelism
     dag.topologicalDo(vertex -> {
       try {
@@ -102,6 +102,7 @@ public final class DefaultParallelismPass extends AnnotatingPass {
         throw new RuntimeException(e);
       }
     });
+    return dag;
   }
 
   /**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
index 95e892e..6054c64 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
@@ -83,7 +83,7 @@ public final class DefaultScheduleGroupPass extends AnnotatingPass {
 
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     final Map<IRVertex, ScheduleGroup> irVertexToScheduleGroupMap = new HashMap<>();
     final Set<ScheduleGroup> scheduleGroups = new HashSet<>();
     dag.topologicalDo(irVertex -> {
@@ -222,6 +222,7 @@ public final class DefaultScheduleGroupPass extends AnnotatingPass {
         currentScheduleGroup.increment();
       }
     });
+    return dag;
   }
 
   /**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
index 744d232..0bd8532 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
@@ -40,11 +40,12 @@ public final class DisaggregationEdgeDataStorePass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.getVertices().forEach(vertex -> { // Initialize the DataStore of the DAG with GlusterFileStore.
       final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
       inEdges.forEach(edge ->
         edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.GlusterFileStore)));
     });
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DuplicateEdgeGroupSizePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DuplicateEdgeGroupSizePass.java
index 4559a6a..56dc72e 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DuplicateEdgeGroupSizePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DuplicateEdgeGroupSizePass.java
@@ -39,7 +39,7 @@ public final class DuplicateEdgeGroupSizePass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     final HashMap<String, Integer> groupIdToGroupSize = new HashMap<>();
     dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex)
         .forEach(e -> {
@@ -63,5 +63,6 @@ public final class DuplicateEdgeGroupSizePass extends AnnotatingPass {
             }
           }
         }));
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleAnnotatingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleAnnotatingPass.java
index 41490bf..9bfe277 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleAnnotatingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleAnnotatingPass.java
@@ -55,7 +55,7 @@ public final class LargeShuffleAnnotatingPass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.topologicalDo(irVertex ->
       dag.getIncomingEdgesOf(irVertex).forEach(edge -> {
         if (edge.getDst().getClass().equals(StreamVertex.class)) {
@@ -91,5 +91,6 @@ public final class LargeShuffleAnnotatingPass extends AnnotatingPass {
           edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Pull));
         }
       }));
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java
index 83daa5e..55dc4b7 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java
@@ -34,11 +34,12 @@ public final class PipeTransferForAllEdgesPass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.getVertices().forEach(vertex -> {
       dag.getIncomingEdgesOf(vertex).stream()
           .forEach(edge -> edge.setPropertyPermanently(
               DataStoreProperty.of(DataStoreProperty.Value.Pipe)));
     });
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java
index cf7838a..5815c09 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java
@@ -35,10 +35,11 @@ public final class ResourceLocalityPass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     // On every vertex, if ResourceLocalityProperty is not set, put it as true.
     dag.getVertices().stream()
         .filter(v -> !v.getPropertyValue(ResourceLocalityProperty.class).isPresent())
         .forEach(v -> v.setProperty(ResourceLocalityProperty.of(true)));
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java
index 8410570..7ed522e 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java
@@ -74,12 +74,13 @@ public final class ResourceSitePass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     if (bandwidthSpecificationString.isEmpty()) {
       dag.topologicalDo(irVertex -> irVertex.setProperty(ResourceSiteProperty.of(EMPTY_MAP)));
     } else {
       assignNodeShares(dag, BandwidthSpecification.fromJsonString(bandwidthSpecificationString));
     }
+    return dag;
   }
 
   /**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java
index 78d3086..a21be03 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java
@@ -35,10 +35,11 @@ public final class ResourceSlotPass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     // On every vertex, if ResourceSlotProperty is not set, put it as true.
     dag.getVertices().stream()
         .filter(v -> !v.getPropertyValue(ResourceSlotProperty.class).isPresent())
         .forEach(v -> v.setProperty(ResourceSlotProperty.of(true)));
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ShuffleEdgePushPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ShuffleEdgePushPass.java
index 80256e1..0e4da08 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ShuffleEdgePushPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/ShuffleEdgePushPass.java
@@ -41,7 +41,7 @@ public final class ShuffleEdgePushPass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.getVertices().forEach(vertex -> {
       final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
       if (!inEdges.isEmpty()) {
@@ -53,5 +53,6 @@ public final class ShuffleEdgePushPass extends AnnotatingPass {
         });
       }
     });
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
index 31a4c78..3621aff 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
@@ -37,7 +37,7 @@ public final class SkewPartitionerPass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.getVertices()
       .forEach(v -> dag.getOutgoingEdgesOf(v).stream()
         .filter(edge -> edge.getPropertyValue(MetricCollectionProperty.class).isPresent())
@@ -45,5 +45,6 @@ public final class SkewPartitionerPass extends AnnotatingPass {
           .setPropertyPermanently(PartitionerProperty.of(PartitionerProperty.Value.DataSkewHashPartitioner))
         )
       );
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
index 8f0babf..5d57327 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
@@ -42,7 +42,7 @@ public final class SkewResourceSkewedDataPass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.getVertices()
       .forEach(v -> dag.getOutgoingEdgesOf(v).stream()
         .filter(edge -> edge.getPropertyValue(MetricCollectionProperty.class).isPresent())
@@ -54,5 +54,6 @@ public final class SkewResourceSkewedDataPass extends AnnotatingPass {
           });
         })
       );
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
index 4d943ae..66230b3 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
@@ -42,7 +42,7 @@ public final class TransientResourceDataFlowPass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.getVertices().forEach(vertex -> {
       final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
       if (!inEdges.isEmpty()) {
@@ -55,5 +55,6 @@ public final class TransientResourceDataFlowPass extends AnnotatingPass {
         });
       }
     });
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
index 234881a..c527758 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
@@ -41,7 +41,7 @@ public final class TransientResourceDataStorePass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.getVertices().forEach(vertex -> {
       final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
       if (!inEdges.isEmpty()) {
@@ -57,6 +57,7 @@ public final class TransientResourceDataStorePass extends AnnotatingPass {
         });
       }
     });
+    return dag;
   }
 
   /**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java
index 862e763..4aae650 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java
@@ -40,7 +40,7 @@ public final class TransientResourcePriorityPass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.topologicalDo(vertex -> {
       final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
       if (inEdges.isEmpty()) {
@@ -53,6 +53,7 @@ public final class TransientResourcePriorityPass extends AnnotatingPass {
         }
       }
     });
+    return dag;
   }
 
   /**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
index 718cc3a..c3df132 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
@@ -37,7 +37,7 @@ public final class UpfrontCloningPass extends AnnotatingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.getVertices().stream()
         .filter(vertex -> dag.getIncomingEdgesOf(vertex.getId())
           .stream()
@@ -50,5 +50,6 @@ public final class UpfrontCloningPass extends AnnotatingPass {
           )
         .forEach(vertex -> vertex.setProperty(
           ClonedSchedulingProperty.of(new ClonedSchedulingProperty.CloneConf()))); // clone upfront, always
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/CompositePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/CompositePass.java
index 6f5341a..576c892 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/CompositePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/CompositePass.java
@@ -24,6 +24,7 @@ import org.apache.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
 import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
 
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -58,8 +59,22 @@ public abstract class CompositePass extends CompileTimePass {
   }
 
   @Override
-  public final void optimize(final IRDAG irVertexIREdgeDAG) {
-    getPassList().forEach(pass -> pass.optimize(irVertexIREdgeDAG));
+  public final IRDAG optimize(final IRDAG irVertexIREdgeDAG) {
+    return recursivelyApply(irVertexIREdgeDAG, getPassList().iterator());
+  }
+
+  /**
+   * Recursively apply the give list of passes.
+   * @param dag dag.
+   * @param passIterator pass iterator.
+   * @return dag.
+   */
+  private IRDAG recursivelyApply(final IRDAG dag, final Iterator<CompileTimePass> passIterator) {
+    if (passIterator.hasNext()) {
+      return recursivelyApply(passIterator.next().optimize(dag), passIterator);
+    } else {
+      return dag;
+    }
   }
 
   /**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
index 5f49638..6e29ba3 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
@@ -51,7 +51,7 @@ public final class CommonSubexpressionEliminationPass extends ReshapingPass {
   }
 
   @Override
-  public void optimize(final IRDAG inputDAG) {
+  public IRDAG optimize(final IRDAG inputDAG) {
     // find and collect vertices with equivalent transforms
     final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
     final Map<Transform, List<OperatorVertex>> operatorVerticesToBeMerged = new HashMap<>();
@@ -129,6 +129,8 @@ public final class CommonSubexpressionEliminationPass extends ReshapingPass {
 
       return builder.build();
     });
+
+    return inputDAG;
   }
 
   /**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleReshapingPass.java
index 6aa8245..f3a3cb7 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleReshapingPass.java
@@ -38,7 +38,7 @@ public final class LargeShuffleReshapingPass extends ReshapingPass {
 
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.topologicalDo(vertex -> {
       dag.getIncomingEdgesOf(vertex).forEach(edge -> {
         if (CommunicationPatternProperty.Value.Shuffle
@@ -47,5 +47,6 @@ public final class LargeShuffleReshapingPass extends ReshapingPass {
         }
       });
     });
+    return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
index 637036a..1bdf5aa 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
@@ -50,11 +50,12 @@ public final class LoopExtractionPass extends ReshapingPass {
   }
 
   @Override
-  public void optimize(final IRDAG inputDAG) {
+  public IRDAG optimize(final IRDAG inputDAG) {
     inputDAG.unSafeDirectReshaping(dag -> {
       final Integer maxStackDepth = this.findMaxLoopVertexStackDepth(dag);
       return groupLoops(dag, maxStackDepth);
     });
+    return inputDAG;
   }
 
   /**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
index 27afa4a..fa94290 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
@@ -113,7 +113,7 @@ public final class LoopOptimizations {
     }
 
     @Override
-    public void optimize(final IRDAG inputDAG) {
+    public IRDAG optimize(final IRDAG inputDAG) {
       inputDAG.unSafeDirectReshaping(dag -> {
         final List<LoopVertex> loopVertices = new ArrayList<>();
         final Map<LoopVertex, List<IREdge>> inEdges = new HashMap<>();
@@ -202,6 +202,8 @@ public final class LoopOptimizations {
 
         return builder.build();
       });
+
+      return inputDAG;
     }
 
     /**
@@ -243,10 +245,11 @@ public final class LoopOptimizations {
     }
 
     @Override
-    public void optimize(final IRDAG inputDAG) {
+    public IRDAG optimize(final IRDAG inputDAG) {
       inputDAG.unSafeDirectReshaping(dag -> {
         return recursivelyOptimize(dag);
       });
+      return inputDAG;
     }
 
     DAG<IRVertex, IREdge> recursivelyOptimize(final DAG<IRVertex, IREdge> dag) {
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopUnrollingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopUnrollingPass.java
index 7d2219d..43f4885 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopUnrollingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopUnrollingPass.java
@@ -39,10 +39,11 @@ public final class LoopUnrollingPass extends ReshapingPass {
   }
 
   @Override
-  public void optimize(final IRDAG inputDAG) {
+  public IRDAG optimize(final IRDAG inputDAG) {
     inputDAG.unSafeDirectReshaping(dag -> {
       return recursivelyUnroll(dag);
     });
+    return inputDAG;
   }
 
   /**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
index 680b725..55edfeb 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
@@ -52,7 +52,7 @@ public final class SkewReshapingPass extends ReshapingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public IRDAG optimize(final IRDAG dag) {
     dag.topologicalDo(v -> {
       // We care about OperatorVertices that have shuffle incoming edges with main output.
       // TODO #210: Data-aware dynamic optimization at run-time
@@ -98,5 +98,6 @@ public final class SkewReshapingPass extends ReshapingPass {
         }
       }
     });
+    return dag;
   }
 }