You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by suman bharadwaj <su...@gmail.com> on 2014/01/16 11:33:40 UTC

How does shuffle work in spark ?

Hi,

I'm new to spark. And wanted to understand more on how shuffle works in
spark

In Hadoop map reduce, while performing a reduce operation, the intermediate
data from map gets written to disk. How does the same happen in Spark ?

Does spark write the intermediate data to disk ?

Thanks in advance.

Regards,
SB

Re: How does shuffle work in spark ?

Posted by Ewen Cheslack-Postava <me...@ewencp.org>.
Operations that use keys to combine multiple RDDs might generate a 
shuffle dependency. A shuffle dependency always stores data to disk. If 
you aren't paying careful attention to partitioners, then you should 
probably assume as a default that the data *will* be written to disk 
since you're not really sure (though it isn't hard to check in your code 
-- you can inspect RDD.getDependencies).

What I was really trying to point out is that operations and 
dependencies are different and that difference can be significant. An 
operation can generate multiple dependencies between RDDs, sometimes a 
"shuffle operation" like join() may have no shuffle dependencies, and 
the dependencies aren't always the same type (a join() could result in 1 
narrow dependency, 1 shuffle).

As for "more often in memory", that might only be true if you're 
careful. Having a default partitioner probably increases the likelihood 
significantly since that makes more RDDs end up with the same 
partitioner. But many operations remove the partitioner, e.g. even a 
union(), which will then force shuffling.

-Ewen
> suman bharadwaj <ma...@gmail.com>
> January 16, 2014 1:22 PM
> Thanks Patrick and Ewen,
>
> Great answers.
>
> So a shuffle dependency that can cause a shuffle will store the data 
> in memory + disk. More often in memory.
> Is my understanding correct ?
>
> Regards,
> SB
>
>
>
> Ewen Cheslack-Postava <ma...@ewencp.org>
> January 16, 2014 1:08 PM
> The difference between a shuffle dependency and a transformation that 
> can cause a shuffle is probably worth pointing out.
>
> The mentioned transformations (groupByKey, join, etc) *might* generate 
> a shuffle dependency on input RDDs, but they won't necessarily. For 
> example, if you join() two RDDs that already use the same partitioner 
> (e.g. a default HashPartitioner with the default parallelism), then no 
> shuffle needs to be performed (and nothing should hit disk). Any 
> records that need to be considered together will already be in the 
> same partitions of the input RDDs (e.g. all records with key X are 
> guaranteed to be in partition hash(X) of both input RDDs, so no 
> shuffling is needed).
>
> Sometimes this is *really* worth exploiting, and even if it only 
> applies to one of the input RDDs. For example, if you're joining 2 
> RDDs and one is much larger than the other and already partitioned, 
> you can explicitly use the partitioner from the larger RDD so that 
> only the smaller RDD gets shuffled.
>
> This also means you probably want to pay attention to transformations 
> that remove partitioners. For example, prefer mapValues() to map(). 
> mapValues() has to maintain the same key, so the output is guaranteed 
> to still be partitioned. map() can change the keys, so partitioning is 
> lost even if you keep the same key.
>
> -Ewen
>
> Patrick Wendell <ma...@gmail.com>
> January 16, 2014 12:16 PM
> The intermediate shuffle output gets written to disk, but it often
> hits the OS-buffer cache since it's not explicitly fsync'ed, so in
> many cases it stays entirely in memory. The behavior of the shuffle is
> agnostic to whether the base RDD is in cache or in disk.
>
> For on-disk RDD's or inputs, the shuffle path still has some key
> differences with Hadoop's implementation, including that it doesn't
> sort on the map side before shuffling.
>
> - Patrick
> suman bharadwaj <ma...@gmail.com>
> January 16, 2014 6:24 AM
> Hi,
>
> Is this behavior the same when the data is in memory ?
> If the data is stored to disk, then how is it different than Hadoop 
> map reduce ?
>
> Regards,
> SB
>
>
>
> Archit Thakur <ma...@gmail.com>
> January 16, 2014 3:41 AM
> For any shuffle operation, groupByKey, etc. it does write map output 
> to disk before performing the reduce task on the data.
>
>
>

