You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/01/22 02:17:00 UTC

[incubator-nemo] branch reshaping updated: refactor largeshuffle

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch reshaping
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/reshaping by this push:
     new f894b97  refactor largeshuffle
f894b97 is described below

commit f894b9726b58f52b33384825b418d9f37567e468
Author: John Yang <jo...@apache.org>
AuthorDate: Tue Jan 22 11:16:50 2019 +0900

    refactor largeshuffle
---
 .../annotating/LargeShuffleAnnotatingPass.java     | 95 ++++++++++++++++++++++
 .../annotating/LargeShuffleCompressionPass.java    | 60 --------------
 .../annotating/LargeShuffleDataFlowPass.java       | 57 -------------
 .../LargeShuffleDataPersistencePass.java           | 51 ------------
 .../annotating/LargeShuffleDataStorePass.java      | 60 --------------
 .../annotating/LargeShuffleDecoderPass.java        | 57 -------------
 .../annotating/LargeShuffleDecompressionPass.java  | 61 --------------
 .../annotating/LargeShuffleEncoderPass.java        | 59 --------------
 .../annotating/LargeShufflePartitionerPass.java    | 61 --------------
 .../annotating/LargeShuffleResourceSlotPass.java   | 57 -------------
 .../composite/LargeShuffleCompositePass.java       | 14 +---
 ...ingPass.java => LargeShuffleReshapingPass.java} |  6 +-
 12 files changed, 101 insertions(+), 537 deletions(-)

diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleAnnotatingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleAnnotatingPass.java
new file mode 100644
index 0000000..97f431e
--- /dev/null
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleAnnotatingPass.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import org.apache.nemo.common.coder.BytesDecoderFactory;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.executionproperty.*;
+import org.apache.nemo.common.ir.vertex.executionproperty.ResourceSlotProperty;
+import org.apache.nemo.common.ir.vertex.system.StreamVertex;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
+
+/**
+ * This pass assumes that a StreamVertex was previously inserted to receive each shuffle edge.
+ *
+ * src -> shuffle-edge -> streamvertex -> one-to-one-edge -> dst
+ *
+ * (1) shuffle-edge
+ * Encode/compress into byte[], and have the receiver read data as the same byte[], rather than decompressing/decoding.
+ * Perform a push-based in-memory shuffle with discarding on.
+ *
+ * (2) streamvertex
+ * Ignore resource slots, such that all tasks fetch the in-memory input data blocks as soon as they become available.
+ *
+ * (3) one-to-one-edge
+ * Do not re-compress the byte[]
+ *
+ *
+ * PUll and on-disk data transfer
+ */
+@Annotates({CompressionProperty.class, DataFlowProperty.class, CompressionProperty.class,
+  DataPersistenceProperty.class, DataStoreProperty.class, DecoderProperty.class, DecompressionProperty.class,
+  EncoderProperty.class, PartitionerProperty.class, ResourceSlotProperty.class})
+@Requires(CommunicationPatternProperty.class)
+public final class LargeShuffleAnnotatingPass extends AnnotatingPass {
+  /**
+   * Default constructor.
+   */
+  public LargeShuffleAnnotatingPass() {
+    super(LargeShuffleAnnotatingPass.class);
+  }
+
+  @Override
+  public void optimize(final IRDAG dag) {
+    dag.topologicalDo(irVertex ->
+      dag.getIncomingEdgesOf(irVertex).forEach(edge -> {
+        if (edge.getDst().getClass().equals(StreamVertex.class)) {
+          // CASE #1: To a stream vertex
+
+          // Coder and Compression
+          edge.setPropertyPermanently(DecoderProperty.of(BytesDecoderFactory.of()));
+          edge.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.LZ4));
+          edge.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.None));
+
+          // Data transfers
+          edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Push));
+          edge.setPropertyPermanently(DataPersistenceProperty.of(DataPersistenceProperty.Value.Discard));
+          edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.SerializedMemoryStore));
+
+          // Resource slots
+          edge.getDst().setPropertyPermanently(ResourceSlotProperty.of(false));
+        } else if (edge.getSrc().getClass().equals(StreamVertex.class)) {
+          // CASE #2: From a stream vertex
+
+          // Coder and Compression
+          edge.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.None));
+          edge.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.LZ4));
+
+          // Data transfers
+          edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+          edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+          edge.setPropertyPermanently(
+            PartitionerProperty.of(PartitionerProperty.Value.DedicatedKeyPerElementPartitioner));
+        } else {
+          // CASE #3: Unrelated to any stream vertices
+          edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+        }
+      }));
+  }
+}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleCompressionPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleCompressionPass.java
deleted file mode 100644
index c2b7a26..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleCompressionPass.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.CompressionProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-import java.util.List;
-
-/**
- * A pass to support Sailfish-like shuffle by tagging edges.
- * This pass modifies the encoder property toward {@link org.apache.nemo.common.ir.vertex.transform.RelayTransform}
- * to write data as byte arrays.
- */
-@Annotates(CompressionProperty.class)
-@Requires(CommunicationPatternProperty.class)
-public final class LargeShuffleCompressionPass extends AnnotatingPass {
-  /**
-   * Default constructor.
-   */
-  public LargeShuffleCompressionPass() {
-    super(LargeShuffleCompressionPass.class);
-  }
-
-  @Override
-  public void optimize(final IRDAG dag) {
-    dag.getVertices().forEach(vertex -> {
-      final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
-      inEdges.forEach(edge -> {
-        if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
-            .equals(CommunicationPatternProperty.Value.Shuffle)) {
-          edge.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.LZ4));
-
-          dag.getOutgoingEdgesOf(edge.getDst())
-              .forEach(edgeFromRelay ->
-                  edgeFromRelay.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.None)));
-        }
-      });
-    });
-  }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataFlowPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataFlowPass.java
deleted file mode 100644
index e95c3e4..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataFlowPass.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-import java.util.List;
-
-/**
- * A pass to optimize large shuffle by tagging edges.
- * This pass handles the DataFlowModel ExecutionProperty.
- */
-@Annotates(DataFlowProperty.class)
-@Requires(CommunicationPatternProperty.class)
-public final class LargeShuffleDataFlowPass extends AnnotatingPass {
-  /**
-   * Default constructor.
-   */
-  public LargeShuffleDataFlowPass() {
-    super(LargeShuffleDataFlowPass.class);
-  }
-
-  @Override
-  public void optimize(final IRDAG dag) {
-    dag.getVertices().forEach(vertex -> {
-      final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
-      inEdges.forEach(edge -> {
-        if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
-            .equals(CommunicationPatternProperty.Value.Shuffle)) {
-          edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Push)); // Push to the merger vertex.
-        } else {
-          edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Pull));
-        }
-      });
-    });
-  }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
deleted file mode 100644
index 31c606b..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DataPersistenceProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-/**
- * A pass to optimize large shuffle by tagging edges.
- * This pass handles the data persistence ExecutionProperty.
- */
-@Annotates(DataPersistenceProperty.class)
-@Requires(DataFlowProperty.class)
-public final class LargeShuffleDataPersistencePass extends AnnotatingPass {
-
-  /**
-   * Default constructor.
-   */
-  public LargeShuffleDataPersistencePass() {
-    super(LargeShuffleDataPersistencePass.class);
-  }
-
-  @Override
-  public void optimize(final IRDAG dag) {
-    dag.topologicalDo(irVertex ->
-      dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
-        final DataFlowProperty.Value dataFlowModel = irEdge.getPropertyValue(DataFlowProperty.class).get();
-        if (DataFlowProperty.Value.Push.equals(dataFlowModel)) {
-          irEdge.setPropertyPermanently(DataPersistenceProperty.of(DataPersistenceProperty.Value.Discard));
-        }
-      }));
-  }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataStorePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataStorePass.java
deleted file mode 100644
index 66c6278..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataStorePass.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-/**
- * A pass to optimize large shuffle by tagging edges.
- * This pass handles the DataStore ExecutionProperty.
- */
-@Annotates(DataStoreProperty.class)
-@Requires(CommunicationPatternProperty.class)
-public final class LargeShuffleDataStorePass extends AnnotatingPass {
-  /**
-   * Default constructor.
-   */
-  public LargeShuffleDataStorePass() {
-    super(LargeShuffleDataStorePass.class);
-  }
-
-  @Override
-  public void optimize(final IRDAG dag) {
-    dag.getVertices().forEach(vertex -> {
-      // Find the merger vertex inserted by reshaping pass.
-      if (dag.getIncomingEdgesOf(vertex).stream().anyMatch(irEdge ->
-              CommunicationPatternProperty.Value.Shuffle
-          .equals(irEdge.getPropertyValue(CommunicationPatternProperty.class).get()))) {
-        dag.getIncomingEdgesOf(vertex).forEach(edgeToMerger -> {
-          if (CommunicationPatternProperty.Value.Shuffle
-          .equals(edgeToMerger.getPropertyValue(CommunicationPatternProperty.class).get())) {
-            // Pass data through memory to the merger vertex.
-            edgeToMerger.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.SerializedMemoryStore));
-          }
-        });
-        dag.getOutgoingEdgesOf(vertex).forEach(edgeFromMerger ->
-            // Merge the input data and write it immediately to the remote disk.
-            edgeFromMerger.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore)));
-      }
-    });
-  }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecoderPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecoderPass.java
deleted file mode 100644
index e2230ee..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecoderPass.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.coder.BytesDecoderFactory;
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-import java.util.List;
-
-/**
- * A pass to optimize large shuffle by tagging edges.
- * This pass modifies the decoder property toward {@link org.apache.nemo.common.ir.vertex.transform.RelayTransform}
- * to read data as byte arrays.
- */
-@Annotates(DecoderProperty.class)
-@Requires(CommunicationPatternProperty.class)
-public final class LargeShuffleDecoderPass extends AnnotatingPass {
-  /**
-   * Default constructor.
-   */
-  public LargeShuffleDecoderPass() {
-    super(LargeShuffleDecoderPass.class);
-  }
-
-  @Override
-  public void optimize(final IRDAG dag) {
-    dag.getVertices().forEach(vertex -> {
-      final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
-      inEdges.forEach(edge -> {
-        if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
-            .equals(CommunicationPatternProperty.Value.Shuffle)) {
-          edge.setPropertyPermanently(DecoderProperty.of(BytesDecoderFactory.of()));
-        }
-      });
-    });
-  }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
deleted file mode 100644
index 7db6478..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.CompressionProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DecompressionProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-import java.util.List;
-
-/**
- * A pass to support Sailfish-like shuffle by tagging edges.
- * This pass modifies the decoder property toward {@link org.apache.nemo.common.ir.vertex.transform.RelayTransform}
- * to read data as byte arrays.
- */
-@Annotates(DecompressionProperty.class)
-@Requires(CommunicationPatternProperty.class)
-public final class LargeShuffleDecompressionPass extends AnnotatingPass {
-  /**
-   * Default constructor.
-   */
-  public LargeShuffleDecompressionPass() {
-    super(LargeShuffleDecompressionPass.class);
-  }
-
-  @Override
-  public void optimize(final IRDAG dag) {
-    dag.getVertices().forEach(vertex -> {
-      final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
-      inEdges.forEach(edge -> {
-        if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
-            .equals(CommunicationPatternProperty.Value.Shuffle)) {
-          edge.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.None));
-
-          dag.getOutgoingEdgesOf(edge.getDst())
-              .forEach(edgeFromRelay ->
-                  edgeFromRelay.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.LZ4)));
-        }
-      });
-    });
-  }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleEncoderPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleEncoderPass.java
deleted file mode 100644
index 8e31107..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleEncoderPass.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.coder.BytesEncoderFactory;
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.EncoderProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-import java.util.List;
-
-/**
- * A pass to optimize large shuffle by tagging edges.
- * This pass modifies the encoder property toward {@link org.apache.nemo.common.ir.vertex.transform.RelayTransform}
- * to write data as byte arrays.
- */
-@Annotates(EncoderProperty.class)
-@Requires(CommunicationPatternProperty.class)
-public final class LargeShuffleEncoderPass extends AnnotatingPass {
-  /**
-   * Default constructor.
-   */
-  public LargeShuffleEncoderPass() {
-    super(LargeShuffleEncoderPass.class);
-  }
-
-  @Override
-  public void optimize(final IRDAG dag) {
-    dag.getVertices().forEach(vertex -> {
-      final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
-      inEdges.forEach(edge -> {
-        if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
-            .equals(CommunicationPatternProperty.Value.Shuffle)) {
-          dag.getOutgoingEdgesOf(edge.getDst())
-              .forEach(edgeFromRelay ->
-                  edgeFromRelay.setPropertyPermanently(EncoderProperty.of(BytesEncoderFactory.of())));
-        }
-      });
-    });
-  }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
deleted file mode 100644
index f1d308a..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-import java.util.List;
-
-/**
- * A pass to support Sailfish-like shuffle by tagging edges.
- * This pass modifies the partitioner property from {@link org.apache.nemo.common.ir.vertex.transform.RelayTransform}
- * to write an element as a partition.
- * This enables that every byte[] element, which was a partition for the reduce task, becomes one partition again
- * and flushed to disk write after it is relayed.
- */
-@Annotates(PartitionerProperty.class)
-@Requires(CommunicationPatternProperty.class)
-public final class LargeShufflePartitionerPass extends AnnotatingPass {
-  /**
-   * Default constructor.
-   */
-  public LargeShufflePartitionerPass() {
-    super(LargeShufflePartitionerPass.class);
-  }
-
-  @Override
-  public void optimize(final IRDAG dag) {
-    dag.getVertices().forEach(vertex -> {
-      final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
-      inEdges.forEach(edge -> {
-        if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
-            .equals(CommunicationPatternProperty.Value.Shuffle)) {
-          dag.getOutgoingEdgesOf(edge.getDst())
-              .forEach(edgeFromRelay ->
-                  edgeFromRelay.setPropertyPermanently(PartitionerProperty.of(
-                      PartitionerProperty.Value.DedicatedKeyPerElementPartitioner)));
-        }
-      });
-    });
-  }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleResourceSlotPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleResourceSlotPass.java
deleted file mode 100644
index 2f642cd..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleResourceSlotPass.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
-import org.apache.nemo.common.ir.vertex.executionproperty.ResourceSlotProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-/**
- * Sets {@link ResourceSlotProperty}.
- */
-@Annotates(ResourceSlotProperty.class)
-@Requires(DataFlowProperty.class)
-public final class LargeShuffleResourceSlotPass extends AnnotatingPass {
-
-  /**
-   * Default constructor.
-   */
-  public LargeShuffleResourceSlotPass() {
-    super(LargeShuffleResourceSlotPass.class);
-  }
-
-  @Override
-  public void optimize(final IRDAG dag) {
-    // On every vertex that receive push edge, if ResourceSlotProperty is not set, put it as false.
-    // For other vertices, if ResourceSlotProperty is not set, put it as true.
-    dag.getVertices().stream()
-        .filter(v -> !v.getPropertyValue(ResourceSlotProperty.class).isPresent())
-        .forEach(v -> {
-          if (dag.getIncomingEdgesOf(v).stream().anyMatch(
-              e -> e.getPropertyValue(DataFlowProperty.class)
-                  .orElseThrow(() -> new RuntimeException(String.format("DataFlowProperty for %s must be set",
-                      e.getId()))).equals(DataFlowProperty.Value.Push))) {
-            v.setPropertyPermanently(ResourceSlotProperty.of(false));
-          } else {
-            v.setPropertyPermanently(ResourceSlotProperty.of(true));
-          }
-        });
-  }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java
index f69b2b8..2752c04 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.compiler.optimizer.pass.compiletime.composite;
 
 import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.*;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping.LargeShuffleRelayReshapingPass;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping.LargeShuffleReshapingPass;
 
 import java.util.Arrays;
 
@@ -33,16 +33,8 @@ public final class LargeShuffleCompositePass extends CompositePass {
    */
   public LargeShuffleCompositePass() {
     super(Arrays.asList(
-        new LargeShuffleRelayReshapingPass(),
-        new LargeShuffleDataFlowPass(),
-        new LargeShuffleDataStorePass(),
-        new LargeShuffleDecoderPass(),
-        new LargeShuffleEncoderPass(),
-        new LargeShufflePartitionerPass(),
-        new LargeShuffleCompressionPass(),
-        new LargeShuffleDecompressionPass(),
-        new LargeShuffleDataPersistencePass(),
-        new LargeShuffleResourceSlotPass()
+        new LargeShuffleReshapingPass(),
+        new LargeShuffleAnnotatingPass()
     ));
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleReshapingPass.java
similarity index 90%
rename from compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
rename to compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleReshapingPass.java
index 47aef26..6aa8245 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleReshapingPass.java
@@ -27,13 +27,13 @@ import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
  * Inserts the StreamVertex for each shuffle edge.
  */
 @Requires(CommunicationPatternProperty.class)
-public final class LargeShuffleRelayReshapingPass extends ReshapingPass {
+public final class LargeShuffleReshapingPass extends ReshapingPass {
 
   /**
    * Default constructor.
    */
-  public LargeShuffleRelayReshapingPass() {
-    super(LargeShuffleRelayReshapingPass.class);
+  public LargeShuffleReshapingPass() {
+    super(LargeShuffleReshapingPass.class);
   }