Posted to commits@nemo.apache.org by jo...@apache.org on 2018/08/08 08:04:13 UTC

[incubator-nemo] branch master updated: [NEMO-176] Improve sequential read from disk #92

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 435a274  [NEMO-176] Improve sequential read from disk #92
435a274 is described below

commit 435a2740a1760e091f5b98b40c69820965108fc0
Author: Sanha Lee <sa...@gmail.com>
AuthorDate: Wed Aug 8 17:04:11 2018 +0900

    [NEMO-176] Improve sequential read from disk #92
    
    JIRA: [NEMO-176: Improve sequential read from disk](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-176)
    
    **Major changes:**
    - Read the whole data from the file stream first and decode it later in `FileBlock`
    
    **Minor changes to note:**
    - Remove the `limit` parameter from `InputStreamIterator` in `DataUtil` and make `DataUtil#deserializePartition` limit the stream itself to the partition size.
    - Remove the assumption in `OutputWriter` and `InputReader` that the source and destination parallelism of one-to-one communication is always 1.
    
    **Tests for the changes:**
    - Existing `DataTransferTest`, `BlockStoreTest` and other unit tests cover this change.
    - Existing integration tests also cover this change.
    
    **Other comments:**
    - N/A.
    
    resolves [NEMO-176](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-176)
---
 .../snu/nemo/runtime/executor/data/DataUtil.java   | 43 ++++++--------------
 .../runtime/executor/data/block/FileBlock.java     | 31 ++++++---------
 .../runtime/executor/datatransfer/InputReader.java |  9 +----
 .../executor/datatransfer/OutputWriter.java        | 46 ++++++++++++++--------
 .../executor/datatransfer/DataTransferTest.java    | 18 ++-------
 5 files changed, 57 insertions(+), 90 deletions(-)

diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
index 50c43d9..041283a 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
@@ -80,11 +80,17 @@ public final class DataUtil {
                                                                                      final InputStream inputStream)
       throws IOException {
     final List deserializedData = new ArrayList();
-    final InputStreamIterator iterator = new InputStreamIterator(Collections.singletonList(inputStream).iterator(),
-        serializer, partitionSize);
-    iterator.forEachRemaining(deserializedData::add);
-    return new NonSerializedPartition(key, deserializedData, iterator.getNumSerializedBytes(),
-        iterator.getNumEncodedBytes());
+    // We need to limit the bytes read from this inputStream, which could be over-read by a
+    // wrapping compression stream; whether this happens depends on the compression
+    // algorithm used. We recommend wrapping with LimitedInputStream once more when
+    // reading input from a chained compression InputStream.
+    try (final LimitedInputStream limitedInputStream = new LimitedInputStream(inputStream, partitionSize)) {
+      final InputStreamIterator iterator =
+          new InputStreamIterator(Collections.singletonList(limitedInputStream).iterator(), serializer);
+      iterator.forEachRemaining(deserializedData::add);
+      return new NonSerializedPartition(key, deserializedData, iterator.getNumSerializedBytes(),
+          iterator.getNumEncodedBytes());
+    }
   }
 
   /**
@@ -197,7 +203,6 @@ public final class DataUtil {
 
     private final Iterator<InputStream> inputStreams;
     private final Serializer<?, T> serializer;
-    private final long limit;
 
     private volatile CountingInputStream serializedCountingStream = null;
     private volatile CountingInputStream encodedCountingStream = null;
@@ -218,27 +223,6 @@ public final class DataUtil {
                         final Serializer<?, T> serializer) {
       this.inputStreams = inputStreams;
       this.serializer = serializer;
-      // -1 means no limit.
-      this.limit = -1;
-    }
-
-    /**
-     * Construct {@link Iterator} from {@link InputStream} and {@link DecoderFactory}.
-     *
-     * @param inputStreams The streams to read data from.
-     * @param serializer   The serializer.
-     * @param limit        The bytes to read from the {@link InputStream}.
-     */
-    private InputStreamIterator(
-        final Iterator<InputStream> inputStreams,
-        final Serializer<?, T> serializer,
-        final int limit) {
-      if (limit < 0) {
-        throw new IllegalArgumentException("Negative limit not allowed.");
-      }
-      this.inputStreams = inputStreams;
-      this.serializer = serializer;
-      this.limit = limit;
     }
 
     @Override
@@ -249,11 +233,6 @@ public final class DataUtil {
       if (cannotContinueDecoding) {
         return false;
       }
-      if (limit != -1 && limit == (serializedCountingStream == null
-          ? numSerializedBytes : numSerializedBytes + serializedCountingStream.getCount())) {
-        cannotContinueDecoding = true;
-        return false;
-      }
       while (true) {
         try {
           if (decoder == null) {
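
A note on the LimitedInputStream wrapping above: decompression streams such as java.util.zip.GZIPInputStream buffer ahead of the bytes they return, so decoding one partition can silently consume bytes belonging to the next. The following is a minimal sketch of what such a byte-capping wrapper does; it illustrates the idea and is not Nemo's actual LimitedInputStream.

    import java.io.IOException;
    import java.io.InputStream;

    // Minimal sketch: caps the bytes readable from a wrapped stream so that a
    // read-ahead decompressor cannot consume the next partition's bytes.
    final class ByteCappedInputStream extends InputStream {
      private final InputStream in;
      private long remaining; // bytes still allowed to be read

      ByteCappedInputStream(final InputStream in, final long limit) {
        this.in = in;
        this.remaining = limit;
      }

      @Override
      public int read() throws IOException {
        if (remaining <= 0) {
          return -1; // report EOF at the partition boundary
        }
        final int b = in.read();
        if (b >= 0) {
          remaining--;
        }
        return b;
      }

      @Override
      public int read(final byte[] buf, final int off, final int len) throws IOException {
        if (remaining <= 0) {
          return -1;
        }
        final int n = in.read(buf, off, (int) Math.min(len, remaining));
        if (n > 0) {
          remaining -= n;
        }
        return n;
      }
    }
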
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index b9a04e4..6eef824 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.runtime.executor.data.block;
 
+import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.BlockWriteException;
 import edu.snu.nemo.runtime.common.data.KeyRange;
@@ -172,36 +173,28 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
       // Deserialize the data
       final List<NonSerializedPartition<K>> deserializedPartitions = new ArrayList<>();
       try {
+        final List<Pair<K, byte[]>> partitionKeyBytesPairs = new ArrayList<>();
         try (final FileInputStream fileStream = new FileInputStream(filePath)) {
           for (final PartitionMetadata<K> partitionMetadata : metadata.getPartitionMetadataList()) {
             final K key = partitionMetadata.getKey();
             if (keyRange.includes(key)) {
               // The key value of this partition is in the range.
-              final long availableBefore = fileStream.available();
-              // We need to limit read bytes on this FileStream, which could be over-read by wrapped
-              // compression stream. This depends on the nature of the compression algorithm used.
-              // We recommend to wrap with LimitedInputStream once more when
-              // reading input from chained compression InputStream.
-              // Plus, this stream must be not closed to prevent to close the filtered file partition.
-              final LimitedInputStream limitedInputStream =
-                  new LimitedInputStream(fileStream, partitionMetadata.getPartitionSize());
-              final NonSerializedPartition<K> deserializePartition =
-                  DataUtil.deserializePartition(
-                      partitionMetadata.getPartitionSize(), serializer, key, limitedInputStream);
-              deserializedPartitions.add(deserializePartition);
-              // rearrange file pointer
-              final long toSkip = partitionMetadata.getPartitionSize() - availableBefore + fileStream.available();
-              if (toSkip > 0) {
-                skipBytes(fileStream, toSkip);
-              } else if (toSkip < 0) {
-                throw new IOException("file stream has been overread");
-              }
+              final byte[] partitionBytes = new byte[partitionMetadata.getPartitionSize()];
+              fileStream.read(partitionBytes, 0, partitionMetadata.getPartitionSize());
+              partitionKeyBytesPairs.add(Pair.of(key, partitionBytes));
             } else {
               // Have to skip this partition.
               skipBytes(fileStream, partitionMetadata.getPartitionSize());
             }
           }
         }
+        for (final Pair<K, byte[]> partitionKeyBytes : partitionKeyBytesPairs) {
+          final NonSerializedPartition<K> deserializePartition =
+              DataUtil.deserializePartition(
+                  partitionKeyBytes.right().length, serializer, partitionKeyBytes.left(),
+                  new ByteArrayInputStream(partitionKeyBytes.right()));
+          deserializedPartitions.add(deserializePartition);
+        }
       } catch (final IOException e) {
         throw new BlockFetchException(e);
       }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index e9e563b..4471ae4 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -170,13 +170,8 @@ public final class InputReader extends DataTransfer {
    * @return the parallelism of the source task.
    */
   public int getSourceParallelism() {
-    if (CommunicationPatternProperty.Value.OneToOne
-        .equals(runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).get())) {
-      return 1;
-    } else {
-      final Integer numSrcTasks = srcVertex.getPropertyValue(ParallelismProperty.class).get();
-      return numSrcTasks;
-    }
+    return srcVertex.getPropertyValue(ParallelismProperty.class).
+        orElseThrow(() -> new RuntimeException("No parallelism property on the source vertex."));
   }
 
   /**
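
The change above also shows the pattern applied throughout this commit: bare Optional#get() calls on execution properties are replaced with orElseThrow, so a missing property fails with a descriptive message instead of an anonymous NoSuchElementException. A minimal, self-contained sketch of the difference (class and variable names are illustrative):

    import java.util.Optional;

    final class PropertyLookupSketch {
      public static void main(final String[] args) {
        final Optional<Integer> parallelism = Optional.empty(); // property missing
        // Optional#get() here would throw a bare NoSuchElementException;
        // orElseThrow surfaces an error that names the missing property.
        final int value = parallelism
            .orElseThrow(() -> new RuntimeException("No parallelism property on the source vertex."));
        System.out.println(value);
      }
    }
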
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index 6e4164a..71d810a 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -65,22 +65,28 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
     this.srcVertexId = srcRuntimeVertexId;
     this.dstIrVertex = dstIrVertex;
     this.blockManagerWorker = blockManagerWorker;
-    this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class).get();
+    this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class).
+        orElseThrow(() -> new RuntimeException("No data store property on the edge"));
+
 
     // Setup partitioner
-    final int dstParallelism = getDstParallelism();
+    final int dstParallelism = dstIrVertex.getPropertyValue(ParallelismProperty.class).
+        orElseThrow(() -> new RuntimeException("No parallelism property on the destination vertex"));
     final Optional<KeyExtractor> keyExtractor = runtimeEdge.getPropertyValue(KeyExtractorProperty.class);
     final PartitionerProperty.Value partitionerPropertyValue =
-        runtimeEdge.getPropertyValue(PartitionerProperty.class).get();
+        runtimeEdge.getPropertyValue(PartitionerProperty.class).
+            orElseThrow(() -> new RuntimeException("No partitioner property on the edge"));
     switch (partitionerPropertyValue) {
       case IntactPartitioner:
         this.partitioner = new IntactPartitioner();
         break;
       case HashPartitioner:
-        this.partitioner = new HashPartitioner(dstParallelism, keyExtractor.get());
+        this.partitioner = new HashPartitioner(dstParallelism, keyExtractor.
+            orElseThrow(() -> new RuntimeException("No key extractor property on the edge")));
         break;
       case DataSkewHashPartitioner:
-        this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, keyExtractor.get());
+        this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, keyExtractor.
+            orElseThrow(() -> new RuntimeException("No key extractor property on the edge")));
         break;
       case DedicatedKeyPerElementPartitioner:
         this.partitioner = new DedicatedKeyPerElementPartitioner();
@@ -122,10 +128,8 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
   public void close() {
     // Commit block.
     final DataPersistenceProperty.Value persistence =
-        runtimeEdge.getPropertyValue(DataPersistenceProperty.class).get();
-    final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
-        runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
-    final int multiplier = duplicateDataProperty.isPresent() ? duplicateDataProperty.get().getGroupSize() : 1;
+        runtimeEdge.getPropertyValue(DataPersistenceProperty.class).
+            orElseThrow(() -> new RuntimeException("No data persistence property on the edge"));
 
     final boolean isDataSizeMetricCollectionEdge = Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
         .equals(runtimeEdge.getPropertyValue(MetricCollectionProperty.class));
@@ -138,11 +142,11 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
       }
       this.writtenBytes = blockSizeTotal;
       blockManagerWorker.writeBlock(blockToWrite, blockStoreValue, isDataSizeMetricCollectionEdge,
-          partitionSizeMap.get(), srcVertexId, getDstParallelism() * multiplier, persistence);
+          partitionSizeMap.get(), srcVertexId, getExpectedRead(), persistence);
     } else {
       this.writtenBytes = -1; // no written bytes info.
       blockManagerWorker.writeBlock(blockToWrite, blockStoreValue, isDataSizeMetricCollectionEdge,
-          Collections.emptyMap(), srcVertexId, getDstParallelism() * multiplier, persistence);
+          Collections.emptyMap(), srcVertexId, getExpectedRead(), persistence);
     }
   }
 
@@ -158,13 +162,21 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
   }
 
   /**
-   * Get the parallelism of the destination task.
+   * Get the expected number of reads for the written data, according to the communication
+   * pattern of the edge and the parallelism of the destination vertex.
    *
-   * @return the parallelism of the destination task.
+   * @return the expected number of reads.
    */
-  private int getDstParallelism() {
-    return CommunicationPatternProperty.Value.OneToOne.equals(
-        runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).get())
-        ? 1 : dstIrVertex.getPropertyValue(ParallelismProperty.class).get();
+  private int getExpectedRead() {
+    final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
+        runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
+    final int duplicatedDataMultiplier =
+        duplicateDataProperty.isPresent() ? duplicateDataProperty.get().getGroupSize() : 1;
+    final int readForABlock = CommunicationPatternProperty.Value.OneToOne.equals(
+        runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).orElseThrow(
+            () -> new RuntimeException("No communication pattern on this edge.")))
+        ? 1 : dstIrVertex.getPropertyValue(ParallelismProperty.class).orElseThrow(
+            () -> new RuntimeException("No parallelism property on the destination vertex."));
+    return readForABlock * duplicatedDataMultiplier;
   }
 }
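
To make the new read accounting concrete: a non-one-to-one edge with destination parallelism 10 and a duplicate-edge group of size 2 yields 10 * 2 = 20 expected reads, while a one-to-one edge with the same group yields 1 * 2 = 2. A self-contained restatement of the arithmetic (the method name is illustrative, not Nemo's API):

    // Illustrative restatement of getExpectedRead()'s arithmetic.
    static int expectedReads(final boolean isOneToOne,
                             final int dstParallelism,
                             final int duplicateGroupSize) {
      final int readsForABlock = isOneToOne ? 1 : dstParallelism;
      return readsForABlock * duplicateGroupSize;
    }

    // expectedReads(false, 10, 2) == 20  (shuffle/broadcast edge, duplicated twice)
    // expectedReads(true,  10, 2) == 2   (one-to-one edge, duplicated twice)
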
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index b46f7ba..f309e49 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -334,11 +334,7 @@ public final class DataTransferTest {
       final InputReader reader =
           new InputReader(dstTaskIndex, srcVertex, dummyEdge, receiver);
 
-      if (CommunicationPatternProperty.Value.OneToOne.equals(commPattern)) {
-        assertEquals(1, reader.getSourceParallelism());
-      } else {
-        assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
-      }
+      assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
 
       final List dataRead = new ArrayList<>();
       try {
@@ -436,17 +432,9 @@ public final class DataTransferTest {
       final InputReader reader2 =
           new InputReader(dstTaskIndex, srcVertex, dummyEdge2, receiver);
 
-      if (CommunicationPatternProperty.Value.OneToOne.equals(commPattern)) {
-        assertEquals(1, reader.getSourceParallelism());
-      } else {
-        assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
-      }
+      assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
 
-      if (CommunicationPatternProperty.Value.OneToOne.equals(commPattern)) {
-        assertEquals(1, reader2.getSourceParallelism());
-      } else {
-        assertEquals(PARALLELISM_TEN, reader2.getSourceParallelism());
-      }
+      assertEquals(PARALLELISM_TEN, reader2.getSourceParallelism());
 
       final List dataRead = new ArrayList<>();
       try {