Re: How does shuffle work in spark ?

Posted by suman bharadwaj <su...@gmail.com>.
Thanks Patrick and Ewen,

Great answers.

So a shuffle dependency that can cause a shuffle will store the data in
memory + disk. More often in memory.
Is my understanding correct ?

Regards,
SB


On Fri, Jan 17, 2014 at 2:38 AM, Ewen Cheslack-Postava <me...@ewencp.org>wrote:

> The difference between a shuffle dependency and a transformation that can
> cause a shuffle is probably worth pointing out.
>
> The mentioned transformations (groupByKey, join, etc) *might* generate a
> shuffle dependency on input RDDs, but they won't necessarily. For example,
> if you join() two RDDs that already use the same partitioner (e.g. a
> default HashPartitioner with the default parallelism), then no shuffle
> needs to be performed (and nothing should hit disk). Any records that need
> to be considered together will already be in the same partitions of the
> input RDDs (e.g. all records with key X are guaranteed to be in partition
> hash(X) of both input RDDs, so no shuffling is needed).
>
> Sometimes this is *really* worth exploiting, and even if it only applies
> to one of the input RDDs. For example, if you're joining 2 RDDs and one is
> much larger than the other and already partitioned, you can explicitly use
> the partitioner from the larger RDD so that only the smaller RDD gets
> shuffled.
>
> This also means you probably want to pay attention to transformations that
> remove partitioners. For example, prefer mapValues() to map(). mapValues()
> has to maintain the same key, so the output is guaranteed to still be
> partitioned. map() can change the keys, so partitioning is lost even if you
> keep the same key.
>
> -Ewen
>
>   Patrick Wendell <pw...@gmail.com>
>  January 16, 2014 12:16 PM
> The intermediate shuffle output gets written to disk, but it often
> hits the OS-buffer cache since it's not explicitly fsync'ed, so in
> many cases it stays entirely in memory. The behavior of the shuffle is
> agnostic to whether the base RDD is in cache or in disk.
>
> For on-disk RDD's or inputs, the shuffle path still has some key
> differences with Hadoop's implementation, including that it doesn't
> sort on the map side before shuffling.
>
> - Patrick
>   suman bharadwaj <su...@gmail.com>
>  January 16, 2014 6:24 AM
> Hi,
>
> Is this behavior the same when the data is in memory ?
> If the data is stored to disk, then how is it different than Hadoop map
> reduce ?
>
> Regards,
> SB
>
>
>
>   Archit Thakur <ar...@gmail.com>
>  January 16, 2014 3:41 AM
> For any shuffle operation, groupByKey, etc. it does write map output to
> disk before performing the reduce task on the data.
>
>
>
>   suman bharadwaj <su...@gmail.com>
>  January 16, 2014 2:33 AM
> Hi,
>
> I'm new to spark. And wanted to understand more on how shuffle works in
> spark
>
> In Hadoop map reduce, while performing a reduce operation, the
> intermediate data from map gets written to disk. How does the same happen
> in Spark ?
>
> Does spark write the intermediate data to disk ?
>
> Thanks in advance.
>
> Regards,
> SB
>
>

Re: How does shuffle work in spark ?

Posted by Mark Hamstra <ma...@clearstorydata.com>.
And if you are willing to take on yourself the responsibility of
guaranteeing that your mapping preserves partitioning, then you can look to
use mapPartitions, mapPartitionsWithIndex, mapPartitionsWithContext or
mapWith, each of which has a preservesPartitioning parameter that you can
set to true to avoid losing the partitioner.


On Thu, Jan 16, 2014 at 1:08 PM, Ewen Cheslack-Postava <me...@ewencp.org>wrote:

