Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2015/05/03 19:21:06 UTC

[jira] [Commented] (SPARK-5581) When writing sorted map output file, avoid open / close between each partition

    [ https://issues.apache.org/jira/browse/SPARK-5581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14525907#comment-14525907 ] 

Josh Rosen commented on SPARK-5581:
-----------------------------------

I also noticed this while working on some new shuffle code.

One way to address this might be to decouple the commit and close steps. Maybe we could add a {{commit()}} operation that returns a {{FileSegment}}.

There may be some subtleties to watch out for in how we handle compression and serialization output streams. The current code opens new serialization and compression streams for each partition, and we might have to do the same thing even if we re-use the underlying FileOutputStream. It's possible that we could "reset" the compression/serialization streams without closing them, but some implementations might not support this (for example, SnappyOutputStream writes a magic number at the beginning of the stream, so we'd need to make sure that this gets written to the start of each partition). This means that we might not gain the full benefits of being able to re-use the buffers in the compression output streams.
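A rough sketch of what a decoupled {{commit()}} might look like, assuming a hypothetical writer and FileSegment shape (these names and fields are illustrative, not Spark's actual DiskBlockObjectWriter API; a ByteArrayOutputStream stands in for the real FileOutputStream):

```scala
import java.io.ByteArrayOutputStream

// Hypothetical segment descriptor: byte range of one partition in the shared file.
case class FileSegment(offset: Long, length: Long)

// Sketch of a writer where commit() marks a partition boundary but does NOT
// close the underlying stream, so the next partition reuses the same handle.
class ReusableWriter(out: ByteArrayOutputStream) {
  private var committedPosition: Long = 0L

  def write(bytes: Array[Byte]): Unit = out.write(bytes)

  // Record the segment written since the last commit and advance the mark;
  // the underlying stream stays open for the next partition.
  def commit(): FileSegment = {
    val newPosition = out.size().toLong
    val segment = FileSegment(committedPosition, newPosition - committedPosition)
    committedPosition = newPosition
    segment
  }
}

// Usage: two partitions share one open stream; each commit yields its segment.
val out = new ByteArrayOutputStream()
val writer = new ReusableWriter(out)
writer.write("aaa".getBytes("UTF-8"))
val seg1 = writer.commit()   // FileSegment(0, 3)
writer.write("bbbb".getBytes("UTF-8"))
val seg2 = writer.commit()   // FileSegment(3, 4)
```

In the real code, per-partition compression/serialization streams would still be layered on top of the shared handle between commits; only the file open/close per partition goes away.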

If the cost of opening an output stream is sufficiently high, though, then even just re-using the FileOutputStream could be a win, especially in cases where we have thousands of small partitions (since this would be an order-of-magnitude reduction in the number of file handles opened).

> When writing sorted map output file, avoid open / close between each partition
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-5581
>                 URL: https://issues.apache.org/jira/browse/SPARK-5581
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 1.3.0
>            Reporter: Sandy Ryza
>
> {code}
>       // Bypassing merge-sort; get an iterator by partition and just write everything directly.
>       for ((id, elements) <- this.partitionedIterator) {
>         if (elements.hasNext) {
>           val writer = blockManager.getDiskWriter(
>             blockId, outputFile, ser, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get)
>           for (elem <- elements) {
>             writer.write(elem)
>           }
>           writer.commitAndClose()
>           val segment = writer.fileSegment()
>           lengths(id) = segment.length
>         }
>       }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org