Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2015/05/05 02:14:06 UTC

[jira] [Comment Edited] (SPARK-7081) Faster sort-based shuffle path using binary processing cache-aware sort

    [ https://issues.apache.org/jira/browse/SPARK-7081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14527602#comment-14527602 ] 

Josh Rosen edited comment on SPARK-7081 at 5/5/15 12:13 AM:
------------------------------------------------------------

Cross-posting [a comment from my PR|https://github.com/apache/spark/pull/5868#issuecomment-98888713]:

Based on some additional discussions, I think that we should use a specialized sorter implementation that is specific to sort-based shuffle and design a separate sorter for more general-purpose record sorting.  A specialized sorter lets us apply several performance optimizations that would be difficult to implement in a more general-purpose one:

- We may not need a full 32 bits to record the partition id: since shuffles are unlikely to have more than, say, 10 million partitions, we can encode the partition id in fewer bits and pack it into a single 64-bit word whose remaining bits store the record pointer.  This works because we can choose a reasonable upper bound on addressable memory (say, 1 terabyte) and can save further address bits thanks to word-alignment.  This should give us excellent cache-locality benefits, since the sort array will only require 8 bytes of space per record (see the bit-packing sketch after this list).
- Because sort-shuffle's sort keys are partition ids, we can expect to encounter long runs of records with the same sort key in the sorted file; this may not hold in a more general-purpose sort, where all of the sort keys might be distinct.  This lets the external sort use a specialized merge procedure that operates on runs of records sharing a key rather than on individual records (see the run-scanning sketch after this list).
- Ignoring the spilling/merging case for a moment, the sort-shuffle writer only needs individual records' lengths during the in-memory sort: we need a record's length in order to copy it to the sorted output file, but once a partition's records are adjacent in that file we no longer need their individual lengths.  This follows from the fact that the sorted data will be consumed by a serializer that knows how to identify record boundaries, building on our assumption that the serializer supports reordering of serialized records within its serialization stream.
- If we go further and assume that our IO compression codec supports concatenation of compressed data (which [Snappy seems to support|https://snappy.googlecode.com/svn/trunk/framing_format.txt]), then we can implement an IO-efficient merge procedure for external sort.  If we store an index file for each spill file, identifying the offset of each partition within that sorted file, then the merge procedure can simply traverse these indices and concatenate the partitions' serialized data without interpreting it.  This would let us use methods like `transferTo` to copy data without buffering it in the JVM, sidestepping the complexity of managing IO buffers during the merge (see the concatenating-merge sketch after this list).
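
To make the single-word encoding concrete, here is a minimal sketch in Java.  It is not Spark's actual code, and the field widths are illustrative assumptions: 24 bits for the partition id (enough for ~16 million partitions) and 40 bits for the record pointer (1 terabyte of addressable memory):

{code:java}
// Hypothetical sketch: pack a partition id and a record pointer into one
// 64-bit word. The 24/40 split is an illustrative assumption, not Spark's
// actual layout.
final class PackedPointer {
  static final int PARTITION_BITS = 24;                 // ~16M partitions
  static final int POINTER_BITS = 64 - PARTITION_BITS;  // 40 bits = 1 TB
  static final long POINTER_MASK = (1L << POINTER_BITS) - 1;

  // The partition id occupies the high bits, so sorting the packed longs
  // numerically also sorts the records by partition id.
  static long pack(int partitionId, long recordPointer) {
    assert partitionId >= 0 && partitionId < (1 << PARTITION_BITS);
    assert (recordPointer & ~POINTER_MASK) == 0;
    return ((long) partitionId << POINTER_BITS) | recordPointer;
  }

  static int partitionId(long packed) {
    return (int) (packed >>> POINTER_BITS);
  }

  static long recordPointer(long packed) {
    return packed & POINTER_MASK;
  }
}
{code}

Since each sort entry is a single 8-byte long, sorting the array of packed words sorts the records by partition id with no pointer chasing.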
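
The run-based processing could then look like the following sketch, which scans the sorted array once and hands each run of same-partition records to a hypothetical writeRun hook as a single unit:

{code:java}
// Hypothetical sketch: walk a sorted array of packed pointers and process
// whole runs of records that share a partition id, rather than one record
// at a time.
final class RunScanner {
  static void forEachRun(long[] sorted, int numRecords) {
    int i = 0;
    while (i < numRecords) {
      final int partition = PackedPointer.partitionId(sorted[i]);
      final int runStart = i;
      while (i < numRecords && PackedPointer.partitionId(sorted[i]) == partition) {
        i++;
      }
      // All records in [runStart, i) belong to `partition`.
      writeRun(partition, sorted, runStart, i);
    }
  }

  // Hypothetical output hook: copy the records' bytes, located via their
  // packed pointers, into the partition's contiguous region of the output.
  static void writeRun(int partition, long[] sorted, int from, int to) {
    // ...
  }
}
{code}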
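
Finally, a hedged sketch of the concatenating merge, assuming a hypothetical SpillInfo that holds each partition's byte offsets within a spill file (the index file described above, loaded into memory).  FileChannel.transferTo lets the kernel copy the bytes directly, so nothing needs to be buffered in the JVM; this only works if the compression codec tolerates concatenation, as noted above:

{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;

// Hypothetical sketch, not Spark's API: an in-memory index for one spill.
final class SpillInfo {
  final FileChannel channel; // open channel for the sorted spill file
  final long[] offsets;      // offsets[p]..offsets[p+1] bound partition p
  SpillInfo(FileChannel channel, long[] offsets) {
    this.channel = channel;
    this.offsets = offsets;
  }
}

final class ConcatenatingMerger {
  // Concatenate every spill's bytes for partition 0, then partition 1, and
  // so on, without ever interpreting the serialized data.
  static void merge(SpillInfo[] spills, FileChannel out, int numPartitions)
      throws IOException {
    for (int p = 0; p < numPartitions; p++) {
      for (SpillInfo spill : spills) {
        long start = spill.offsets[p];
        long remaining = spill.offsets[p + 1] - start;
        while (remaining > 0) {
          // transferTo may copy fewer bytes than requested, so loop.
          long n = spill.channel.transferTo(start, remaining, out);
          start += n;
          remaining -= n;
        }
      }
    }
  }
}
{code}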

In light of this, I'm going to work on refactoring my external sorting branch to perform these optimizations and will update this pull request with those changes.  I'm also going to remove the SPARK-7078 JIRA link, since that JIRA seems to be concerned with a more general-purpose record sorter for use in SQL joins.


> Faster sort-based shuffle path using binary processing cache-aware sort
> -----------------------------------------------------------------------
>
>                 Key: SPARK-7081
>                 URL: https://issues.apache.org/jira/browse/SPARK-7081
>             Project: Spark
>          Issue Type: New Feature
>          Components: Shuffle, Spark Core
>            Reporter: Reynold Xin
>            Assignee: Josh Rosen
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org