You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/08/08 08:04:13 UTC
[incubator-nemo] branch master updated: [NEMO-176] Improve sequential read from disk #92
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 435a274 [NEMO-176] Improve sequential read from disk #92
435a274 is described below
commit 435a2740a1760e091f5b98b40c69820965108fc0
Author: Sanha Lee <sa...@gmail.com>
AuthorDate: Wed Aug 8 17:04:11 2018 +0900
[NEMO-176] Improve sequential read from disk #92
JIRA: [NEMO-176: Improve sequential read from disk](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-176)
**Major changes:**
- In `FileBlock`, first read all of the requested partition bytes from the file stream, then decode them afterwards.
**Minor changes to note:**
- Remove the `limit` parameter from `InputStreamIterator` in `DataUtil`, and make `DataUtil#deserializePartition` itself limit the stream to the partition size.
- Remove the assumption in `OutputWriter` and `InputReader` that "the source and destination parallelism of One-to-One communication is always 1".
**Tests for the changes:**
- Existing `DataTransferTest`, `BlockStoreTest` and other unit tests cover this change.
- Existing integration tests also cover this change.
**Other comments:**
- N/A.
resolves [NEMO-176](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-176)
---
.../snu/nemo/runtime/executor/data/DataUtil.java | 43 ++++++--------------
.../runtime/executor/data/block/FileBlock.java | 31 ++++++---------
.../runtime/executor/datatransfer/InputReader.java | 9 +----
.../executor/datatransfer/OutputWriter.java | 46 ++++++++++++++--------
.../executor/datatransfer/DataTransferTest.java | 18 ++-------
5 files changed, 57 insertions(+), 90 deletions(-)
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
index 50c43d9..041283a 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
@@ -80,11 +80,17 @@ public final class DataUtil {
final InputStream inputStream)
throws IOException {
final List deserializedData = new ArrayList();
- final InputStreamIterator iterator = new InputStreamIterator(Collections.singletonList(inputStream).iterator(),
- serializer, partitionSize);
- iterator.forEachRemaining(deserializedData::add);
- return new NonSerializedPartition(key, deserializedData, iterator.getNumSerializedBytes(),
- iterator.getNumEncodedBytes());
+ // We need to limit read bytes on this inputStream, which could be over-read by wrapped
+ // compression stream. This depends on the nature of the compression algorithm used.
+ // We recommend to wrap with LimitedInputStream once more when
+ // reading input from chained compression InputStream.
+ try (final LimitedInputStream limitedInputStream = new LimitedInputStream(inputStream, partitionSize)) {
+ final InputStreamIterator iterator =
+ new InputStreamIterator(Collections.singletonList(limitedInputStream).iterator(), serializer);
+ iterator.forEachRemaining(deserializedData::add);
+ return new NonSerializedPartition(key, deserializedData, iterator.getNumSerializedBytes(),
+ iterator.getNumEncodedBytes());
+ }
}
/**
@@ -197,7 +203,6 @@ public final class DataUtil {
private final Iterator<InputStream> inputStreams;
private final Serializer<?, T> serializer;
- private final long limit;
private volatile CountingInputStream serializedCountingStream = null;
private volatile CountingInputStream encodedCountingStream = null;
@@ -218,27 +223,6 @@ public final class DataUtil {
final Serializer<?, T> serializer) {
this.inputStreams = inputStreams;
this.serializer = serializer;
- // -1 means no limit.
- this.limit = -1;
- }
-
- /**
- * Construct {@link Iterator} from {@link InputStream} and {@link DecoderFactory}.
- *
- * @param inputStreams The streams to read data from.
- * @param serializer The serializer.
- * @param limit The bytes to read from the {@link InputStream}.
- */
- private InputStreamIterator(
- final Iterator<InputStream> inputStreams,
- final Serializer<?, T> serializer,
- final int limit) {
- if (limit < 0) {
- throw new IllegalArgumentException("Negative limit not allowed.");
- }
- this.inputStreams = inputStreams;
- this.serializer = serializer;
- this.limit = limit;
}
@Override
@@ -249,11 +233,6 @@ public final class DataUtil {
if (cannotContinueDecoding) {
return false;
}
- if (limit != -1 && limit == (serializedCountingStream == null
- ? numSerializedBytes : numSerializedBytes + serializedCountingStream.getCount())) {
- cannotContinueDecoding = true;
- return false;
- }
while (true) {
try {
if (decoder == null) {
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index b9a04e4..6eef824 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -15,6 +15,7 @@
*/
package edu.snu.nemo.runtime.executor.data.block;
+import edu.snu.nemo.common.Pair;
import edu.snu.nemo.common.exception.BlockFetchException;
import edu.snu.nemo.common.exception.BlockWriteException;
import edu.snu.nemo.runtime.common.data.KeyRange;
@@ -172,36 +173,28 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
// Deserialize the data
final List<NonSerializedPartition<K>> deserializedPartitions = new ArrayList<>();
try {
+ final List<Pair<K, byte[]>> partitionKeyBytesPairs = new ArrayList<>();
try (final FileInputStream fileStream = new FileInputStream(filePath)) {
for (final PartitionMetadata<K> partitionMetadata : metadata.getPartitionMetadataList()) {
final K key = partitionMetadata.getKey();
if (keyRange.includes(key)) {
// The key value of this partition is in the range.
- final long availableBefore = fileStream.available();
- // We need to limit read bytes on this FileStream, which could be over-read by wrapped
- // compression stream. This depends on the nature of the compression algorithm used.
- // We recommend to wrap with LimitedInputStream once more when
- // reading input from chained compression InputStream.
- // Plus, this stream must be not closed to prevent to close the filtered file partition.
- final LimitedInputStream limitedInputStream =
- new LimitedInputStream(fileStream, partitionMetadata.getPartitionSize());
- final NonSerializedPartition<K> deserializePartition =
- DataUtil.deserializePartition(
- partitionMetadata.getPartitionSize(), serializer, key, limitedInputStream);
- deserializedPartitions.add(deserializePartition);
- // rearrange file pointer
- final long toSkip = partitionMetadata.getPartitionSize() - availableBefore + fileStream.available();
- if (toSkip > 0) {
- skipBytes(fileStream, toSkip);
- } else if (toSkip < 0) {
- throw new IOException("file stream has been overread");
- }
+ final byte[] partitionBytes = new byte[partitionMetadata.getPartitionSize()];
+ fileStream.read(partitionBytes, 0, partitionMetadata.getPartitionSize());
+ partitionKeyBytesPairs.add(Pair.of(key, partitionBytes));
} else {
// Have to skip this partition.
skipBytes(fileStream, partitionMetadata.getPartitionSize());
}
}
}
+ for (final Pair<K, byte[]> partitionKeyBytes : partitionKeyBytesPairs) {
+ final NonSerializedPartition<K> deserializePartition =
+ DataUtil.deserializePartition(
+ partitionKeyBytes.right().length, serializer, partitionKeyBytes.left(),
+ new ByteArrayInputStream(partitionKeyBytes.right()));
+ deserializedPartitions.add(deserializePartition);
+ }
} catch (final IOException e) {
throw new BlockFetchException(e);
}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index e9e563b..4471ae4 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -170,13 +170,8 @@ public final class InputReader extends DataTransfer {
* @return the parallelism of the source task.
*/
public int getSourceParallelism() {
- if (CommunicationPatternProperty.Value.OneToOne
- .equals(runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).get())) {
- return 1;
- } else {
- final Integer numSrcTasks = srcVertex.getPropertyValue(ParallelismProperty.class).get();
- return numSrcTasks;
- }
+ return srcVertex.getPropertyValue(ParallelismProperty.class).
+ orElseThrow(() -> new RuntimeException("No parallelism property on this edge."));
}
/**
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index 6e4164a..71d810a 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -65,22 +65,28 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
this.srcVertexId = srcRuntimeVertexId;
this.dstIrVertex = dstIrVertex;
this.blockManagerWorker = blockManagerWorker;
- this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class).get();
+ this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class).
+ orElseThrow(() -> new RuntimeException("No data store property on the edge"));
+
// Setup partitioner
- final int dstParallelism = getDstParallelism();
+ final int dstParallelism = dstIrVertex.getPropertyValue(ParallelismProperty.class).
+ orElseThrow(() -> new RuntimeException("No parallelism property on the destination vertex"));
final Optional<KeyExtractor> keyExtractor = runtimeEdge.getPropertyValue(KeyExtractorProperty.class);
final PartitionerProperty.Value partitionerPropertyValue =
- runtimeEdge.getPropertyValue(PartitionerProperty.class).get();
+ runtimeEdge.getPropertyValue(PartitionerProperty.class).
+ orElseThrow(() -> new RuntimeException("No partitioner property on the edge"));
switch (partitionerPropertyValue) {
case IntactPartitioner:
this.partitioner = new IntactPartitioner();
break;
case HashPartitioner:
- this.partitioner = new HashPartitioner(dstParallelism, keyExtractor.get());
+ this.partitioner = new HashPartitioner(dstParallelism, keyExtractor.
+ orElseThrow(() -> new RuntimeException("No key extractor property on the edge")));
break;
case DataSkewHashPartitioner:
- this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, keyExtractor.get());
+ this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, keyExtractor.
+ orElseThrow(() -> new RuntimeException("No key extractor property on the edge")));
break;
case DedicatedKeyPerElementPartitioner:
this.partitioner = new DedicatedKeyPerElementPartitioner();
@@ -122,10 +128,8 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
public void close() {
// Commit block.
final DataPersistenceProperty.Value persistence =
- runtimeEdge.getPropertyValue(DataPersistenceProperty.class).get();
- final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
- runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
- final int multiplier = duplicateDataProperty.isPresent() ? duplicateDataProperty.get().getGroupSize() : 1;
+ runtimeEdge.getPropertyValue(DataPersistenceProperty.class).
+ orElseThrow(() -> new RuntimeException("No data persistence property on the edge"));
final boolean isDataSizeMetricCollectionEdge = Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
.equals(runtimeEdge.getPropertyValue(MetricCollectionProperty.class));
@@ -138,11 +142,11 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
}
this.writtenBytes = blockSizeTotal;
blockManagerWorker.writeBlock(blockToWrite, blockStoreValue, isDataSizeMetricCollectionEdge,
- partitionSizeMap.get(), srcVertexId, getDstParallelism() * multiplier, persistence);
+ partitionSizeMap.get(), srcVertexId, getExpectedRead(), persistence);
} else {
this.writtenBytes = -1; // no written bytes info.
blockManagerWorker.writeBlock(blockToWrite, blockStoreValue, isDataSizeMetricCollectionEdge,
- Collections.emptyMap(), srcVertexId, getDstParallelism() * multiplier, persistence);
+ Collections.emptyMap(), srcVertexId, getExpectedRead(), persistence);
}
}
@@ -158,13 +162,21 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
}
/**
- * Get the parallelism of the destination task.
+ * Get the expected number of data read according to the communication pattern of the edge and
+ * the parallelism of destination vertex.
*
- * @return the parallelism of the destination task.
+ * @return the expected number of data read.
*/
- private int getDstParallelism() {
- return CommunicationPatternProperty.Value.OneToOne.equals(
- runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).get())
- ? 1 : dstIrVertex.getPropertyValue(ParallelismProperty.class).get();
+ private int getExpectedRead() {
+ final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
+ runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
+ final int duplicatedDataMultiplier =
+ duplicateDataProperty.isPresent() ? duplicateDataProperty.get().getGroupSize() : 1;
+ final int readForABlock = CommunicationPatternProperty.Value.OneToOne.equals(
+ runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).orElseThrow(
+ () -> new RuntimeException("No communication pattern on this edge.")))
+ ? 1 : dstIrVertex.getPropertyValue(ParallelismProperty.class).orElseThrow(
+ () -> new RuntimeException("No parallelism property on the destination vertex."));
+ return readForABlock * duplicatedDataMultiplier;
}
}
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index b46f7ba..f309e49 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -334,11 +334,7 @@ public final class DataTransferTest {
final InputReader reader =
new InputReader(dstTaskIndex, srcVertex, dummyEdge, receiver);
- if (CommunicationPatternProperty.Value.OneToOne.equals(commPattern)) {
- assertEquals(1, reader.getSourceParallelism());
- } else {
- assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
- }
+ assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
final List dataRead = new ArrayList<>();
try {
@@ -436,17 +432,9 @@ public final class DataTransferTest {
final InputReader reader2 =
new InputReader(dstTaskIndex, srcVertex, dummyEdge2, receiver);
- if (CommunicationPatternProperty.Value.OneToOne.equals(commPattern)) {
- assertEquals(1, reader.getSourceParallelism());
- } else {
- assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
- }
+ assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
- if (CommunicationPatternProperty.Value.OneToOne.equals(commPattern)) {
- assertEquals(1, reader2.getSourceParallelism());
- } else {
- assertEquals(PARALLELISM_TEN, reader2.getSourceParallelism());
- }
+ assertEquals(PARALLELISM_TEN, reader2.getSourceParallelism());
final List dataRead = new ArrayList<>();
try {