> The difference between a shuffle dependency and a transformation that can
> cause a shuffle is probably worth pointing out.
>
> The mentioned transformations (groupByKey, join, etc) *might* generate a
> shuffle dependency on input RDDs, but they won't necessarily. For example,
> if you join() two RDDs that already use the same partitioner (e.g. a
> default HashPartitioner with the default parallelism), then no shuffle
> needs to be performed (and nothing should hit disk). Any records that need
> to be considered together will already be in the same partitions of the
> input RDDs (e.g. all records with key X are guaranteed to be in partition
> hash(X) of both input RDDs, so no shuffling is needed).
>
> Sometimes this is *really* worth exploiting, and even if it only applies
> to one of the input RDDs. For example, if you're joining 2 RDDs and one is
> much larger than the other and already partitioned, you can explicitly use
> the partitioner from the larger RDD so that only the smaller RDD gets
> shuffled.
>
> This also means you probably want to pay attention to transformations that
> remove partitioners. For example, prefer mapValues() to map(). mapValues()
> has to maintain the same key, so the output is guaranteed to still be
> partitioned. map() can change the keys, so partitioning is lost even if you
> keep the same key.
>
> -Ewen
>
>   Patrick Wendell <pw...@gmail.com>
>  January 16, 2014 12:16 PM
> The intermediate shuffle output gets written to disk, but it often
> hits the OS-buffer cache since it's not explicitly fsync'ed, so in
> many cases it stays entirely in memory. The behavior of the shuffle is
> agnostic to whether the base RDD is in cache or in disk.
>
> For on-disk RDD's or inputs, the shuffle path still has some key
> differences with Hadoop's implementation, including that it doesn't
> sort on the map side before shuffling.
>
> - Patrick
>   suman bharadwaj <su...@gmail.com>
>  January 16, 2014 6:24 AM
> Hi,
>
> Is this behavior the same when the data is in memory ?
> If the data is stored to disk, then how is it different than Hadoop map
> reduce ?
>
> Regards,
> SB
>
>
>
>   Archit Thakur <ar...@gmail.com>
>  January 16, 2014 3:41 AM
> For any shuffle operation, groupByKey, etc. it does write map output to
> disk before performing the reduce task on the data.
>
>
>
>   suman bharadwaj <su...@gmail.com>
>  January 16, 2014 2:33 AM
> Hi,
>
> I'm new to spark. And wanted to understand more on how shuffle works in
> spark
>
> In Hadoop map reduce, while performing a reduce operation, the
> intermediate data from map gets written to disk. How does the same happen
> in Spark ?
>
> Does spark write the intermediate data to disk ?
>
> Thanks in advance.
>
> Regards,
> SB
>
>

Re: How does shuffle work in spark ?

Posted by Ewen Cheslack-Postava <me...@ewencp.org>.
The difference between a shuffle dependency and a transformation that 
can cause a shuffle is probably worth pointing out.

The mentioned transformations (groupByKey, join, etc) *might* generate a 
shuffle dependency on input RDDs, but they won't necessarily. For 
example, if you join() two RDDs that already use the same partitioner 
(e.g. a default HashPartitioner with the default parallelism), then no 
shuffle needs to be performed (and nothing should hit disk). Any records 
that need to be considered together will already be in the same 
partitions of the input RDDs (e.g. all records with key X are guaranteed 
to be in partition hash(X) of both input RDDs, so no shuffling is needed).

Sometimes this is *really* worth exploiting, and even if it only applies 
to one of the input RDDs. For example, if you're joining 2 RDDs and one 
is much larger than the other and already partitioned, you can 
explicitly use the partitioner from the larger RDD so that only the 
smaller RDD gets shuffled.

This also means you probably want to pay attention to transformations 
that remove partitioners. For example, prefer mapValues() to map(). 
mapValues() has to maintain the same key, so the output is guaranteed to 
still be partitioned. map() can change the keys, so partitioning is lost 
even if you keep the same key.

-Ewen

