You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/01/22 02:17:00 UTC
[incubator-nemo] branch reshaping updated: refactor largeshuffle
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch reshaping
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/reshaping by this push:
new f894b97 refactor largeshuffle
f894b97 is described below
commit f894b9726b58f52b33384825b418d9f37567e468
Author: John Yang <jo...@apache.org>
AuthorDate: Tue Jan 22 11:16:50 2019 +0900
refactor largeshuffle
---
.../annotating/LargeShuffleAnnotatingPass.java | 95 ++++++++++++++++++++++
.../annotating/LargeShuffleCompressionPass.java | 60 --------------
.../annotating/LargeShuffleDataFlowPass.java | 57 -------------
.../LargeShuffleDataPersistencePass.java | 51 ------------
.../annotating/LargeShuffleDataStorePass.java | 60 --------------
.../annotating/LargeShuffleDecoderPass.java | 57 -------------
.../annotating/LargeShuffleDecompressionPass.java | 61 --------------
.../annotating/LargeShuffleEncoderPass.java | 59 --------------
.../annotating/LargeShufflePartitionerPass.java | 61 --------------
.../annotating/LargeShuffleResourceSlotPass.java | 57 -------------
.../composite/LargeShuffleCompositePass.java | 14 +---
...ingPass.java => LargeShuffleReshapingPass.java} | 6 +-
12 files changed, 101 insertions(+), 537 deletions(-)
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleAnnotatingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleAnnotatingPass.java
new file mode 100644
index 0000000..97f431e
--- /dev/null
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleAnnotatingPass.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import org.apache.nemo.common.coder.BytesDecoderFactory;
+import org.apache.nemo.common.coder.BytesEncoderFactory;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.executionproperty.*;
+import org.apache.nemo.common.ir.vertex.executionproperty.ResourceSlotProperty;
+import org.apache.nemo.common.ir.vertex.system.StreamVertex;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
+
+/**
+ * Annotates the edges and vertices around each StreamVertex for the large-shuffle optimization.
+ * This pass assumes that a StreamVertex was previously inserted to receive each shuffle edge.
+ *
+ * src -> shuffle-edge -> streamvertex -> one-to-one-edge -> dst
+ *
+ * (1) shuffle-edge (every edge whose destination is a StreamVertex)
+ * Compress with LZ4 on the sender side, and have the receiver read the data as the same byte[],
+ * rather than decompressing/decoding it.
+ * Perform a push-based in-memory shuffle with discarding on.
+ *
+ * (2) streamvertex
+ * Ignore resource slots, such that all tasks fetch the in-memory input data blocks as soon as they become available.
+ *
+ * (3) one-to-one-edge (every edge whose source is a StreamVertex)
+ * Write the relayed byte[] as-is without re-compressing it, and let the receiver perform the LZ4 decompression.
+ * Perform a pull-based on-disk data transfer, with each relayed element written as its own partition.
+ *
+ * (4) any other edge
+ * Perform a pull-based data transfer.
+ */
+@Annotates({CompressionProperty.class, DataFlowProperty.class, DataPersistenceProperty.class,
+  DataStoreProperty.class, DecoderProperty.class, DecompressionProperty.class,
+  EncoderProperty.class, PartitionerProperty.class, ResourceSlotProperty.class})
+@Requires(CommunicationPatternProperty.class)
+public final class LargeShuffleAnnotatingPass extends AnnotatingPass {
+  /**
+   * Default constructor.
+   */
+  public LargeShuffleAnnotatingPass() {
+    super(LargeShuffleAnnotatingPass.class);
+  }
+
+  /**
+   * Visits the incoming edges of every vertex in topological order and permanently sets their properties,
+   * depending on whether the edge enters a StreamVertex, leaves a StreamVertex, or does neither.
+   *
+   * @param dag the IR DAG whose edges and stream vertices are annotated in place.
+   */
+  @Override
+  public void optimize(final IRDAG dag) {
+    dag.topologicalDo(irVertex ->
+      dag.getIncomingEdgesOf(irVertex).forEach(edge -> {
+        if (edge.getDst().getClass().equals(StreamVertex.class)) {
+          // CASE #1: To a stream vertex
+
+          // Coder and Compression: the stream vertex reads the compressed bytes as-is.
+          edge.setPropertyPermanently(DecoderProperty.of(BytesDecoderFactory.of()));
+          edge.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.LZ4));
+          edge.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.None));
+
+          // Data transfers
+          edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Push));
+          edge.setPropertyPermanently(DataPersistenceProperty.of(DataPersistenceProperty.Value.Discard));
+          edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.SerializedMemoryStore));
+
+          // Resource slots
+          edge.getDst().setPropertyPermanently(ResourceSlotProperty.of(false));
+        } else if (edge.getSrc().getClass().equals(StreamVertex.class)) {
+          // CASE #2: From a stream vertex
+
+          // Coder and Compression: the input was read as byte[] (see CASE #1), so it must be written back
+          // as byte[] too. The bytes are already LZ4-compressed, hence no compression on the sender side
+          // and the decompression on the receiver side.
+          edge.setPropertyPermanently(EncoderProperty.of(BytesEncoderFactory.of()));
+          edge.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.None));
+          edge.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.LZ4));
+
+          // Data transfers
+          edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+          edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+          edge.setPropertyPermanently(
+            PartitionerProperty.of(PartitionerProperty.Value.DedicatedKeyPerElementPartitioner));
+        } else {
+          // CASE #3: Unrelated to any stream vertices
+          edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+        }
+      }));
+  }
+}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleCompressionPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleCompressionPass.java
deleted file mode 100644
index c2b7a26..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleCompressionPass.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.CompressionProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-import java.util.List;
-
-/**
- * A pass to support Sailfish-like shuffle by tagging edges.
- * This pass modifies the encoder property toward {@link org.apache.nemo.common.ir.vertex.transform.RelayTransform}
- * to write data as byte arrays.
- */
-@Annotates(CompressionProperty.class)
-@Requires(CommunicationPatternProperty.class)
-public final class LargeShuffleCompressionPass extends AnnotatingPass {
- /**
- * Default constructor.
- */
- public LargeShuffleCompressionPass() {
- super(LargeShuffleCompressionPass.class);
- }
-
- @Override
- public void optimize(final IRDAG dag) {
- dag.getVertices().forEach(vertex -> {
- final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
- inEdges.forEach(edge -> {
- if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
- .equals(CommunicationPatternProperty.Value.Shuffle)) {
- edge.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.LZ4));
-
- dag.getOutgoingEdgesOf(edge.getDst())
- .forEach(edgeFromRelay ->
- edgeFromRelay.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.None)));
- }
- });
- });
- }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataFlowPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataFlowPass.java
deleted file mode 100644
index e95c3e4..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataFlowPass.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-import java.util.List;
-
-/**
- * A pass to optimize large shuffle by tagging edges.
- * This pass handles the DataFlowModel ExecutionProperty.
- */
-@Annotates(DataFlowProperty.class)
-@Requires(CommunicationPatternProperty.class)
-public final class LargeShuffleDataFlowPass extends AnnotatingPass {
- /**
- * Default constructor.
- */
- public LargeShuffleDataFlowPass() {
- super(LargeShuffleDataFlowPass.class);
- }
-
- @Override
- public void optimize(final IRDAG dag) {
- dag.getVertices().forEach(vertex -> {
- final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
- inEdges.forEach(edge -> {
- if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
- .equals(CommunicationPatternProperty.Value.Shuffle)) {
- edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Push)); // Push to the merger vertex.
- } else {
- edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Pull));
- }
- });
- });
- }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
deleted file mode 100644
index 31c606b..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DataPersistenceProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-/**
- * A pass to optimize large shuffle by tagging edges.
- * This pass handles the data persistence ExecutionProperty.
- */
-@Annotates(DataPersistenceProperty.class)
-@Requires(DataFlowProperty.class)
-public final class LargeShuffleDataPersistencePass extends AnnotatingPass {
-
- /**
- * Default constructor.
- */
- public LargeShuffleDataPersistencePass() {
- super(LargeShuffleDataPersistencePass.class);
- }
-
- @Override
- public void optimize(final IRDAG dag) {
- dag.topologicalDo(irVertex ->
- dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
- final DataFlowProperty.Value dataFlowModel = irEdge.getPropertyValue(DataFlowProperty.class).get();
- if (DataFlowProperty.Value.Push.equals(dataFlowModel)) {
- irEdge.setPropertyPermanently(DataPersistenceProperty.of(DataPersistenceProperty.Value.Discard));
- }
- }));
- }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataStorePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataStorePass.java
deleted file mode 100644
index 66c6278..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataStorePass.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-/**
- * A pass to optimize large shuffle by tagging edges.
- * This pass handles the DataStore ExecutionProperty.
- */
-@Annotates(DataStoreProperty.class)
-@Requires(CommunicationPatternProperty.class)
-public final class LargeShuffleDataStorePass extends AnnotatingPass {
- /**
- * Default constructor.
- */
- public LargeShuffleDataStorePass() {
- super(LargeShuffleDataStorePass.class);
- }
-
- @Override
- public void optimize(final IRDAG dag) {
- dag.getVertices().forEach(vertex -> {
- // Find the merger vertex inserted by reshaping pass.
- if (dag.getIncomingEdgesOf(vertex).stream().anyMatch(irEdge ->
- CommunicationPatternProperty.Value.Shuffle
- .equals(irEdge.getPropertyValue(CommunicationPatternProperty.class).get()))) {
- dag.getIncomingEdgesOf(vertex).forEach(edgeToMerger -> {
- if (CommunicationPatternProperty.Value.Shuffle
- .equals(edgeToMerger.getPropertyValue(CommunicationPatternProperty.class).get())) {
- // Pass data through memory to the merger vertex.
- edgeToMerger.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.SerializedMemoryStore));
- }
- });
- dag.getOutgoingEdgesOf(vertex).forEach(edgeFromMerger ->
- // Merge the input data and write it immediately to the remote disk.
- edgeFromMerger.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore)));
- }
- });
- }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecoderPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecoderPass.java
deleted file mode 100644
index e2230ee..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecoderPass.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.coder.BytesDecoderFactory;
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-import java.util.List;
-
-/**
- * A pass to optimize large shuffle by tagging edges.
- * This pass modifies the decoder property toward {@link org.apache.nemo.common.ir.vertex.transform.RelayTransform}
- * to read data as byte arrays.
- */
-@Annotates(DecoderProperty.class)
-@Requires(CommunicationPatternProperty.class)
-public final class LargeShuffleDecoderPass extends AnnotatingPass {
- /**
- * Default constructor.
- */
- public LargeShuffleDecoderPass() {
- super(LargeShuffleDecoderPass.class);
- }
-
- @Override
- public void optimize(final IRDAG dag) {
- dag.getVertices().forEach(vertex -> {
- final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
- inEdges.forEach(edge -> {
- if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
- .equals(CommunicationPatternProperty.Value.Shuffle)) {
- edge.setPropertyPermanently(DecoderProperty.of(BytesDecoderFactory.of()));
- }
- });
- });
- }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
deleted file mode 100644
index 7db6478..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.CompressionProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DecompressionProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-import java.util.List;
-
-/**
- * A pass to support Sailfish-like shuffle by tagging edges.
- * This pass modifies the decoder property toward {@link org.apache.nemo.common.ir.vertex.transform.RelayTransform}
- * to read data as byte arrays.
- */
-@Annotates(DecompressionProperty.class)
-@Requires(CommunicationPatternProperty.class)
-public final class LargeShuffleDecompressionPass extends AnnotatingPass {
- /**
- * Default constructor.
- */
- public LargeShuffleDecompressionPass() {
- super(LargeShuffleDecompressionPass.class);
- }
-
- @Override
- public void optimize(final IRDAG dag) {
- dag.getVertices().forEach(vertex -> {
- final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
- inEdges.forEach(edge -> {
- if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
- .equals(CommunicationPatternProperty.Value.Shuffle)) {
- edge.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.None));
-
- dag.getOutgoingEdgesOf(edge.getDst())
- .forEach(edgeFromRelay ->
- edgeFromRelay.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.LZ4)));
- }
- });
- });
- }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleEncoderPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleEncoderPass.java
deleted file mode 100644
index 8e31107..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleEncoderPass.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.coder.BytesEncoderFactory;
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.EncoderProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-import java.util.List;
-
-/**
- * A pass to optimize large shuffle by tagging edges.
- * This pass modifies the encoder property toward {@link org.apache.nemo.common.ir.vertex.transform.RelayTransform}
- * to write data as byte arrays.
- */
-@Annotates(EncoderProperty.class)
-@Requires(CommunicationPatternProperty.class)
-public final class LargeShuffleEncoderPass extends AnnotatingPass {
- /**
- * Default constructor.
- */
- public LargeShuffleEncoderPass() {
- super(LargeShuffleEncoderPass.class);
- }
-
- @Override
- public void optimize(final IRDAG dag) {
- dag.getVertices().forEach(vertex -> {
- final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
- inEdges.forEach(edge -> {
- if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
- .equals(CommunicationPatternProperty.Value.Shuffle)) {
- dag.getOutgoingEdgesOf(edge.getDst())
- .forEach(edgeFromRelay ->
- edgeFromRelay.setPropertyPermanently(EncoderProperty.of(BytesEncoderFactory.of())));
- }
- });
- });
- }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
deleted file mode 100644
index f1d308a..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-import java.util.List;
-
-/**
- * A pass to support Sailfish-like shuffle by tagging edges.
- * This pass modifies the partitioner property from {@link org.apache.nemo.common.ir.vertex.transform.RelayTransform}
- * to write an element as a partition.
- * This enables that every byte[] element, which was a partition for the reduce task, becomes one partition again
- * and flushed to disk write after it is relayed.
- */
-@Annotates(PartitionerProperty.class)
-@Requires(CommunicationPatternProperty.class)
-public final class LargeShufflePartitionerPass extends AnnotatingPass {
- /**
- * Default constructor.
- */
- public LargeShufflePartitionerPass() {
- super(LargeShufflePartitionerPass.class);
- }
-
- @Override
- public void optimize(final IRDAG dag) {
- dag.getVertices().forEach(vertex -> {
- final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
- inEdges.forEach(edge -> {
- if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
- .equals(CommunicationPatternProperty.Value.Shuffle)) {
- dag.getOutgoingEdgesOf(edge.getDst())
- .forEach(edgeFromRelay ->
- edgeFromRelay.setPropertyPermanently(PartitionerProperty.of(
- PartitionerProperty.Value.DedicatedKeyPerElementPartitioner)));
- }
- });
- });
- }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleResourceSlotPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleResourceSlotPass.java
deleted file mode 100644
index 2f642cd..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleResourceSlotPass.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
-import org.apache.nemo.common.ir.vertex.executionproperty.ResourceSlotProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-/**
- * Sets {@link ResourceSlotProperty}.
- */
-@Annotates(ResourceSlotProperty.class)
-@Requires(DataFlowProperty.class)
-public final class LargeShuffleResourceSlotPass extends AnnotatingPass {
-
- /**
- * Default constructor.
- */
- public LargeShuffleResourceSlotPass() {
- super(LargeShuffleResourceSlotPass.class);
- }
-
- @Override
- public void optimize(final IRDAG dag) {
- // On every vertex that receive push edge, if ResourceSlotProperty is not set, put it as false.
- // For other vertices, if ResourceSlotProperty is not set, put it as true.
- dag.getVertices().stream()
- .filter(v -> !v.getPropertyValue(ResourceSlotProperty.class).isPresent())
- .forEach(v -> {
- if (dag.getIncomingEdgesOf(v).stream().anyMatch(
- e -> e.getPropertyValue(DataFlowProperty.class)
- .orElseThrow(() -> new RuntimeException(String.format("DataFlowProperty for %s must be set",
- e.getId()))).equals(DataFlowProperty.Value.Push))) {
- v.setPropertyPermanently(ResourceSlotProperty.of(false));
- } else {
- v.setPropertyPermanently(ResourceSlotProperty.of(true));
- }
- });
- }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java
index f69b2b8..2752c04 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java
@@ -19,7 +19,7 @@
package org.apache.nemo.compiler.optimizer.pass.compiletime.composite;
import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.*;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping.LargeShuffleRelayReshapingPass;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping.LargeShuffleReshapingPass;
import java.util.Arrays;
@@ -33,16 +33,8 @@ public final class LargeShuffleCompositePass extends CompositePass {
*/
public LargeShuffleCompositePass() {
super(Arrays.asList(
- new LargeShuffleRelayReshapingPass(),
- new LargeShuffleDataFlowPass(),
- new LargeShuffleDataStorePass(),
- new LargeShuffleDecoderPass(),
- new LargeShuffleEncoderPass(),
- new LargeShufflePartitionerPass(),
- new LargeShuffleCompressionPass(),
- new LargeShuffleDecompressionPass(),
- new LargeShuffleDataPersistencePass(),
- new LargeShuffleResourceSlotPass()
+ new LargeShuffleReshapingPass(),
+ new LargeShuffleAnnotatingPass()
));
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleReshapingPass.java
similarity index 90%
rename from compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
rename to compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleReshapingPass.java
index 47aef26..6aa8245 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleReshapingPass.java
@@ -27,13 +27,13 @@ import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
* Inserts the StreamVertex for each shuffle edge.
*/
@Requires(CommunicationPatternProperty.class)
-public final class LargeShuffleRelayReshapingPass extends ReshapingPass {
+public final class LargeShuffleReshapingPass extends ReshapingPass {
/**
* Default constructor.
*/
- public LargeShuffleRelayReshapingPass() {
- super(LargeShuffleRelayReshapingPass.class);
+ public LargeShuffleReshapingPass() {
+ super(LargeShuffleReshapingPass.class);
}