You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/08/08 02:38:41 UTC
[incubator-nemo] branch master updated: [NEMO-172] Implement one
partition per one element partitioner (#85)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 9333cf3 [NEMO-172] Implement one partition per one element partitioner (#85)
9333cf3 is described below
commit 9333cf3c8ff838438ff469c5e229e51e8231b362
Author: Sanha Lee <sa...@gmail.com>
AuthorDate: Wed Aug 8 11:38:40 2018 +0900
[NEMO-172] Implement one partition per one element partitioner (#85)
JIRA: [NEMO-172: Implement one partition per one element partitioner](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-172)
**Major changes:**
- Implement `DedicatedKeyPerElementPartitioner` which assigns a dedicated key for each element
- Flush partitions that have a dedicated key to disk in the large shuffle optimization
- Remove redundant (de)compression before and after the vertex with a `RelayTransform` in the large shuffle optimization
**Minor changes to note:**
- A `None` value is added for (de)compression
- A `Block#commitPartitions` method is added to commit all uncommitted partitions
**Tests for the changes:**
- `BlockTest` is implemented
- Existing IT cases running large shuffle optimization also cover this change
**Other comments:**
- N/A
resolves [NEMO-172](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-172)
---
.../executionproperty/CompressionProperty.java | 1 +
.../executionproperty/PartitionerProperty.java | 3 +-
.../annotating/LargeShuffleDecompressionPass.java | 59 +++++++++
.../annotating/LargeShufflePartitionerPass.java | 59 +++++++++
.../annotating/LargeSuffleCompressionPass.java | 58 +++++++++
.../composite/LargeShuffleCompositePass.java | 3 +
.../composite/LargeShuffleCompositePassTest.java | 10 ++
.../nemo/runtime/executor/data/block/Block.java | 16 ++-
.../runtime/executor/data/block/FileBlock.java | 28 +++--
.../data/block/NonSerializedMemoryBlock.java | 12 ++
.../executor/data/block/SerializedMemoryBlock.java | 22 +++-
.../data/partitioner/DedicatedKeyPerElement.java | 27 ++++
.../DedicatedKeyPerElementPartitioner.java | 40 ++++++
.../streamchainer/CompressionStreamChainer.java | 2 +
.../streamchainer/DecompressionStreamChainer.java | 2 +
.../executor/datatransfer/OutputWriter.java | 9 ++
.../snu/nemo/runtime/executor/data/BlockTest.java | 138 +++++++++++++++++++++
17 files changed, 475 insertions(+), 14 deletions(-)
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java
index 441949e..cb50ef3 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java
@@ -46,5 +46,6 @@ public final class CompressionProperty extends EdgeExecutionProperty<Compression
public enum Value {
Gzip,
LZ4,
+ None // No compression: the (de)compression stream chainers return the stream unchanged.
}
}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java
index 249433e..04f5c07 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java
@@ -44,6 +44,7 @@ public final class PartitionerProperty extends EdgeExecutionProperty<Partitioner
public enum Value {
DataSkewHashPartitioner,
HashPartitioner,
- IntactPartitioner
+ IntactPartitioner,
+ DedicatedKeyPerElementPartitioner // Assigns a new, distinct key to every written element.
}
}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
new file mode 100644
index 0000000..e1b31f2
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecompressionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A pass to support Sailfish-like shuffle by tagging edges.
+ * It sets no decompression on shuffle edges toward {@link edu.snu.nemo.common.ir.vertex.transform.RelayTransform}
+ * so the relay reads still-compressed byte arrays, and sets LZ4 decompression on the edges from the relay.
+ */
+public final class LargeShuffleDecompressionPass extends AnnotatingPass {
+ /**
+ * Default constructor.
+ */
+ public LargeShuffleDecompressionPass() {
+ super(DecompressionProperty.class, Collections.singleton(CommunicationPatternProperty.class));
+ }
+
+ @Override
+ public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+ dag.getVertices().forEach(vertex -> {
+ final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
+ inEdges.forEach(edge -> {
+ if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
+ .equals(CommunicationPatternProperty.Value.Shuffle)) {
+ edge.setProperty(DecompressionProperty.of(CompressionProperty.Value.None));
+ // The relay forwards the bytes still compressed, so decompress only on the edges leaving it.
+ dag.getOutgoingEdgesOf(edge.getDst())
+ .forEach(edgeFromRelay -> {
+ edgeFromRelay.setProperty(DecompressionProperty.of(CompressionProperty.Value.LZ4));
+ });
+ }
+ });
+ });
+ return dag;
+ }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
new file mode 100644
index 0000000..eb3b2bf
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A pass to support Sailfish-like shuffle by tagging edges.
+ * This pass modifies the partitioner property from {@link edu.snu.nemo.common.ir.vertex.transform.RelayTransform}
+ * to write each element as its own partition.
+ * As a result, every byte[] element, which was a partition for the reduce task, becomes one partition again
+ * and is flushed to disk right after it is relayed.
+ */
+public final class LargeShufflePartitionerPass extends AnnotatingPass {
+ /**
+ * Default constructor.
+ */
+ public LargeShufflePartitionerPass() {
+ super(PartitionerProperty.class, Collections.singleton(CommunicationPatternProperty.class));
+ }
+
+ @Override
+ public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+ dag.getVertices().forEach(vertex -> {
+ final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
+ inEdges.forEach(edge -> {
+ if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
+ .equals(CommunicationPatternProperty.Value.Shuffle)) {
+ dag.getOutgoingEdgesOf(edge.getDst())
+ .forEach(edgeFromRelay -> {
+ edgeFromRelay.setProperty(PartitionerProperty.of(
+ PartitionerProperty.Value.DedicatedKeyPerElementPartitioner));
+ });
+ }
+ });
+ });
+ return dag;
+ }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java
new file mode 100644
index 0000000..abc9300
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A pass to support Sailfish-like shuffle by tagging edges.
+ * It sets LZ4 compression on shuffle edges toward {@link edu.snu.nemo.common.ir.vertex.transform.RelayTransform}
+ * and no compression on the edges from the relay, so the relayed byte arrays are not compressed twice.
+ */
+public final class LargeSuffleCompressionPass extends AnnotatingPass {
+ /**
+ * Default constructor.
+ */
+ public LargeSuffleCompressionPass() {
+ super(CompressionProperty.class, Collections.singleton(CommunicationPatternProperty.class));
+ }
+
+ @Override
+ public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+ dag.getVertices().forEach(vertex -> {
+ final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
+ inEdges.forEach(edge -> {
+ if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
+ .equals(CommunicationPatternProperty.Value.Shuffle)) {
+ edge.setProperty(CompressionProperty.of(CompressionProperty.Value.LZ4));
+ // The relay forwards the already-compressed bytes, so do not compress them again.
+ dag.getOutgoingEdgesOf(edge.getDst())
+ .forEach(edgeFromRelay -> {
+ edgeFromRelay.setProperty(CompressionProperty.of(CompressionProperty.Value.None));
+ });
+ }
+ });
+ });
+ return dag;
+ }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java
index c6108e2..e69e56c 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java
@@ -35,6 +35,9 @@ public final class LargeShuffleCompositePass extends CompositePass {
new LargeShuffleDataStorePass(),
new LargeShuffleDecoderPass(),
new LargeShuffleEncoderPass(),
+ new LargeShufflePartitionerPass(),
+ new LargeSuffleCompressionPass(),
+ new LargeShuffleDecompressionPass(),
new LargeShuffleDataPersistencePass(),
new LargeShuffleResourceSlotPass()
));
diff --git a/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePassTest.java b/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePassTest.java
index d97816c..f859270 100644
--- a/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePassTest.java
+++ b/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePassTest.java
@@ -64,6 +64,10 @@ public class LargeShuffleCompositePassTest {
edgeToMerger.getPropertyValue(DataStoreProperty.class).get());
assertEquals(BytesDecoderFactory.of(),
edgeToMerger.getPropertyValue(DecoderProperty.class).get());
+ assertEquals(CompressionProperty.Value.LZ4,
+ edgeToMerger.getPropertyValue(CompressionProperty.class).get());
+ assertEquals(CompressionProperty.Value.None,
+ edgeToMerger.getPropertyValue(DecompressionProperty.class).get());
} else {
assertEquals(DataFlowProperty.Value.Pull,
edgeToMerger.getPropertyValue(DataFlowProperty.class).get());
@@ -78,6 +82,12 @@ public class LargeShuffleCompositePassTest {
edgeFromMerger.getPropertyValue(DataStoreProperty.class).get());
assertEquals(BytesEncoderFactory.of(),
edgeFromMerger.getPropertyValue(EncoderProperty.class).get());
+ assertEquals(PartitionerProperty.Value.DedicatedKeyPerElementPartitioner,
+ edgeFromMerger.getPropertyValue(PartitionerProperty.class).get());
+ assertEquals(CompressionProperty.Value.None,
+ edgeFromMerger.getPropertyValue(CompressionProperty.class).get());
+ assertEquals(CompressionProperty.Value.LZ4,
+ edgeFromMerger.getPropertyValue(DecompressionProperty.class).get());
});
} else {
// Non merger vertex.
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
index 10602b9..e0de210 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
@@ -103,7 +103,7 @@ public interface Block<K extends Serializable> {
* Commits this block to prevent further write.
*
* @return the size of each partition if the data in the block is serialized.
- * @throws BlockWriteException for any error occurred while trying to write a block.
+ * @throws BlockWriteException for any error occurred while trying to commit a block.
* (This exception will be thrown to the scheduler
* through {@link edu.snu.nemo.runtime.executor.Executor} and
* have to be handled by the scheduler with fault tolerance mechanism.)
@@ -111,6 +111,20 @@ public interface Block<K extends Serializable> {
Optional<Map<K, Long>> commit() throws BlockWriteException;
/**
+ * Commits all un-committed partitions.
+ * This is useful when partitions in a block should be committed before the whole block is committed.
+ * For example, a file block can flush its non-committed partitions from memory to storage.
+ * If another element is written after this method is called, a new non-committed partition should be created
+ * for the element even if a partition with the same key is committed already.
+ *
+ * @throws BlockWriteException for any error occurred while trying to commit partitions.
+ * (This exception will be thrown to the scheduler
+ * through {@link edu.snu.nemo.runtime.executor.Executor} and
+ * have to be handled by the scheduler with fault tolerance mechanism.)
+ */
+ void commitPartitions() throws BlockWriteException;
+
+ /**
* @return the ID of this block.
*/
String getId();
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index 41f125b..b9a04e4 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -20,6 +20,7 @@ import edu.snu.nemo.common.exception.BlockWriteException;
import edu.snu.nemo.runtime.common.data.KeyRange;
import edu.snu.nemo.runtime.executor.data.*;
import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.Partition;
import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
import edu.snu.nemo.runtime.executor.data.metadata.PartitionMetadata;
@@ -314,13 +315,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
public synchronized Optional<Map<K, Long>> commit() throws BlockWriteException {
try {
if (!metadata.isCommitted()) {
- final List<SerializedPartition<K>> partitions = new ArrayList<>();
- for (final SerializedPartition<K> partition : nonCommittedPartitionsMap.values()) {
- partition.commit();
- partitions.add(partition);
- }
- writeToFile(partitions);
- nonCommittedPartitionsMap.clear();
+ commitPartitions();
metadata.commitBlock();
}
final List<PartitionMetadata<K>> partitionMetadataList = metadata.getPartitionMetadataList();
@@ -342,6 +337,25 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
}
/**
+ * Commits every partition that has not been committed yet and
+ * writes the newly committed partitions to the block's file via writeToFile.
+ */
+ @Override
+ public synchronized void commitPartitions() throws BlockWriteException {
+ try {
+ final List<SerializedPartition<K>> toFlush = new ArrayList<>();
+ for (final Partition<?, K> pending : nonCommittedPartitionsMap.values()) {
+ pending.commit();
+ toFlush.add((SerializedPartition<K>) pending);
+ }
+ writeToFile(toFlush);
+ nonCommittedPartitionsMap.clear();
+ } catch (final IOException ioException) {
+ throw new BlockWriteException(ioException);
+ }
+ }
+
+ /**
* @return the ID of this block.
*/
@Override
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
index 6a8323a..5bf1e01 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
@@ -188,6 +188,18 @@ public final class NonSerializedMemoryBlock<K extends Serializable> implements B
}
/**
+ * Commits each still-open partition and moves it to the list of committed partitions.
+ */
+ @Override
+ public synchronized void commitPartitions() throws BlockWriteException {
+ nonCommittedPartitionsMap.values().forEach(pending -> {
+ pending.commit();
+ nonSerializedPartitions.add(pending);
+ });
+ nonCommittedPartitionsMap.clear();
+ }
+
+ /**
* @return the ID of this block.
*/
@Override
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
index 4282b69..847558f 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
@@ -180,11 +180,7 @@ public final class SerializedMemoryBlock<K extends Serializable> implements Bloc
public synchronized Optional<Map<K, Long>> commit() throws BlockWriteException {
try {
if (!committed) {
- for (final SerializedPartition<K> partition : nonCommittedPartitionsMap.values()) {
- partition.commit();
- serializedPartitions.add(partition);
- }
- nonCommittedPartitionsMap.clear();
+ commitPartitions();
committed = true;
}
final Map<K, Long> partitionSizes = new HashMap<>(serializedPartitions.size());
@@ -205,6 +201,22 @@ public final class SerializedMemoryBlock<K extends Serializable> implements Bloc
}
/**
+ * Commits each still-open partition and appends it to the committed serialized partitions.
+ */
+ @Override
+ public synchronized void commitPartitions() throws BlockWriteException {
+ try {
+ for (final SerializedPartition<K> pending : nonCommittedPartitionsMap.values()) {
+ pending.commit();
+ serializedPartitions.add(pending);
+ }
+ nonCommittedPartitionsMap.clear();
+ } catch (final IOException cause) {
+ throw new BlockWriteException(cause);
+ }
+ }
+
+ /**
* @return the ID of this block.
*/
@Override
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElement.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElement.java
new file mode 100644
index 0000000..02c9bed
--- /dev/null
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElement.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.executor.data.partitioner;
+
+import java.lang.annotation.*;
+
+/**
+ * Declares that every key the annotated {@link Partitioner} assigns is dedicated to a single element.
+ */
+@Target({ElementType.TYPE})
+@Documented
+@Retention(RetentionPolicy.RUNTIME) // Must be visible at runtime: it is looked up reflectively when writing output.
+public @interface DedicatedKeyPerElement {
+}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
new file mode 100644
index 0000000..f01f08b
--- /dev/null
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.executor.data.partitioner;
+
+/**
+ * An implementation of {@link Partitioner} which assigns a dedicated key to each output element of a task.
+ * WARNING: Because this partitioner assigns a dedicated key per element, it should only be used when
+ * the number of output elements is small. For example, every output element of
+ * {@link edu.snu.nemo.common.ir.vertex.transform.RelayTransform} inserted by the large shuffle optimization is
+ * already a whole partition. In this case, assigning a key to each element is useful.
+ */
+@DedicatedKeyPerElement
+public final class DedicatedKeyPerElementPartitioner implements Partitioner<Integer> {
+ private int key;
+
+ /**
+ * Constructor. The first element is assigned key 0.
+ */
+ public DedicatedKeyPerElementPartitioner() {
+ key = 0;
+ }
+
+ @Override
+ public Integer partition(final Object element) {
+ return key++; // Sequential keys 0, 1, 2, ...; the counter is unsynchronized, so this is not thread-safe.
+ }
+}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
index 467ba18..00f4ca2 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
@@ -45,6 +45,8 @@ public class CompressionStreamChainer implements EncodeStreamChainer {
return new GZIPOutputStream(out);
case LZ4:
return new LZ4BlockOutputStream(out);
+ case None:
+ return out;
default:
throw new UnsupportedCompressionException("Not supported compression method");
}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java
index 558bd35..b36546c 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java
@@ -45,6 +45,8 @@ public class DecompressionStreamChainer implements DecodeStreamChainer {
return new GZIPInputStream(in);
case LZ4:
return new LZ4BlockInputStream(in);
+ case None:
+ return in;
default:
throw new UnsupportedCompressionException("Not supported compression method");
}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index 930a4b4..6e4164a 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -82,6 +82,9 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
case DataSkewHashPartitioner:
this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, keyExtractor.get());
break;
+ case DedicatedKeyPerElementPartitioner:
+ this.partitioner = new DedicatedKeyPerElementPartitioner();
+ break;
default:
throw new UnsupportedPartitionerException(
new Throwable("Partitioner " + partitionerPropertyValue + " is not supported."));
@@ -103,6 +106,12 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
public void write(final Object element) {
if (nonDummyBlock) {
blockToWrite.write(partitioner.partition(element), element);
+
+ // A dedicated-key partitioner never reuses a key, so the partition just written is complete
+ // and can be committed right away (a file block, for instance, flushes it to storage).
+ if (partitioner.getClass().isAnnotationPresent(DedicatedKeyPerElement.class)) {
+ blockToWrite.commitPartitions();
+ }
} // If else, does not need to write because the data is duplicated.
}
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java
new file mode 100644
index 0000000..c05745d
--- /dev/null
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.executor.data;
+
+import edu.snu.nemo.common.coder.IntDecoderFactory;
+import edu.snu.nemo.common.coder.IntEncoderFactory;
+import edu.snu.nemo.runtime.common.data.HashRange;
+import edu.snu.nemo.runtime.executor.data.block.Block;
+import edu.snu.nemo.runtime.executor.data.block.FileBlock;
+import edu.snu.nemo.runtime.executor.data.block.NonSerializedMemoryBlock;
+import edu.snu.nemo.runtime.executor.data.block.SerializedMemoryBlock;
+import edu.snu.nemo.runtime.executor.data.metadata.LocalFileMetadata;
+import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.*;
+
+/**
+ * Tests write and read for {@link Block}s.
+ */
+public final class BlockTest {
+ private Serializer serializer;
+ private Map<Integer, List<Integer>> testData;
+
+ /**
+ * Generates the test data and serializer.
+ */
+ @Before
+ public void setUp() throws Exception {
+ serializer = new Serializer<>(IntEncoderFactory.of(), IntDecoderFactory.of(), new ArrayList<>(), new ArrayList<>());
+ testData = new HashMap<>();
+
+ testData.put(1, Collections.singletonList(1));
+ testData.put(2, Arrays.asList(1, 2));
+ testData.put(3, Arrays.asList(1, 2, 3));
+ }
+
+ /**
+ * Test {@link NonSerializedMemoryBlock}.
+ */
+ @Test(timeout = 10000)
+ public void testNonSerializedMemoryBlock() throws Exception {
+ final Block<Integer> block = new NonSerializedMemoryBlock<>("testBlock", serializer);
+ testBlock(block);
+ }
+
+ /**
+ * Test {@link edu.snu.nemo.runtime.executor.data.block.SerializedMemoryBlock}.
+ */
+ @Test(timeout = 10000)
+ public void testSerializedMemoryBlock() throws Exception {
+ final Block<Integer> block = new SerializedMemoryBlock<>("testBlock", serializer);
+ testBlock(block);
+ }
+
+ /**
+ * Test {@link FileBlock}.
+ */
+ @Test(timeout = 10000)
+ public void testFileBlock() throws Exception {
+ final String tmpDir = "./tmpFiles";
+ final String filePath = tmpDir + "/BlockTestFile";
+ try {
+ new File(tmpDir).mkdirs();
+ final LocalFileMetadata<Integer> metadata = new LocalFileMetadata<>();
+ final Block<Integer> block = new FileBlock<>("testBlock", serializer, filePath, metadata);
+ testBlock(block);
+ } finally {
+ FileUtils.deleteDirectory(new File(tmpDir));
+ }
+ }
+
+ /**
+ * Tests write to & read from a block.
+ */
+ private void testBlock(final Block<Integer> block) throws Exception {
+ // Write elements to partitions in the block
+ testData.forEach((key, partitionData) -> partitionData.forEach(element -> block.write(key, element)));
+
+ // Commit all partitions
+ block.commitPartitions();
+
+ // Write elements again. Because all partitions are committed, new partitions for each key will be created.
+ testData.forEach((key, partitionData) -> partitionData.forEach(element -> block.write(key, element)));
+
+ // Commit the block
+ block.commit();
+
+ int count = 0;
+ final Iterable<NonSerializedPartition<Integer>> partitions = block.readPartitions(HashRange.all());
+ for (final NonSerializedPartition<Integer> readPartition : partitions) {
+ count++;
+ final List<Integer> expectedData = testData.get(readPartition.getKey());
+ final Iterable<Integer> readData = readPartition.getData();
+ compare(expectedData, readData);
+ }
+ Assert.assertEquals(testData.size() * 2, count);
+ }
+
+ /**
+ * Compare the contents of a list and an iterable as multisets.
+ * Order is ignored, but every element must appear the same number of times in both.
+ * @param list the list to test.
+ * @param iterable the iterable to test.
+ * @throws RuntimeException if the contents are not matched.
+ */
+ private void compare(final List<Integer> list,
+ final Iterable<Integer> iterable) throws RuntimeException {
+ final List<Integer> copiedList = new ArrayList<>(list);
+ for (final Integer element : iterable) {
+ if (!copiedList.remove(element)) {
+ throw new RuntimeException("Contents mismatch! \nlist: " + list + "\niterable: " + iterable);
+ }
+ }
+ // Leftover elements were expected but never read back (e.g. data lost from a partition).
+ if (!copiedList.isEmpty()) {
+ throw new RuntimeException("Contents mismatch! \nlist: " + list + "\niterable: " + iterable);
+ }
+ }
+}