> Patrick Wendell <ma...@gmail.com>
> January 16, 2014 12:16 PM
> The intermediate shuffle output gets written to disk, but it often
> hits the OS-buffer cache since it's not explicitly fsync'ed, so in
> many cases it stays entirely in memory. The behavior of the shuffle is
> agnostic to whether the base RDD is in cache or in disk.
>
> For on-disk RDD's or inputs, the shuffle path still has some key
> differences with Hadoop's implementation, including that it doesn't
> sort on the map side before shuffling.
>
> - Patrick
> suman bharadwaj <ma...@gmail.com>
> January 16, 2014 6:24 AM
> Hi,
>
> Is this behavior the same when the data is in memory ?
> If the data is stored to disk, then how is it different than Hadoop 
> map reduce ?
>
> Regards,
> SB
>
>
>
> Archit Thakur <ma...@gmail.com>
> January 16, 2014 3:41 AM
> For any shuffle operation, groupByKey, etc. it does write map output 
> to disk before performing the reduce task on the data.
>
>
>
> suman bharadwaj <ma...@gmail.com>
> January 16, 2014 2:33 AM
> Hi,
>
> I'm new to spark. And wanted to understand more on how shuffle works 
> in spark
>
> In Hadoop map reduce, while performing a reduce operation, the 
> intermediate data from map gets written to disk. How does the same 
> happen in Spark ?
>
> Does spark write the intermediate data to disk ?
>
> Thanks in advance.
>
> Regards,
> SB

Re: How does shuffle work in spark ?

Posted by Patrick Wendell <pw...@gmail.com>.
The intermediate shuffle output gets written to disk, but it often
hits the OS-buffer cache since it's not explicitly fsync'ed, so in
many cases it stays entirely in memory. The behavior of the shuffle is
agnostic to whether the base RDD is in cache or in disk.

For on-disk RDD's or inputs, the shuffle path still has some key
differences with Hadoop's implementation, including that it doesn't
sort on the map side before shuffling.

- Patrick

On Thu, Jan 16, 2014 at 6:24 AM, suman bharadwaj <su...@gmail.com> wrote:
> Hi,
>
> Is this behavior the same when the data is in memory ?
> If the data is stored to disk, then how is it different than Hadoop map
> reduce ?
>
> Regards,
> SB
>
>
> On Thu, Jan 16, 2014 at 5:11 PM, Archit Thakur <ar...@gmail.com>
> wrote:
>>
>> For any shuffle operation, groupByKey, etc. it does write map output to
>> disk before performing the reduce task on the data.
>>
>>
>> On Thu, Jan 16, 2014 at 4:03 PM, suman bharadwaj <su...@gmail.com>
>> wrote:
>>>
>>> Hi,
>>>
>>> I'm new to spark. And wanted to understand more on how shuffle works in
>>> spark
>>>
>>> In Hadoop map reduce, while performing a reduce operation, the
>>> intermediate data from map gets written to disk. How does the same happen in
>>> Spark ?
>>>
>>> Does spark write the intermediate data to disk ?
>>>
>>> Thanks in advance.
>>>
>>> Regards,
>>> SB
>>
>>
>

Re: How does shuffle work in spark ?

Posted by suman bharadwaj <su...@gmail.com>.
Hi,

Is this behavior the same when the data is in memory ?
If the data is stored to disk, then how is it different than Hadoop map
reduce ?

Regards,
SB


On Thu, Jan 16, 2014 at 5:11 PM, Archit Thakur <ar...@gmail.com>wrote:

> For any shuffle operation, groupByKey, etc. it does write map output to
> disk before performing the reduce task on the data.
>
>
> On Thu, Jan 16, 2014 at 4:03 PM, suman bharadwaj <su...@gmail.com>wrote:
>
>> Hi,
>>
>> I'm new to spark. And wanted to understand more on how shuffle works in
>> spark
>>
>> In Hadoop map reduce, while performing a reduce operation, the
>> intermediate data from map gets written to disk. How does the same happen
>> in Spark ?
>>
>> Does spark write the intermediate data to disk ?
>>
>> Thanks in advance.
>>
>> Regards,
>> SB
>>
>
>

Re: How does shuffle work in spark ?

Posted by Archit Thakur <ar...@gmail.com>.
For any shuffle operation, groupByKey, etc. it does write map output to
disk before performing the reduce task on the data.


On Thu, Jan 16, 2014 at 4:03 PM, suman bharadwaj <su...@gmail.com>wrote:

