You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/08/08 02:38:41 UTC

[incubator-nemo] branch master updated: [NEMO-172] Implement one partition per one element partitioner (#85)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 9333cf3  [NEMO-172] Implement one partition per one element partitioner (#85)
9333cf3 is described below

commit 9333cf3c8ff838438ff469c5e229e51e8231b362
Author: Sanha Lee <sa...@gmail.com>
AuthorDate: Wed Aug 8 11:38:40 2018 +0900

    [NEMO-172] Implement one partition per one element partitioner (#85)
    
    JIRA: [NEMO-172: Implement one partition per one element partitioner](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-172)
    
    **Major changes:**
    - Implement `DedicatedKeyPerElementPartitioner` which assigns a dedicated key for each element
    - Flush partitions which have dedicated key to disk in large shuffle optimization
    - Remove redundant (de)compression before and after the vertex having `RelayTransform` in large shuffle optimization
    
    **Minor changes to note:**
    - `None` value for (de)compression is added
    - `Block#commitPartitions` method is added to commit all un-committed partitions
    
    **Tests for the changes:**
    - `BlockTest` is implemented
    - Existing IT cases running large shuffle optimization also cover this change
    
    **Other comments:**
    - N/A
    
    resolves [NEMO-172](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-172)
---
 .../executionproperty/CompressionProperty.java     |   1 +
 .../executionproperty/PartitionerProperty.java     |   3 +-
 .../annotating/LargeShuffleDecompressionPass.java  |  59 +++++++++
 .../annotating/LargeShufflePartitionerPass.java    |  59 +++++++++
 .../annotating/LargeSuffleCompressionPass.java     |  58 +++++++++
 .../composite/LargeShuffleCompositePass.java       |   3 +
 .../composite/LargeShuffleCompositePassTest.java   |  10 ++
 .../nemo/runtime/executor/data/block/Block.java    |  16 ++-
 .../runtime/executor/data/block/FileBlock.java     |  28 +++--
 .../data/block/NonSerializedMemoryBlock.java       |  12 ++
 .../executor/data/block/SerializedMemoryBlock.java |  22 +++-
 .../data/partitioner/DedicatedKeyPerElement.java   |  27 ++++
 .../DedicatedKeyPerElementPartitioner.java         |  40 ++++++
 .../streamchainer/CompressionStreamChainer.java    |   2 +
 .../streamchainer/DecompressionStreamChainer.java  |   2 +
 .../executor/datatransfer/OutputWriter.java        |   9 ++
 .../snu/nemo/runtime/executor/data/BlockTest.java  | 138 +++++++++++++++++++++
 17 files changed, 475 insertions(+), 14 deletions(-)

diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java
index 441949e..cb50ef3 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java
@@ -46,5 +46,6 @@ public final class CompressionProperty extends EdgeExecutionProperty<Compression
   public enum Value {
     Gzip,
     LZ4,
+    None
   }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java
index 249433e..04f5c07 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java
@@ -44,6 +44,7 @@ public final class PartitionerProperty extends EdgeExecutionProperty<Partitioner
   public enum Value {
     DataSkewHashPartitioner,
     HashPartitioner,
-    IntactPartitioner
+    IntactPartitioner,
+    DedicatedKeyPerElementPartitioner
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
new file mode 100644
index 0000000..e1b31f2
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecompressionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A pass to support Sailfish-like shuffle by tagging edges.
+ * This pass modifies the decoder property toward {@link edu.snu.nemo.common.ir.vertex.transform.RelayTransform}
+ * to read data as byte arrays.
+ */
+public final class LargeShuffleDecompressionPass extends AnnotatingPass {
+  /**
+   * Default constructor.
+   */
+  public LargeShuffleDecompressionPass() {
+    super(DecompressionProperty.class, Collections.singleton(CommunicationPatternProperty.class));
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    dag.getVertices().forEach(vertex -> {
+      final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
+      inEdges.forEach(edge -> {
+        if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
+            .equals(CommunicationPatternProperty.Value.Shuffle)) {
+          edge.setProperty(DecompressionProperty.of(CompressionProperty.Value.None));
+
+          dag.getOutgoingEdgesOf(edge.getDst())
+              .forEach(edgeFromRelay -> {
+                edgeFromRelay.setProperty(DecompressionProperty.of(CompressionProperty.Value.LZ4));
+              });
+        }
+      });
+    });
+    return dag;
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
new file mode 100644
index 0000000..eb3b2bf
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A pass to support Sailfish-like shuffle by tagging edges.
+ * This pass modifies the partitioner property from {@link edu.snu.nemo.common.ir.vertex.transform.RelayTransform}
+ * to write an element as a partition.
+ * This enables that every byte[] element, which was a partition for the reduce task, becomes one partition again
+ * and flushed to disk write after it is relayed.
+ */
+public final class LargeShufflePartitionerPass extends AnnotatingPass {
+  /**
+   * Default constructor.
+   */
+  public LargeShufflePartitionerPass() {
+    super(PartitionerProperty.class, Collections.singleton(CommunicationPatternProperty.class));
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    dag.getVertices().forEach(vertex -> {
+      final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
+      inEdges.forEach(edge -> {
+        if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
+            .equals(CommunicationPatternProperty.Value.Shuffle)) {
+          dag.getOutgoingEdgesOf(edge.getDst())
+              .forEach(edgeFromRelay -> {
+                edgeFromRelay.setProperty(PartitionerProperty.of(
+                    PartitionerProperty.Value.DedicatedKeyPerElementPartitioner));
+              });
+        }
+      });
+    });
+    return dag;
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java
new file mode 100644
index 0000000..abc9300
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A pass to support Sailfish-like shuffle by tagging edges.
+ * This pass modifies the encoder property toward {@link edu.snu.nemo.common.ir.vertex.transform.RelayTransform}
+ * to write data as byte arrays.
+ */
+public final class LargeSuffleCompressionPass extends AnnotatingPass {
+  /**
+   * Default constructor.
+   */
+  public LargeSuffleCompressionPass() {
+    super(CompressionProperty.class, Collections.singleton(CommunicationPatternProperty.class));
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    dag.getVertices().forEach(vertex -> {
+      final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
+      inEdges.forEach(edge -> {
+        if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
+            .equals(CommunicationPatternProperty.Value.Shuffle)) {
+          edge.setProperty(CompressionProperty.of(CompressionProperty.Value.LZ4));
+
+          dag.getOutgoingEdgesOf(edge.getDst())
+              .forEach(edgeFromRelay -> {
+                edgeFromRelay.setProperty(CompressionProperty.of(CompressionProperty.Value.None));
+              });
+        }
+      });
+    });
+    return dag;
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java
index c6108e2..e69e56c 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java
@@ -35,6 +35,9 @@ public final class LargeShuffleCompositePass extends CompositePass {
         new LargeShuffleDataStorePass(),
         new LargeShuffleDecoderPass(),
         new LargeShuffleEncoderPass(),
+        new LargeShufflePartitionerPass(),
+        new LargeSuffleCompressionPass(),
+        new LargeShuffleDecompressionPass(),
         new LargeShuffleDataPersistencePass(),
         new LargeShuffleResourceSlotPass()
     ));
diff --git a/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePassTest.java b/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePassTest.java
index d97816c..f859270 100644
--- a/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePassTest.java
+++ b/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePassTest.java
@@ -64,6 +64,10 @@ public class LargeShuffleCompositePassTest {
                 edgeToMerger.getPropertyValue(DataStoreProperty.class).get());
             assertEquals(BytesDecoderFactory.of(),
                 edgeToMerger.getPropertyValue(DecoderProperty.class).get());
+            assertEquals(CompressionProperty.Value.LZ4,
+                edgeToMerger.getPropertyValue(CompressionProperty.class).get());
+            assertEquals(CompressionProperty.Value.None,
+                edgeToMerger.getPropertyValue(DecompressionProperty.class).get());
           } else {
             assertEquals(DataFlowProperty.Value.Pull,
                 edgeToMerger.getPropertyValue(DataFlowProperty.class).get());
@@ -78,6 +82,12 @@ public class LargeShuffleCompositePassTest {
               edgeFromMerger.getPropertyValue(DataStoreProperty.class).get());
           assertEquals(BytesEncoderFactory.of(),
               edgeFromMerger.getPropertyValue(EncoderProperty.class).get());
+          assertEquals(PartitionerProperty.Value.DedicatedKeyPerElementPartitioner,
+              edgeFromMerger.getPropertyValue(PartitionerProperty.class).get());
+          assertEquals(CompressionProperty.Value.None,
+              edgeFromMerger.getPropertyValue(CompressionProperty.class).get());
+          assertEquals(CompressionProperty.Value.LZ4,
+              edgeFromMerger.getPropertyValue(DecompressionProperty.class).get());
         });
       } else {
         // Non merger vertex.
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
index 10602b9..e0de210 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
@@ -103,7 +103,7 @@ public interface Block<K extends Serializable> {
    * Commits this block to prevent further write.
    *
    * @return the size of each partition if the data in the block is serialized.
-   * @throws BlockWriteException for any error occurred while trying to write a block.
+   * @throws BlockWriteException for any error occurred while trying to commit a block.
    *                             (This exception will be thrown to the scheduler
    *                             through {@link edu.snu.nemo.runtime.executor.Executor} and
    *                             have to be handled by the scheduler with fault tolerance mechanism.)
@@ -111,6 +111,20 @@ public interface Block<K extends Serializable> {
   Optional<Map<K, Long>> commit() throws BlockWriteException;
 
   /**
+   * Commits all un-committed partitions.
+   * This method can be useful if partitions in a block should be committed before the block is committed totally.
+   * For example, non-committed partitions in a file block can be flushed to storage from memory.
+   * If another element is written after this method is called, a new non-committed partition should be created
+   * for the element even if a partition with the same key is committed already.
+   *
+   * @throws BlockWriteException for any error occurred while trying to commit partitions.
+   *                             (This exception will be thrown to the scheduler
+   *                             through {@link edu.snu.nemo.runtime.executor.Executor} and
+   *                             have to be handled by the scheduler with fault tolerance mechanism.)
+   */
+  void commitPartitions() throws BlockWriteException;
+
+  /**
    * @return the ID of this block.
    */
   String getId();
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index 41f125b..b9a04e4 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -20,6 +20,7 @@ import edu.snu.nemo.common.exception.BlockWriteException;
 import edu.snu.nemo.runtime.common.data.KeyRange;
 import edu.snu.nemo.runtime.executor.data.*;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.Partition;
 import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 import edu.snu.nemo.runtime.executor.data.metadata.PartitionMetadata;
@@ -314,13 +315,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
   public synchronized Optional<Map<K, Long>> commit() throws BlockWriteException {
     try {
       if (!metadata.isCommitted()) {
-        final List<SerializedPartition<K>> partitions = new ArrayList<>();
-        for (final SerializedPartition<K> partition : nonCommittedPartitionsMap.values()) {
-          partition.commit();
-          partitions.add(partition);
-        }
-        writeToFile(partitions);
-        nonCommittedPartitionsMap.clear();
+        commitPartitions();
         metadata.commitBlock();
       }
       final List<PartitionMetadata<K>> partitionMetadataList = metadata.getPartitionMetadataList();
@@ -342,6 +337,25 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
   }
 
   /**
+   * Commits all un-committed partitions.
+   * The committed partitions will be flushed to the storage.
+   */
+  @Override
+  public synchronized void commitPartitions() throws BlockWriteException {
+    final List<SerializedPartition<K>> partitions = new ArrayList<>();
+    try {
+      for (final Partition<?, K> partition : nonCommittedPartitionsMap.values()) {
+        partition.commit();
+        partitions.add((SerializedPartition<K>) partition);
+      }
+      writeToFile(partitions);
+      nonCommittedPartitionsMap.clear();
+    } catch (final IOException e) {
+      throw new BlockWriteException(e);
+    }
+  }
+
+  /**
    * @return the ID of this block.
    */
   @Override
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
index 6a8323a..5bf1e01 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
@@ -188,6 +188,18 @@ public final class NonSerializedMemoryBlock<K extends Serializable> implements B
   }
 
   /**
+   * Commits all un-committed partitions.
+   */
+  @Override
+  public synchronized void commitPartitions() throws BlockWriteException {
+    nonCommittedPartitionsMap.forEach((key, partition) -> {
+      partition.commit();
+      nonSerializedPartitions.add(partition);
+    });
+    nonCommittedPartitionsMap.clear();
+  }
+
+  /**
    * @return the ID of this block.
    */
   @Override
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
index 4282b69..847558f 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
@@ -180,11 +180,7 @@ public final class SerializedMemoryBlock<K extends Serializable> implements Bloc
   public synchronized Optional<Map<K, Long>> commit() throws BlockWriteException {
     try {
       if (!committed) {
-        for (final SerializedPartition<K> partition : nonCommittedPartitionsMap.values()) {
-          partition.commit();
-          serializedPartitions.add(partition);
-        }
-        nonCommittedPartitionsMap.clear();
+        commitPartitions();
         committed = true;
       }
       final Map<K, Long> partitionSizes = new HashMap<>(serializedPartitions.size());
@@ -205,6 +201,22 @@ public final class SerializedMemoryBlock<K extends Serializable> implements Bloc
   }
 
   /**
+   * Commits all un-committed partitions.
+   */
+  @Override
+  public synchronized void commitPartitions() throws BlockWriteException {
+    try {
+      for (final SerializedPartition<K> partition : nonCommittedPartitionsMap.values()) {
+        partition.commit();
+        serializedPartitions.add(partition);
+      }
+      nonCommittedPartitionsMap.clear();
+    } catch (final IOException e) {
+      throw new BlockWriteException(e);
+    }
+  }
+
+  /**
    * @return the ID of this block.
    */
   @Override
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElement.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElement.java
new file mode 100644
index 0000000..02c9bed
--- /dev/null
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElement.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.executor.data.partitioner;
+
+import java.lang.annotation.*;
+
+/**
+ * Declares that all of the designated keys for each element in a {@link Partitioner} is dedicated for the element.
+ */
+@Target({ElementType.TYPE})
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+public @interface DedicatedKeyPerElement {
+}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
new file mode 100644
index 0000000..f01f08b
--- /dev/null
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.executor.data.partitioner;
+
+/**
+ * An implementation of {@link Partitioner} which assigns a dedicated key per an output data from a task.
+ * WARNING: Because this partitioner assigns a dedicated key per element, it should be used under specific circumstances
+ * that the number of output element is not that many. For example, every output element of
+ * {@link edu.snu.nemo.common.ir.vertex.transform.RelayTransform} inserted by large shuffle optimization is always
+ * a partition. In this case, assigning a key for each element can be useful.
+ */
+@DedicatedKeyPerElement
+public final class DedicatedKeyPerElementPartitioner implements Partitioner<Integer> {
+  private int key;
+
+  /**
+   * Constructor.
+   */
+  public DedicatedKeyPerElementPartitioner() {
+    key = 0;
+  }
+
+  @Override
+  public Integer partition(final Object element) {
+    return key++;
+  }
+}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
index 467ba18..00f4ca2 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
@@ -45,6 +45,8 @@ public class CompressionStreamChainer implements EncodeStreamChainer {
         return new GZIPOutputStream(out);
       case LZ4:
         return new LZ4BlockOutputStream(out);
+      case None:
+        return out;
       default:
         throw new UnsupportedCompressionException("Not supported compression method");
     }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java
index 558bd35..b36546c 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java
@@ -45,6 +45,8 @@ public class DecompressionStreamChainer implements DecodeStreamChainer {
         return new GZIPInputStream(in);
       case LZ4:
         return new LZ4BlockInputStream(in);
+      case None:
+        return in;
       default:
         throw new UnsupportedCompressionException("Not supported compression method");
     }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index 930a4b4..6e4164a 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -82,6 +82,9 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
       case DataSkewHashPartitioner:
         this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, keyExtractor.get());
         break;
+      case DedicatedKeyPerElementPartitioner:
+        this.partitioner = new DedicatedKeyPerElementPartitioner();
+        break;
       default:
         throw new UnsupportedPartitionerException(
             new Throwable("Partitioner " + partitionerPropertyValue + " is not supported."));
@@ -103,6 +106,12 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
   public void write(final Object element) {
     if (nonDummyBlock) {
       blockToWrite.write(partitioner.partition(element), element);
+
+      final DedicatedKeyPerElement dedicatedKeyPerElement =
+          partitioner.getClass().getAnnotation(DedicatedKeyPerElement.class);
+      if (dedicatedKeyPerElement != null) {
+        blockToWrite.commitPartitions();
+      }
     } // If else, does not need to write because the data is duplicated.
   }
 
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java
new file mode 100644
index 0000000..c05745d
--- /dev/null
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.executor.data;
+
+import edu.snu.nemo.common.coder.IntDecoderFactory;
+import edu.snu.nemo.common.coder.IntEncoderFactory;
+import edu.snu.nemo.runtime.common.data.HashRange;
+import edu.snu.nemo.runtime.executor.data.block.Block;
+import edu.snu.nemo.runtime.executor.data.block.FileBlock;
+import edu.snu.nemo.runtime.executor.data.block.NonSerializedMemoryBlock;
+import edu.snu.nemo.runtime.executor.data.block.SerializedMemoryBlock;
+import edu.snu.nemo.runtime.executor.data.metadata.LocalFileMetadata;
+import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.*;
+
+/**
+ * Tests write and read for {@link Block}s.
+ */
+public final class BlockTest {
+  private Serializer serializer;
+  private Map<Integer, List<Integer>> testData;
+
+  /**
+   * Generates the test data and serializer.
+   */
+  @Before
+  public void setUp() throws Exception {
+    serializer = new Serializer<>(IntEncoderFactory.of(), IntDecoderFactory.of(), new ArrayList<>(), new ArrayList<>());
+    testData = new HashMap<>();
+
+    final List<Integer> list1 = Collections.singletonList(1);
+    final List<Integer> list2 = Arrays.asList(1, 2);
+    final List<Integer> list3 = Arrays.asList(1, 2, 3);
+
+    testData.put(1, list1);
+    testData.put(2, list2);
+    testData.put(3, list3);
+  }
+
+  /**
+   * Test {@link NonSerializedMemoryBlock}.
+   */
+  @Test(timeout = 10000)
+  public void testNonSerializedMemoryBlock() throws Exception {
+    final Block<Integer> block = new NonSerializedMemoryBlock<>("testBlock", serializer);
+    testBlock(block);
+  }
+
+  /**
+   * Test {@link edu.snu.nemo.runtime.executor.data.block.SerializedMemoryBlock}.
+   */
+  @Test(timeout = 10000)
+  public void testSerializedMemoryBlock() throws Exception {
+    final Block<Integer> block = new SerializedMemoryBlock<>("testBlock", serializer);
+    testBlock(block);
+  }
+
+  /**
+   * Test {@link FileBlock}.
+   */
+  @Test(timeout = 10000)
+  public void testFileBlock() throws Exception {
+    final String tmpDir = "./tmpFiles";
+    final String filePath = tmpDir + "/BlockTestFile";
+    try {
+      new File(tmpDir).mkdirs();
+      final LocalFileMetadata<Integer> metadata = new LocalFileMetadata<>();
+      final Block<Integer> block = new FileBlock<>("testBlock", serializer, filePath, metadata);
+      testBlock(block);
+    } finally {
+      FileUtils.deleteDirectory(new File(tmpDir));
+    }
+  }
+
+
+  /**
+   * Tests write to & read from a block.
+   */
+  private void testBlock(final Block<Integer> block) throws Exception {
+    // Write elements to partitions in the block
+    testData.forEach((key, partitionData) -> partitionData.forEach(element -> block.write(key, element)));
+
+    // Commit all partitions
+    block.commitPartitions();
+
+    // Write elements again. Because all partitions are committed, new partitions for each key will be created.
+    testData.forEach((key, partitionData) -> partitionData.forEach(element -> block.write(key, element)));
+
+    // Commit the block
+    block.commit();
+
+    int count = 0;
+    final Iterable<NonSerializedPartition<Integer>> partitions = block.readPartitions(HashRange.all());
+    for (final NonSerializedPartition<Integer> readPartition : partitions) {
+      count++;
+      final List<Integer> expectedData = testData.get(readPartition.getKey());
+      final Iterable<Integer> readData = readPartition.getData();
+      compare(expectedData, readData);
+    }
+    Assert.assertEquals(count, testData.size() * 2);
+  }
+
+  /**
+   * Compare the contents of a list and an iterable.
+   * @param list     the list to test.
+   * @param iterable the iterable to test.
+   * @throws RuntimeException if the contents are not matched.
+   */
+  private void compare(final List<Integer> list,
+                       final Iterable<Integer> iterable) throws RuntimeException {
+    final List<Integer> copiedList = new ArrayList<>(list);
+    for (final Integer element : iterable) {
+      if (!copiedList.remove(element)) {
+        throw new RuntimeException("Contents mismatch! \nlist: " + list + "\niterable: " + iterable);
+      }
+    }
+  }
+}