Posted to dev@spark.apache.org by John Carrino <jo...@gmail.com> on 2015/06/02 22:50:47 UTC

Possible space improvements to shuffle

One thing I have noticed with ExternalSorter is that if an ordering is not
defined, it sorts using only the partition_id, instead of
(partition_id, hash).  This means that on the reduce side you need to pull
the entire dataset into memory before you can begin iterating over the
results.

I figure that since we are sorting the data anyway, it shouldn't be much more
expensive to sort by (partition_id, hash).  That way the reducer can do a
merge and only has to hold in memory the data for a single int hashCode
before it can combine them and start returning results from the iterator.
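
A minimal sketch of the merge described above, assuming each incoming stream
is already sorted by key.hashCode.  The object and method names
(HashMergeSketch, mergeCombine) are hypothetical and are not Spark APIs.  It
buffers only the records that share the current smallest hash code, and
compares keys for equality inside that group so colliding keys stay separate.

```scala
import scala.collection.mutable

object HashMergeSketch {
  // Hypothetical helper, not Spark code: merges iterators that are each sorted
  // by key.hashCode and combines the values of equal keys.
  def mergeCombine[K, V](streams: Seq[Iterator[(K, V)]])(combine: (V, V) => V): Iterator[(K, V)] = {
    // Min-heap of streams, ordered by the hash code of each stream's next key.
    val heap = mutable.PriorityQueue.empty[BufferedIterator[(K, V)]](
      Ordering.by[BufferedIterator[(K, V)], Int](_.head._1.hashCode).reverse)
    heap ++= streams.map(_.buffered).filter(_.hasNext)

    new Iterator[(K, V)] {
      // Combined records for the hash code currently being emitted.
      private var pending: Iterator[(K, V)] = Iterator.empty

      def hasNext: Boolean = pending.hasNext || heap.nonEmpty

      def next(): (K, V) = {
        if (!pending.hasNext) {
          val h = heap.head.head._1.hashCode
          val group = mutable.LinkedHashMap.empty[K, V]
          // Drain every record with hash h from every stream that has one.
          while (heap.nonEmpty && heap.head.head._1.hashCode == h) {
            val it = heap.dequeue()
            while (it.hasNext && it.head._1.hashCode == h) {
              val (k, v) = it.next()
              // Key equality (not hash equality) decides what gets combined.
              group(k) = group.get(k).map(combine(_, v)).getOrElse(v)
            }
            if (it.hasNext) heap.enqueue(it)
          }
          pending = group.iterator
        }
        pending.next()
      }
    }
  }
}
```

For example, HashMergeSketch.mergeCombine(Seq(s1, s2))(_ + _) would sum the
values per key across two hash-sorted streams s1 and s2.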

Has this already been discussed?  If so, can someone point me in the right
direction to find out more?

Thanks for any help!
-jc

p.s. I am using Spark version 1.3.1.  The code I am looking at below is
from ExternalSorter#partitionedIterator.  I think maybe
!ordering.isDefined should also include "&& !aggregator.isDefined"

    if (spills.isEmpty && partitionWriters == null) {
      // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
      // we don't even need to sort by anything other than partition ID
      if (!ordering.isDefined) {
        // The user hasn't requested sorted keys, so only sort by partition ID, not key
        groupByPartition(collection.destructiveSortedIterator(partitionComparator))
      } else {
        // We do need to sort by both partition ID and key
        groupByPartition(collection.destructiveSortedIterator(partitionKeyComparator))
      }

Re: Possible space improvements to shuffle

Posted by John Carrino <jo...@gmail.com>.
Yes, I think that bug is what I want.  Thank you.

So I guess the current reason is that we don't want to buffer up numMapper
incoming streams. So we just iterate through each and transfer it over in
full because that is more network efficient?

I'm not sure I understand why you wouldn't want to sort on the composite
(partition_id, hash).  I think using the partitionKeyComparator should be
OK, because the other case of merging with spilled files uses it and that
works out fine.
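
To make the two orderings concrete, here is a rough sketch of a
partition-only comparator next to a composite (partition_id, hash) one.  The
names (CompositeComparatorSketch, byPartition, byPartitionThenHash) and the
((partitionId, key), value) record shape are assumptions for illustration;
this is not the actual ExternalSorter code.

```scala
import java.util.Comparator

object CompositeComparatorSketch {
  // Treat null keys as hash 0 so the comparator never throws on them.
  private def hash(k: Any): Int = if (k == null) 0 else k.hashCode

  // Orders ((partitionId, key), value) records by partition ID only.
  def byPartition[K, V]: Comparator[((Int, K), V)] =
    new Comparator[((Int, K), V)] {
      def compare(a: ((Int, K), V), b: ((Int, K), V)): Int =
        Integer.compare(a._1._1, b._1._1)
    }

  // Orders by partition ID first, then by the key's hash code.
  // Distinct keys with equal hash codes still compare as 0 here.
  def byPartitionThenHash[K, V]: Comparator[((Int, K), V)] =
    new Comparator[((Int, K), V)] {
      def compare(a: ((Int, K), V), b: ((Int, K), V)): Int = {
        val p = Integer.compare(a._1._1, b._1._1)
        if (p != 0) p else Integer.compare(hash(a._1._2), hash(b._1._2))
      }
    }
}
```

Both are partial orders on the key: the composite version only narrows each
run of "equal" records from a whole partition down to one hash code.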

The aggregation I am doing has basically as many output rows as input rows,
so I am seeing a lot of memory pressure on the reduce side, which doesn't
have the same ability to spill that the map side does.

-jc



On Tue, Jun 2, 2015 at 2:08 PM, Josh Rosen <ro...@gmail.com> wrote:

> The relevant JIRA that springs to mind is
> https://issues.apache.org/jira/browse/SPARK-2926
>
> If an aggregator and ordering are both defined, then the map side of
> sort-based shuffle will sort based on the key ordering so that map-side
> spills can be efficiently merged.  We do not currently do a sort-based
> merge on the reduce side; implementing this is a little tricky because it
> will require more map partitions' output to be buffered on the reduce
> side.  I think that SPARK-2926 has some proposals of how to deal with this,
> including hierarchical merging of reduce outputs.
>
> RE: ExternalSorter#partitionedIterator, I don't think it's safe to do !ordering.isDefined
> && !aggregator.isDefined.  If an aggregator is defined but we don't have
> an ordering, then I don't think it makes sense to sort the keys based on
> their hashcodes or some default ordering, since hashcode collisions would
> lead to incorrect results for sort-based aggregation.

Re: Possible space improvements to shuffle

Posted by Josh Rosen <ro...@gmail.com>.
The relevant JIRA that springs to mind is
https://issues.apache.org/jira/browse/SPARK-2926

If an aggregator and ordering are both defined, then the map side of
sort-based shuffle will sort based on the key ordering so that map-side
spills can be efficiently merged.  We do not currently do a sort-based
merge on the reduce side; implementing this is a little tricky because it
will require more map partitions' output to be buffered on the reduce
side.  I think that SPARK-2926 has some proposals of how to deal with this,
including hierarchical merging of reduce outputs.

RE: ExternalSorter#partitionedIterator, I don't think it's safe to do
!ordering.isDefined && !aggregator.isDefined.  If an aggregator is defined
but we don't have an ordering, then I don't think it makes sense to sort the
keys based on their hashcodes or some default ordering, since hashcode
collisions would lead to incorrect results for sort-based aggregation.
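
A small illustration of the collision concern, as a sketch rather than Spark
code: "Aa" and "BB" are distinct Java strings with the same hashCode, so a
sort by hash code alone does not guarantee that equal keys end up adjacent.
The naiveAggregate helper below is hypothetical; it combines only adjacent
equal keys, which is what a purely sort-based aggregation would assume.

```scala
object HashCollisionSketch {
  // Naive sort-based aggregation: sums values of adjacent records whose keys
  // are equal, assuming the sort has already placed equal keys side by side.
  def naiveAggregate(sorted: Seq[(String, Int)]): List[(String, Int)] =
    sorted.foldLeft(List.empty[(String, Int)]) {
      case ((k, acc) :: rest, (key, v)) if k == key => (k, acc + v) :: rest
      case (out, kv) => kv :: out
    }.reverse

  def main(args: Array[String]): Unit = {
    // All three records have the same hash code, so a stable sort by hash
    // leaves them in arrival order with "BB" between the two "Aa" records.
    val records = Seq("Aa" -> 1, "BB" -> 2, "Aa" -> 3).sortBy(_._1.hashCode)
    // "Aa" appears twice in the output instead of being combined once.
    println(naiveAggregate(records))
  }
}
```

Handling this correctly would need an extra key-equality pass within each
run of equal hash codes before results are emitted.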