> Hi,
>
> I'm new to spark. And wanted to understand more on how shuffle works in
> spark
>
> In Hadoop map reduce, while performing a reduce operation, the
> intermediate data from map gets written to disk. How does the same happen
> in Spark ?
>
> Does spark write the intermediate data to disk ?
>
> Thanks in advance.
>
> Regards,
> SB
>

Re: How does shuffle work in spark ?

Posted by Adrian Tanase <at...@adobe.com>.
I don’t know why it expands to 50 GB but it’s correct to see it both on the first operation (shuffled write) and on the next one (shuffled read). It’s the barrier between the 2 stages.

-adrian

From: shahid ashraf
Date: Monday, October 19, 2015 at 9:53 PM
To: Kartik Mathur, Adrian Tanase
Cc: user
Subject: Re: How does shuffle work in spark ?

hi  THANKS

i don't understand, if original data on partitions is 3.5 G and by doing shuffle to that... how it expands to 50 GB... and why then it reads 50 GB for next operations.. i have original data set 0f 100 GB then my data will explode to 1,428.5714286 GBs
and so shuffle reads will be 1,428.5714286 GBs that will be insane

On Mon, Oct 19, 2015 at 11:58 PM, Kartik Mathur <ka...@bluedata.com>> wrote:
That sounds like correct shuffle output , in spark map reduce phase is separated by shuffle , in map each executer writes on local disk and in reduce phase reducerS reads data from each executer over the network , so shuffle definitely hurts performance , for more details on spark shuffle phase please read this

http://0x0fff.com/spark-architecture-shuffle/

Thanks
Kartik


On Mon, Oct 19, 2015 at 6:54 AM, shahid <sh...@trialx.com> wrote:
@all i did partitionby using default hash partitioner on data
[(1,data)(2,(data),(n,data)]
the total data was approx 3.5 it showed shuffle write 50G and on next action
e.g count it is showing shuffle read of 50 G. i don't understand this
behaviour and i think the performance is getting slow with so much shuffle
read on next tranformation operations.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-does-shuffle-work-in-spark-tp584p25119.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org





--
with Regards
Shahid Ashraf

Re: How does shuffle work in spark ?

Posted by Kartik Mathur <ka...@bluedata.com>.
That will depend on what is your transformation , your code snippet might
help .



On Tue, Oct 20, 2015 at 1:53 AM, shahid ashraf <sh...@trialx.com> wrote:

> Hi
>
> Any idea why is 50 GB shuffle read and write for 3.3 gb data
>
> On Mon, Oct 19, 2015 at 11:58 PM, Kartik Mathur <ka...@bluedata.com>
> wrote:
>
>> That sounds like correct shuffle output , in spark map reduce phase is
>> separated by shuffle , in map each executer writes on local disk and in
>> reduce phase reducerS reads data from each executer over the network , so
>> shuffle definitely hurts performance , for more details on spark shuffle
>> phase please read this
>>
>> http://0x0fff.com/spark-architecture-shuffle/
>>
>> Thanks
>> Kartik
>>
>> On Mon, Oct 19, 2015 at 6:54 AM, shahid <sh...@trialx.com> wrote:
>>
>>> @all i did partitionby using default hash partitioner on data
>>> [(1,data)(2,(data),(n,data)]
>>> the total data was approx 3.5 it showed shuffle write 50G and on next
>>> action
>>> e.g count it is showing shuffle read of 50 G. i don't understand this
>>> behaviour and i think the performance is getting slow with so much
>>> shuffle
>>> read on next tranformation operations.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-does-shuffle-work-in-spark-tp584p25119.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> For additional commands, e-mail: user-help@spark.apache.org
>>>
>>>
>>
>
>
> --
> with Regards
> Shahid Ashraf
>

Re: How does shuffle work in spark ?

Posted by shahid <sh...@trialx.com>.
@all i did partitionby using default hash partitioner on data
[(1,data)(2,(data),(n,data)]
the total data was approx 3.5 it showed shuffle write 50G and on next action
e.g count it is showing shuffle read of 50 G. i don't understand this
behaviour and i think the performance is getting slow with so much shuffle
read on next tranformation operations.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-does-shuffle-work-in-spark-tp584p25119.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org