Posted to user@spark.apache.org by Daniel Darabos <da...@lynxanalytics.com> on 2014/06/14 21:14:43 UTC

Is shuffle "stable"?

What I mean is, let's say I run this:

import org.apache.spark.HashPartitioner
sc.parallelize(Seq(0->3, 0->2, 0->1), 3).partitionBy(new HashPartitioner(3)).collect


Will the result always be Array((0,3), (0,2), (0,1))? Or could I
possibly get a different order?


I'm pretty sure the shuffle files are read in the order of the source
partitions... But after much searching, and after the discussion at
http://stackoverflow.com/questions/24206660/does-groupbykey-in-spark-preserve-the-original-order
I still can't find the code that does this.


Thanks!

Re: Is shuffle "stable"?

Posted by Daniel Darabos <da...@lynxanalytics.com>.
Thanks Matei!

In the example all three items have the same key, so they go to the same
partition:

scala> sc.parallelize(Seq(0->3, 0->2, 0->1), 3).partitionBy(new HashPartitioner(3)).glom.collect

Array(Array((0,3), (0,2), (0,1)), Array(), Array())


I guess the apparent stability is just due to the single-threaded "local"
master processing partitions in order, so (0,3) is produced first, etc.
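
To illustrate why all three records land in the same partition, here is a minimal sketch in plain Scala (no Spark needed). The helper partitionFor is hypothetical; it assumes the partitioner takes the key's hashCode modulo the number of partitions and shifts negative results into range, which as far as I know is what HashPartitioner does, but treat that as an assumption and check the Spark source for your version.

```scala
object HashPartitionSketch {
  // Hypothetical helper: hash-mod partitioning with a non-negative result.
  // Assumption: this mirrors HashPartitioner's behaviour; it is not Spark code.
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _ =>
      val raw = key.hashCode % numPartitions
      // The JVM's % can return a negative value, so shift it into range.
      if (raw < 0) raw + numPartitions else raw
  }

  def main(args: Array[String]): Unit = {
    val records = Seq(0 -> 3, 0 -> 2, 0 -> 1)
    // Group the records by the partition index their key maps to.
    val byPartition = records.groupBy { case (key, _) => partitionFor(key, 3) }
    // Every record has key 0, so they all end up under partition index 0.
    println(byPartition)
  }
}
```

This only shows which partition a key maps to. It says nothing about the order of records inside that partition, which is the part the shuffle does not guarantee.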



On Sun, Jun 15, 2014 at 12:55 AM, Matei Zaharia <ma...@gmail.com>
wrote:

> The order is not guaranteed actually, only which keys end up in each
> partition. Reducers may fetch data from map tasks in an arbitrary order,
> depending on which ones are available first. If you’d like a specific
> order, you should sort each partition. Here you might be getting it because
> each partition only ends up having one element, and collect() does return
> the partitions in order.
>
> Matei
>

Re: Is shuffle "stable"?

Posted by Matei Zaharia <ma...@gmail.com>.
The order is not guaranteed actually, only which keys end up in each partition. Reducers may fetch data from map tasks in an arbitrary order, depending on which ones are available first. If you’d like a specific order, you should sort each partition. Here you might be getting it because each partition only ends up having one element, and collect() does return the partitions in order.

Matei
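
The advice above to sort each partition could look roughly like the following. This is only a sketch under stated assumptions: the object name SortedShuffleSketch, the app name and the local[2] master are made up for illustration, and sorting by the whole (key, value) pair is just one possible ordering. It uses mapPartitions to sort the records of each partition after the shuffle, so the result no longer depends on the order in which the reducer fetched the map outputs.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object SortedShuffleSketch {
  // Runs the example from the thread and returns the contents of each partition.
  def run(): Array[Array[(Int, Int)]] = {
    // Assumption: a throwaway local context, purely for illustration.
    val conf = new SparkConf().setAppName("sorted-shuffle-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val shuffled = sc.parallelize(Seq(0 -> 3, 0 -> 2, 0 -> 1), 3)
        .partitionBy(new HashPartitioner(3))

      // Sort inside each partition. This materializes a whole partition in
      // memory, which is fine for a sketch but may not be for large partitions.
      val sorted = shuffled.mapPartitions(
        records => records.toVector.sorted.iterator,
        preservesPartitioning = true)

      // glom turns each partition into an array so the grouping stays visible.
      sorted.glom().collect()
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    run().foreach(partition => println(partition.mkString("[", ", ", "]")))
}
```

With the three records from the question, the first partition should come back as (0,1), (0,2), (0,3) regardless of fetch order, and the other two partitions stay empty.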
