Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/08/08 08:04:12 UTC

[GitHub] johnyangk closed pull request #92: [NEMO-176] Improve sequential read from disk

johnyangk closed pull request #92: [NEMO-176] Improve sequential read from disk
URL: https://github.com/apache/incubator-nemo/pull/92

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
index 50c43d930..041283a8c 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
@@ -80,11 +80,17 @@ private static void serializePartition(final EncoderFactory encoderFactory,
                                                                                      final InputStream inputStream)
       throws IOException {
     final List deserializedData = new ArrayList();
-    final InputStreamIterator iterator = new InputStreamIterator(Collections.singletonList(inputStream).iterator(),
-        serializer, partitionSize);
-    iterator.forEachRemaining(deserializedData::add);
-    return new NonSerializedPartition(key, deserializedData, iterator.getNumSerializedBytes(),
-        iterator.getNumEncodedBytes());
+    // We need to limit read bytes on this inputStream, which could be over-read by wrapped
+    // compression stream. This depends on the nature of the compression algorithm used.
+    // We recommend to wrap with LimitedInputStream once more when
+    // reading input from chained compression InputStream.
+    try (final LimitedInputStream limitedInputStream = new LimitedInputStream(inputStream, partitionSize)) {
+      final InputStreamIterator iterator =
+          new InputStreamIterator(Collections.singletonList(limitedInputStream).iterator(), serializer);
+      iterator.forEachRemaining(deserializedData::add);
+      return new NonSerializedPartition(key, deserializedData, iterator.getNumSerializedBytes(),
+          iterator.getNumEncodedBytes());
+    }
   }
 
   /**
@@ -197,7 +203,6 @@ public static Iterable concatNonSerPartitions(final Iterable<NonSerializedPartit
 
     private final Iterator<InputStream> inputStreams;
     private final Serializer<?, T> serializer;
-    private final long limit;
 
     private volatile CountingInputStream serializedCountingStream = null;
     private volatile CountingInputStream encodedCountingStream = null;
@@ -218,27 +223,6 @@ public static Iterable concatNonSerPartitions(final Iterable<NonSerializedPartit
                         final Serializer<?, T> serializer) {
       this.inputStreams = inputStreams;
       this.serializer = serializer;
-      // -1 means no limit.
-      this.limit = -1;
-    }
-
-    /**
-     * Construct {@link Iterator} from {@link InputStream} and {@link DecoderFactory}.
-     *
-     * @param inputStreams The streams to read data from.
-     * @param serializer   The serializer.
-     * @param limit        The bytes to read from the {@link InputStream}.
-     */
-    private InputStreamIterator(
-        final Iterator<InputStream> inputStreams,
-        final Serializer<?, T> serializer,
-        final int limit) {
-      if (limit < 0) {
-        throw new IllegalArgumentException("Negative limit not allowed.");
-      }
-      this.inputStreams = inputStreams;
-      this.serializer = serializer;
-      this.limit = limit;
     }
 
     @Override
@@ -249,11 +233,6 @@ public boolean hasNext() {
       if (cannotContinueDecoding) {
         return false;
       }
-      if (limit != -1 && limit == (serializedCountingStream == null
-          ? numSerializedBytes : numSerializedBytes + serializedCountingStream.getCount())) {
-        cannotContinueDecoding = true;
-        return false;
-      }
       while (true) {
         try {
           if (decoder == null) {
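
For context on the DataUtil.java change above: a decompression stream such as GZIPInputStream buffers ahead, so it can pull more bytes from the underlying stream than the current partition actually contains. Capping the stream at the partition size keeps the decompressor from consuming bytes that belong to the next partition. Below is a minimal, self-contained sketch of that pattern; the Bounded class is a hypothetical stand-in for Nemo's LimitedInputStream, and the payloads are made up.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class LimitedReadSketch {

  /** Hypothetical stand-in for Nemo's LimitedInputStream: caps bytes readable from a delegate. */
  static final class Bounded extends InputStream {
    private final InputStream in;
    private long remaining;

    Bounded(final InputStream in, final long limit) {
      this.in = in;
      this.remaining = limit;
    }

    @Override
    public int read() throws IOException {
      if (remaining <= 0) {
        return -1; // pretend the stream ends at the partition boundary
      }
      final int b = in.read();
      if (b >= 0) {
        remaining--;
      }
      return b;
    }
  }

  public static void main(final String[] args) throws IOException {
    // Two gzip-compressed partitions written back to back, as in a file block.
    final byte[] p1 = gzip("partition-one".getBytes(StandardCharsets.UTF_8));
    final byte[] p2 = gzip("partition-two".getBytes(StandardCharsets.UTF_8));
    final ByteArrayOutputStream block = new ByteArrayOutputStream();
    block.write(p1);
    block.write(p2);

    final InputStream shared = new ByteArrayInputStream(block.toByteArray());
    // Without the bound, GZIPInputStream may buffer past the end of p1 and
    // consume bytes that belong to p2; the bound prevents that over-read.
    try (GZIPInputStream unzipped = new GZIPInputStream(new Bounded(shared, p1.length))) {
      final byte[] buf = new byte[64];
      final int n = unzipped.read(buf);
      System.out.println(new String(buf, 0, n, StandardCharsets.UTF_8)); // partition-one
    }
  }

  private static byte[] gzip(final byte[] raw) throws IOException {
    final ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
      gz.write(raw);
    }
    return out.toByteArray();
  }
}
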
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index b9a04e437..6eef82432 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.runtime.executor.data.block;
 
+import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.BlockWriteException;
 import edu.snu.nemo.runtime.common.data.KeyRange;
@@ -172,36 +173,28 @@ public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> par
       // Deserialize the data
       final List<NonSerializedPartition<K>> deserializedPartitions = new ArrayList<>();
       try {
+        final List<Pair<K, byte[]>> partitionKeyBytesPairs = new ArrayList<>();
         try (final FileInputStream fileStream = new FileInputStream(filePath)) {
           for (final PartitionMetadata<K> partitionMetadata : metadata.getPartitionMetadataList()) {
             final K key = partitionMetadata.getKey();
             if (keyRange.includes(key)) {
               // The key value of this partition is in the range.
-              final long availableBefore = fileStream.available();
-              // We need to limit read bytes on this FileStream, which could be over-read by wrapped
-              // compression stream. This depends on the nature of the compression algorithm used.
-              // We recommend to wrap with LimitedInputStream once more when
-              // reading input from chained compression InputStream.
-              // Plus, this stream must be not closed to prevent to close the filtered file partition.
-              final LimitedInputStream limitedInputStream =
-                  new LimitedInputStream(fileStream, partitionMetadata.getPartitionSize());
-              final NonSerializedPartition<K> deserializePartition =
-                  DataUtil.deserializePartition(
-                      partitionMetadata.getPartitionSize(), serializer, key, limitedInputStream);
-              deserializedPartitions.add(deserializePartition);
-              // rearrange file pointer
-              final long toSkip = partitionMetadata.getPartitionSize() - availableBefore + fileStream.available();
-              if (toSkip > 0) {
-                skipBytes(fileStream, toSkip);
-              } else if (toSkip < 0) {
-                throw new IOException("file stream has been overread");
-              }
+              final byte[] partitionBytes = new byte[partitionMetadata.getPartitionSize()];
+              fileStream.read(partitionBytes, 0, partitionMetadata.getPartitionSize());
+              partitionKeyBytesPairs.add(Pair.of(key, partitionBytes));
             } else {
               // Have to skip this partition.
               skipBytes(fileStream, partitionMetadata.getPartitionSize());
             }
           }
         }
+        for (final Pair<K, byte[]> partitionKeyBytes : partitionKeyBytesPairs) {
+          final NonSerializedPartition<K> deserializePartition =
+              DataUtil.deserializePartition(
+                  partitionKeyBytes.right().length, serializer, partitionKeyBytes.left(),
+                  new ByteArrayInputStream(partitionKeyBytes.right()));
+          deserializedPartitions.add(deserializePartition);
+        }
       } catch (final IOException e) {
         throw new BlockFetchException(e);
       }
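
The FileBlock.java change above is the heart of the sequential-read improvement. The old path interleaved file reads with deserialization and then repositioned the file pointer to undo whatever the decompressor had over-read; the new path makes one forward pass that copies each in-range partition into its own byte[], and deserialization then runs against in-memory ByteArrayInputStreams. Here is a minimal sketch of that read pass, with hypothetical partition sizes and file name; note the sketch uses DataInputStream.readFully, which loops until the buffer is full, whereas a bare InputStream.read may return fewer bytes than requested.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SequentialReadSketch {
  public static void main(final String[] args) throws IOException {
    final int[] partitionSizes = {128, 256, 64};      // from partition metadata (hypothetical)
    final boolean[] inKeyRange = {true, false, true}; // result of keyRange.includes(key)

    final List<byte[]> selected = new ArrayList<>();
    try (DataInputStream in = new DataInputStream(new FileInputStream("block.data"))) {
      for (int i = 0; i < partitionSizes.length; i++) {
        if (inKeyRange[i]) {
          // One forward read per partition: the file pointer never moves backwards,
          // so the OS sees a purely sequential access pattern.
          final byte[] bytes = new byte[partitionSizes[i]];
          in.readFully(bytes);
          selected.add(bytes);
        } else {
          in.skipBytes(partitionSizes[i]); // partition outside the key range
        }
      }
    }
    // Deserialization happens after the file is closed, entirely from memory,
    // so decompressor over-reads can no longer corrupt the file position.
    for (final byte[] bytes : selected) {
      final ByteArrayInputStream partitionStream = new ByteArrayInputStream(bytes);
      System.out.println("partition of " + bytes.length + " bytes, "
          + partitionStream.available() + " available");
    }
  }
}
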
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index e9e563bfa..4471ae4d3 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -170,13 +170,8 @@ public boolean isSideInputReader() {
    * @return the parallelism of the source task.
    */
   public int getSourceParallelism() {
-    if (CommunicationPatternProperty.Value.OneToOne
-        .equals(runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).get())) {
-      return 1;
-    } else {
-      final Integer numSrcTasks = srcVertex.getPropertyValue(ParallelismProperty.class).get();
-      return numSrcTasks;
-    }
+    return srcVertex.getPropertyValue(ParallelismProperty.class).
+        orElseThrow(() -> new RuntimeException("No parallelism property on this edge."));
   }
 
   /**
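
The InputReader.java change above also shows the pattern applied throughout this PR: bare Optional.get() calls on execution properties are replaced with orElseThrow, so a missing property fails with a message that names it instead of an anonymous NoSuchElementException. Note too that getSourceParallelism no longer special-cases OneToOne edges to 1; that accounting moves to OutputWriter.getExpectedRead below, which is why the tests at the bottom now always expect PARALLELISM_TEN. A small sketch of the pattern; lookupParallelism is a hypothetical stand-in for getPropertyValue(ParallelismProperty.class).

import java.util.Optional;

public class PropertyLookupSketch {

  // Hypothetical stand-in for srcVertex.getPropertyValue(ParallelismProperty.class).
  static Optional<Integer> lookupParallelism(final boolean present) {
    return present ? Optional.of(10) : Optional.empty();
  }

  public static void main(final String[] args) {
    // Before: lookupParallelism(...).get() -> bare NoSuchElementException.
    // After: a RuntimeException that says which property was missing.
    final int parallelism = lookupParallelism(true)
        .orElseThrow(() -> new RuntimeException("No parallelism property on this vertex."));
    System.out.println("source parallelism = " + parallelism); // 10
  }
}
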
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index 6e4164a13..71d810a95 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -65,22 +65,28 @@
     this.srcVertexId = srcRuntimeVertexId;
     this.dstIrVertex = dstIrVertex;
     this.blockManagerWorker = blockManagerWorker;
-    this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class).get();
+    this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class).
+        orElseThrow(() -> new RuntimeException("No data store property on the edge"));
+
 
     // Setup partitioner
-    final int dstParallelism = getDstParallelism();
+    final int dstParallelism = dstIrVertex.getPropertyValue(ParallelismProperty.class).
+        orElseThrow(() -> new RuntimeException("No parallelism property on the destination vertex"));
     final Optional<KeyExtractor> keyExtractor = runtimeEdge.getPropertyValue(KeyExtractorProperty.class);
     final PartitionerProperty.Value partitionerPropertyValue =
-        runtimeEdge.getPropertyValue(PartitionerProperty.class).get();
+        runtimeEdge.getPropertyValue(PartitionerProperty.class).
+            orElseThrow(() -> new RuntimeException("No partitioner property on the edge"));
     switch (partitionerPropertyValue) {
       case IntactPartitioner:
         this.partitioner = new IntactPartitioner();
         break;
       case HashPartitioner:
-        this.partitioner = new HashPartitioner(dstParallelism, keyExtractor.get());
+        this.partitioner = new HashPartitioner(dstParallelism, keyExtractor.
+            orElseThrow(() -> new RuntimeException("No key extractor property on the edge")));
         break;
       case DataSkewHashPartitioner:
-        this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, keyExtractor.get());
+        this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, keyExtractor.
+            orElseThrow(() -> new RuntimeException("No key extractor property on the edge")));
         break;
       case DedicatedKeyPerElementPartitioner:
         this.partitioner = new DedicatedKeyPerElementPartitioner();
@@ -122,10 +128,8 @@ public void write(final Object element) {
   public void close() {
     // Commit block.
     final DataPersistenceProperty.Value persistence =
-        runtimeEdge.getPropertyValue(DataPersistenceProperty.class).get();
-    final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
-        runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
-    final int multiplier = duplicateDataProperty.isPresent() ? duplicateDataProperty.get().getGroupSize() : 1;
+        runtimeEdge.getPropertyValue(DataPersistenceProperty.class).
+            orElseThrow(() -> new RuntimeException("No data persistence property on the edge"));
 
     final boolean isDataSizeMetricCollectionEdge = Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
         .equals(runtimeEdge.getPropertyValue(MetricCollectionProperty.class));
@@ -138,11 +142,11 @@ public void close() {
       }
       this.writtenBytes = blockSizeTotal;
       blockManagerWorker.writeBlock(blockToWrite, blockStoreValue, isDataSizeMetricCollectionEdge,
-          partitionSizeMap.get(), srcVertexId, getDstParallelism() * multiplier, persistence);
+          partitionSizeMap.get(), srcVertexId, getExpectedRead(), persistence);
     } else {
       this.writtenBytes = -1; // no written bytes info.
       blockManagerWorker.writeBlock(blockToWrite, blockStoreValue, isDataSizeMetricCollectionEdge,
-          Collections.emptyMap(), srcVertexId, getDstParallelism() * multiplier, persistence);
+          Collections.emptyMap(), srcVertexId, getExpectedRead(), persistence);
     }
   }
 
@@ -158,13 +162,21 @@ public void close() {
   }
 
   /**
-   * Get the parallelism of the destination task.
+   * Get the expected number of data read according to the communication pattern of the edge and
+   * the parallelism of destination vertex.
    *
-   * @return the parallelism of the destination task.
+   * @return the expected number of data read.
    */
-  private int getDstParallelism() {
-    return CommunicationPatternProperty.Value.OneToOne.equals(
-        runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).get())
-        ? 1 : dstIrVertex.getPropertyValue(ParallelismProperty.class).get();
+  private int getExpectedRead() {
+    final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
+        runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
+    final int duplicatedDataMultiplier =
+        duplicateDataProperty.isPresent() ? duplicateDataProperty.get().getGroupSize() : 1;
+    final int readForABlock = CommunicationPatternProperty.Value.OneToOne.equals(
+        runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).orElseThrow(
+            () -> new RuntimeException("No communication pattern on this edge.")))
+        ? 1 : dstIrVertex.getPropertyValue(ParallelismProperty.class).orElseThrow(
+            () -> new RuntimeException("No parallelism property on the destination vertex."));
+    return readForABlock * duplicatedDataMultiplier;
   }
 }
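
getExpectedRead above folds the old getDstParallelism() and the duplicate-edge multiplier into a single value: a OneToOne edge reads a block once, any other communication pattern reads it once per destination task, and a duplicate edge group multiplies that by its group size. A worked sketch of the arithmetic, with hypothetical numbers:

public class ExpectedReadSketch {
  public static void main(final String[] args) {
    final boolean oneToOne = false;   // CommunicationPatternProperty of the edge
    final int dstParallelism = 10;    // ParallelismProperty of the destination vertex
    final int duplicateGroupSize = 2; // DuplicateEdgeGroupProperty group size (1 if absent)

    final int readForABlock = oneToOne ? 1 : dstParallelism;
    System.out.println(readForABlock * duplicateGroupSize); // 20 expected reads per block
  }
}
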
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index b46f7ba10..f309e49c0 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -334,11 +334,7 @@ private void writeAndRead(final BlockManagerWorker sender,
       final InputReader reader =
           new InputReader(dstTaskIndex, srcVertex, dummyEdge, receiver);
 
-      if (CommunicationPatternProperty.Value.OneToOne.equals(commPattern)) {
-        assertEquals(1, reader.getSourceParallelism());
-      } else {
-        assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
-      }
+      assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
 
       final List dataRead = new ArrayList<>();
       try {
@@ -436,17 +432,9 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
       final InputReader reader2 =
           new InputReader(dstTaskIndex, srcVertex, dummyEdge2, receiver);
 
-      if (CommunicationPatternProperty.Value.OneToOne.equals(commPattern)) {
-        assertEquals(1, reader.getSourceParallelism());
-      } else {
-        assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
-      }
+      assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
 
-      if (CommunicationPatternProperty.Value.OneToOne.equals(commPattern)) {
-        assertEquals(1, reader2.getSourceParallelism());
-      } else {
-        assertEquals(PARALLELISM_TEN, reader2.getSourceParallelism());
-      }
+      assertEquals(PARALLELISM_TEN, reader2.getSourceParallelism());
 
       final List dataRead = new ArrayList<>();
       try {


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services