Posted to user@spark.apache.org by Sea <26...@qq.com> on 2016/02/07 15:28:15 UTC

Re: Shuffle memory woes

Hi, Corey:
   "The dataset is 100gb at most, the spills can up to 10T-100T". Are your input files in LZO format, and do you read them with sc.text()? If there is not enough memory, Spark will spill 3-4x the input data to disk.




------------------ Original Message ------------------
From: "Corey Nolet";<cj...@gmail.com>;
Sent: Sunday, February 7, 2016, 8:56 PM
To: "Igor Berman"<ig...@gmail.com>; 
Cc: "user"<us...@spark.apache.org>; 
Subject: Re: Shuffle memory woes



As for the second part of your questions- we have a fairly complex join process which requires a ton of stage orchestration from our driver. I've written some code to be able to walk down our DAG tree and execute siblings in the tree concurrently where possible (forcing cache to disk on children that have multiple children themselves so that they can be run concurrently). Ultimately, we have seen significant speedup in our jobs by keeping tasks as busy as possible processing concurrent stages. Funny enough though, the stage that is causing problems with shuffling for us has a lot of children and doesn't even run concurrently with any other stages so I ruled out the concurrency of the stages as a culprit for the shuffling problem we're seeing.

On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <cj...@gmail.com> wrote:
Igor,

I don't think the question is "why can't it fit stuff in memory". I know why it can't fit stuff in memory- because it's a large dataset that needs to have a reduceByKey() run on it. My understanding is that when it doesn't fit into memory it needs to spill in order to consolidate intermediary files into a single file. The more data you need to run through this, the more it will need to spill. My finding is that once it gets stuck in this spill chain with our dataset it's all over @ that point because it will spill and spill and spill and spill and spill. If I give the shuffle enough memory it won't- irrespective of the number of partitions we have (I've done everything from repartition(500) to repartition(2500)). It's not a matter of running out of memory on a single node because the data is skewed. It's more a matter of the shuffle buffer filling up and needing to spill. I think what may be happening is that it gets to a point where it's spending more time reading/writing from disk while doing the spills than it is actually processing any data. I can tell this because I can see that the spills sometimes get up into the 10's to 100's of TB where the input data was maybe 100gb at most. Unfortunately my code is on a private internal network and I'm not able to share it.
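
To put the figures in this thread side by side, here is a rough sketch of the spill amplification they imply: roughly 100 GB of input against 10 TB to 100 TB of spill, compared with the 3-4x expectation mentioned elsewhere in the thread. Decimal units and the function name are assumptions made for illustration; the numbers are only the approximate ones quoted by the posters.

```python
# Spill amplification: bytes spilled divided by bytes of input.
# Sizes are the approximate figures quoted in this thread (decimal units).

GB = 10**9
TB = 10**12


def spill_amplification(input_bytes: int, spilled_bytes: int) -> float:
    """Return how many times larger the spilled volume is than the input."""
    if input_bytes <= 0:
        raise ValueError("input_bytes must be positive")
    return spilled_bytes / input_bytes


input_size = 100 * GB
low = spill_amplification(input_size, 10 * TB)    # lower end reported
high = spill_amplification(input_size, 100 * TB)  # upper end reported
print(f"spill is {low:.0f}x to {high:.0f}x the input")
```

An amplification in the hundreds, rather than single digits, is consistent with the same data being written and re-read many times over rather than spilled once.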


On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman <ig...@gmail.com> wrote:
So can you provide code snippets? It would be especially interesting to see what your transformation chain is and how many partitions there are on each side of the shuffle operation.

The question is why it can't fit stuff in memory when you are shuffling - maybe your partitioner on the "reduce" side is not configured properly? I mean, if the map side is ok and you are just reducing by key or something, it should be ok, so some detail is missing... skewed data? aggregate by key?
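
One way to check the skew hypothesis without sharing job code is to take a sample of keys and see how unevenly they land across reduce-side partitions. The sketch below is plain Python, not Spark: `zlib.crc32` is only a stand-in for the key hash a hash partitioner would use, and the function names and sample keys are hypothetical.

```python
import zlib
from collections import Counter


def partition_sizes(keys, num_partitions):
    """Count how many sampled keys fall into each hash partition."""
    counts = Counter(
        zlib.crc32(key.encode("utf-8")) % num_partitions for key in keys
    )
    return [counts.get(p, 0) for p in range(num_partitions)]


def skew_ratio(sizes):
    """Largest partition divided by the mean partition size (1.0 = even)."""
    mean = sum(sizes) / len(sizes)
    return max(sizes) / mean if mean else 0.0


# Hypothetical sample: one hot key dominates the data.
sample_keys = ["hot"] * 90 + [f"k{i}" for i in range(10)]
sizes = partition_sizes(sample_keys, 4)
print(sizes, round(skew_ratio(sizes), 2))
```

A ratio close to 1.0 suggests the keys are spread evenly; a ratio of several means one partition carries most of the records, and adding partitions will not help the task that owns the hot key.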


On 6 February 2016 at 20:13, Corey Nolet <cj...@gmail.com> wrote:
Igor,

Thank you for the response but unfortunately, the problem I'm referring to goes beyond this. I have set the shuffle memory fraction to be 90% and set the cache memory to be 0. Repartitioning the RDD helped a tad on the map side but didn't do much for the spilling when there was no longer any memory left for the shuffle. Also the new auto-memory management doesn't seem like it'll have too much of an effect after I've already given most of the memory I've allocated to the shuffle. The problem I'm having is most specifically related to the shuffle performance declining by several orders of magnitude when it needs to spill multiple times (it ends up spilling several hundred times for me when it can't fit stuff into memory).
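
For reference, a setup like the one described (90% to shuffle, 0 to cache) might be expressed as below. The thread does not say which settings were actually used, so the keys are an assumption: `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction` are the Spark 1.x names, and on Spark 1.6 they are only read when `spark.memory.useLegacyMode` is enabled. The helper that renders `--conf` arguments is hypothetical.

```python
# Assumed Spark 1.x settings matching "shuffle fraction 90%, cache 0".
legacy_conf = {
    "spark.memory.useLegacyMode": "true",    # needed on 1.6 for the keys below
    "spark.shuffle.memoryFraction": "0.9",   # share of heap for shuffle buffers
    "spark.storage.memoryFraction": "0",     # no heap reserved for cached RDDs
}


def to_submit_args(conf):
    """Render a config dict as a flat list of spark-submit --conf arguments."""
    args = []
    for key in sorted(conf):
        args += ["--conf", f"{key}={conf[key]}"]
    return args


print(" ".join(to_submit_args(legacy_conf)))
```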






On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <ig...@gmail.com> wrote:
Hi, usually you can solve this in 2 steps:
1. make the RDD have more partitions
2. play with the shuffle memory fraction


In Spark 1.6, the cache vs. shuffle memory fractions are adjusted automatically.
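
As a rough illustration of what "adjusted automatically" means in 1.6: storage and execution (shuffle) share one region, and either side can borrow the other's unused share. The sketch below only computes the size of that region. The defaults used (300 MB reserved, `spark.memory.fraction` = 0.75, `spark.memory.storageFraction` = 0.5) are the 1.6 values as I recall them and should be checked against the documentation for your version; the 8 GiB heap is a made-up example.

```python
MIB = 1024 * 1024
RESERVED_BYTES = 300 * MIB  # assumed fixed reservation in Spark 1.6


def unified_regions(heap_bytes, memory_fraction=0.75, storage_fraction=0.5):
    """Return (usable, storage, execution) sizes in bytes.

    The storage/execution split is a soft boundary: it is the starting
    point for each side, not a hard cap.
    """
    usable = max(heap_bytes - RESERVED_BYTES, 0) * memory_fraction
    storage = usable * storage_fraction
    execution = usable - storage
    return usable, storage, execution


usable, storage, execution = unified_regions(8 * 1024 * MIB)  # 8 GiB heap
print(f"usable={usable / MIB:.1f} MiB, "
      f"storage={storage / MIB:.1f} MiB, execution={execution / MIB:.1f} MiB")
```

Even with nothing cached, the shuffle can use at most the whole usable region, which is why giving it "most of the memory" by hand leaves little for the automatic management to add.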


On 5 February 2016 at 23:07, Corey Nolet <cj...@gmail.com> wrote:
I just recently had a discovery that my jobs were taking several hours to complete because of excess shuffle spills. What I found was that when I hit the high point where I didn't have enough memory for the shuffles to store all of their file consolidations at once, it could spill so many times that it causes my job's runtime to increase by orders of magnitude (and sometimes fail altogether).


I've played with all the tuning parameters I can find. To speed the shuffles up, I tuned the akka threads to different values. I also tuned the shuffle buffering a tad (both up and down). 


I feel like I see a weak point here. The mappers are sharing memory space with reducers and the shuffles need enough memory to consolidate and pull, otherwise they will need to spill and spill and spill. What I've noticed about my jobs is that this is the difference between them taking 30 minutes and 4 hours or more. Same job- just different memory tuning.
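
The sharing described above can be made concrete with a small model. As far as I know, Spark's execution memory pool lets each of N concurrently running tasks hold between 1/(2N) and 1/N of the pool, so the more tasks share an executor, the smaller each task's buffer. The spill estimate below is a deliberately crude assumption (one spill each time the buffer fills, with the final partial buffer kept in memory), and all sizes are invented for illustration.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2


def per_task_bounds(execution_bytes, active_tasks):
    """Return the (minimum, maximum) execution memory one task may hold."""
    if active_tasks < 1:
        raise ValueError("active_tasks must be at least 1")
    return execution_bytes / (2 * active_tasks), execution_bytes / active_tasks


def estimated_spills(task_input_bytes, task_memory_bytes):
    """Crude model: the buffer spills once each time it fills up."""
    if task_memory_bytes <= 0:
        raise ValueError("task_memory_bytes must be positive")
    fills = -(-task_input_bytes // task_memory_bytes)  # ceiling division
    return max(fills - 1, 0)


low, high = per_task_bounds(4 * GIB, active_tasks=8)
print(f"per-task memory: {low / MIB:.0f} to {high / MIB:.0f} MiB")
print("spills for a 50 GiB task:", estimated_spills(50 * GIB, int(low)))
```

Under these made-up numbers a single task spills on the order of a couple of hundred times, which is the same order of magnitude as the "several hundred" spills reported in this thread.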


I've found that, as a result of the spilling, I'm better off not caching any data in memory and lowering my storage fraction to 0 and still hoping I was able to give my shuffles enough memory that my data doesn't continuously spill. Is this the way it's supposed to be? It makes it hard because it seems like it forces the memory limits on my job- otherwise it could take orders of magnitude longer to execute.

Re: Shuffle memory woes

Posted by Corey Nolet <cj...@gmail.com>.
I sure do! [1] And yes- I'm really hoping they will chime in, otherwise I
may dig a little deeper myself and start posting some jira tickets.

[1] http://www.slideshare.net/cjnolet

On Mon, Feb 8, 2016 at 3:02 AM, Igor Berman <ig...@gmail.com> wrote:

> It's interesting to see what spark dev people will say.
> Corey do you have presentation available online?
>
> On 8 February 2016 at 05:16, Corey Nolet <cj...@gmail.com> wrote:
>
>> Charles,
>>
>> Thank you for chiming in and I'm glad someone else is experiencing this
>> too and not just me. I know very well how the Spark shuffles work and I've
>> done deep dive presentations @ Spark meetups in the past. This problem is
>> something that goes beyond that and, I believe, it exposes a fundamental
>> paradigm flaw in the design of Spark, unfortunately. Good thing is, I think
>> it can be fixed.
>>
>> Also- in regards to how much data actually gets shuffled- believe it or
>> not this problem can take a 30-40 minute job and make it run for 4 or more
>> hours. If I let the job run for 4+ hours the amount of data being shuffled
>> for this particular dataset will be 100 or more TB. Usually, however, I end
>> up killing the job long before that point because I realize it should not
>> be taking this long. The particular dataset we're doing is not for
>> real-time exploration. These are very large joins we're doing for jobs that
>> we run a few times a day.
>>
>> On Sun, Feb 7, 2016 at 9:56 PM, Charles Chao <xp...@gmail.com>
>> wrote:
>>
>>>  "The dataset is 100gb at most, the spills can up to 10T-100T"
>>>
>>> -- I have had the same experiences, although not to this extreme (the
>>> spills were < 10T while the input was ~ 100s gb) and haven't found any
>>> solution yet. I don't believe this is related to input data format. in my
>>> case, I got my input data by loading from Hive tables.
>>>
>>> On Sun, Feb 7, 2016 at 6:28 AM, Sea <26...@qq.com> wrote:
>>>
>>>> Hi,Corey:
>>>>    "The dataset is 100gb at most, the spills can up to 10T-100T", Are
>>>> your input files lzo format, and you use sc.text() ? If memory is not
>>>> enough, spark will spill 3-4x of input data to disk.
>>>>
>>>>
>>>> ------------------ Original Message ------------------
>>>> *From:* "Corey Nolet";<cj...@gmail.com>;
>>>> *Sent:* Sunday, February 7, 2016, 8:56 PM
>>>> *To:* "Igor Berman"<ig...@gmail.com>;
>>>> *Cc:* "user"<us...@spark.apache.org>;
>>>> *Subject:* Re: Shuffle memory woes
>>>>
>>>> As for the second part of your questions- we have a fairly complex join
>>>> process which requires a ton of stage orchestration from our driver. I've
>>>> written some code to be able to walk down our DAG tree and execute siblings
>>>> in the tree concurrently where possible (forcing cache to disk on children
>>>> that have multiple children themselves so that they can be run
>>>> concurrently). Ultimately, we have seen significant speedup in our jobs by
>>>> keeping tasks as busy as possible processing concurrent stages. Funny
>>>> enough though, the stage that is causing problems with shuffling for us has
>>>> a lot of children and doesn't even run concurrently with any other stages
>>>> so I ruled out the concurrency of the stages as a culprit for the
>>>> shuffling problem we're seeing.
>>>>
>>>> On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <cj...@gmail.com> wrote:
>>>>
>>>>> Igor,
>>>>>
>>>>> I don't think the question is "why can't it fit stuff in memory". I
>>>>> know why it can't fit stuff in memory- because it's a large dataset that
>>>>> needs to have a reduceByKey() run on it. My understanding is that when it
>>>>> doesn't fit into memory it needs to spill in order to consolidate
>>>>> intermediary files into a single file. The more data you need to run
>>>>> through this, the more it will need to spill. My finding is that once it
>>>>> gets stuck in this spill chain with our dataset it's all over @ that point
>>>>> because it will spill and spill and spill and spill and spill. If I give
>>>>> the shuffle enough memory it won't- irrespective of the number of
>>>>> partitions we have (i've done everything from repartition(500) to
>>>>> repartition(2500)). It's not a matter of running out of memory on a single
>>>>> node because the data is skewed. It's more a matter of the shuffle buffer
>>>>> filling up and needing to spill. I think what may be happening is that it
>>>>> gets to a point where it's spending more time reading/writing from disk
>>>>> while doing the spills than it is actually processing any data. I can tell
>>>>> this because I can see that the spills sometimes get up into the 10's to
>>>>> 100's of TB where the input data was maybe 100gb at most. Unfortunately my
>>>>> code is on a private internal network and I'm not able to share it.
>>>>>
>>>>> On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman <ig...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> so can you provide code snippets: especially it's interesting to see
>>>>>> what are your transformation chain, how many partitions are there on each
>>>>>> side of shuffle operation
>>>>>>
>>>>>> the question is why it can't fit stuff in memory when you are
>>>>>> shuffling - maybe your partitioner on "reduce" side is not configured
>>>>>> properly? I mean if map side is ok, and you just reducing by key or
>>>>>> something it should be ok, so some detail is missing...skewed data?
>>>>>> aggregate by key?
>>>>>>
>>>>>> On 6 February 2016 at 20:13, Corey Nolet <cj...@gmail.com> wrote:
>>>>>>
>>>>>>> Igor,
>>>>>>>
>>>>>>> Thank you for the response but unfortunately, the problem I'm
>>>>>>> referring to goes beyond this. I have set the shuffle memory fraction to be
>>>>>>> 90% and set the cache memory to be 0. Repartitioning the RDD helped a tad
>>>>>>> on the map side but didn't do much for the spilling when there was no
>>>>>>> longer any memory left for the shuffle. Also the new auto-memory management
>>>>>>> doesn't seem like it'll have too much of an effect after i've already given
>>>>>>> most the memory i've allocated to the shuffle. The problem I'm having is
>>>>>>> most specifically related to the shuffle performance declining by several
>>>>>>> orders of magnitude when it needs to spill multiple times (it ends up
>>>>>>> spilling several hundred times for me when it can't fit stuff into memory).
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <ig...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> usually you can solve this by 2 steps
>>>>>>>> make rdd to have more partitions
>>>>>>>> play with shuffle memory fraction
>>>>>>>>
>>>>>>>> in spark 1.6 cache vs shuffle memory fractions are adjusted
>>>>>>>> automatically
>>>>>>>>
>>>>>>>> On 5 February 2016 at 23:07, Corey Nolet <cj...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I just recently had a discovery that my jobs were taking several
>>>>>>>>> hours to complete because of excess shuffle spills. What I found was that
>>>>>>>>> when I hit the high point where I didn't have enough memory for the
>>>>>>>>> shuffles to store all of their file consolidations at once, it could spill
>>>>>>>>> so many times that it causes my job's runtime to increase by orders of
>>>>>>>>> magnitude (and sometimes fail altogether).
>>>>>>>>>
>>>>>>>>> I've played with all the tuning parameters I can find. To speed
>>>>>>>>> the shuffles up, I tuned the akka threads to different values. I also tuned
>>>>>>>>> the shuffle buffering a tad (both up and down).
>>>>>>>>>
>>>>>>>>> I feel like I see a weak point here. The mappers are sharing
>>>>>>>>> memory space with reducers and the shuffles need enough memory to
>>>>>>>>> consolidate and pull otherwise they will need to spill and spill and spill.
>>>>>>>>> What i've noticed about my jobs is that this is a difference between them
>>>>>>>>> taking 30 minutes and 4 hours or more. Same job- just different memory
>>>>>>>>> tuning.
>>>>>>>>>
>>>>>>>>> I've found that, as a result of the spilling, I'm better off not
>>>>>>>>> caching any data in memory and lowering my storage fraction to 0 and still
>>>>>>>>> hoping I was able to give my shuffles enough memory that my data doesn't
>>>>>>>>> continuously spill. Is this the way it's supposed to be? It makes it hard
>>>>>>>>> because it seems like it forces the memory limits on my job- otherwise it
>>>>>>>>> could take orders of magnitude longer to execute.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Shuffle memory woes

Posted by Igor Berman <ig...@gmail.com>.
It will be interesting to see what the Spark dev people say.
Corey, do you have a presentation available online?

On 8 February 2016 at 05:16, Corey Nolet <cj...@gmail.com> wrote:

> Charles,
>
> Thank you for chiming in and I'm glad someone else is experiencing this
> too and not just me. I know very well how the Spark shuffles work and I've
> done deep dive presentations @ Spark meetups in the past. This problem is
> something that goes beyond that and, I believe, it exposes a fundamental
> paradigm flaw in the design of Spark, unfortunately. Good thing is, I think
> it can be fixed.
>
> Also- in regards to how much data actually gets shuffled- believe it or
> not this problem can take a 30-40 minute job and make it run for 4 or more
> hours. If I let the job run for 4+ hours the amount of data being shuffled
> for this particular dataset will be 100 or more TB. Usually, however, I end
> up killing the job long before that point because I realize it should not
> be taking this long. The particular dataset we're doing is not for
> real-time exploration. These are very large joins we're doing for jobs that
> we run a few times a day.
>
> On Sun, Feb 7, 2016 at 9:56 PM, Charles Chao <xp...@gmail.com>
> wrote:
>
>>  "The dataset is 100gb at most, the spills can up to 10T-100T"
>>
>> -- I have had the same experiences, although not to this extreme (the
>> spills were < 10T while the input was ~ 100s gb) and haven't found any
>> solution yet. I don't believe this is related to input data format. in my
>> case, I got my input data by loading from Hive tables.
>>
>> On Sun, Feb 7, 2016 at 6:28 AM, Sea <26...@qq.com> wrote:
>>
>>> Hi,Corey:
>>>    "The dataset is 100gb at most, the spills can up to 10T-100T", Are
>>> your input files lzo format, and you use sc.text() ? If memory is not
>>> enough, spark will spill 3-4x of input data to disk.
>>>
>>>
>>> ------------------ Original Message ------------------
>>> *From:* "Corey Nolet";<cj...@gmail.com>;
>>> *Sent:* Sunday, February 7, 2016, 8:56 PM
>>> *To:* "Igor Berman"<ig...@gmail.com>;
>>> *Cc:* "user"<us...@spark.apache.org>;
>>> *Subject:* Re: Shuffle memory woes
>>>
>>> As for the second part of your questions- we have a fairly complex join
>>> process which requires a ton of stage orchestration from our driver. I've
>>> written some code to be able to walk down our DAG tree and execute siblings
>>> in the tree concurrently where possible (forcing cache to disk on children
>>> that have multiple children themselves so that they can be run
>>> concurrently). Ultimately, we have seen significant speedup in our jobs by
>>> keeping tasks as busy as possible processing concurrent stages. Funny
>>> enough though, the stage that is causing problems with shuffling for us has
>>> a lot of children and doesn't even run concurrently with any other stages
>>> so I ruled out the concurrency of the stages as a culprit for the
>>> shuffling problem we're seeing.
>>>
>>> On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <cj...@gmail.com> wrote:
>>>
>>>> Igor,
>>>>
>>>> I don't think the question is "why can't it fit stuff in memory". I
>>>> know why it can't fit stuff in memory- because it's a large dataset that
>>>> needs to have a reduceByKey() run on it. My understanding is that when it
>>>> doesn't fit into memory it needs to spill in order to consolidate
>>>> intermediary files into a single file. The more data you need to run
>>>> through this, the more it will need to spill. My finding is that once it
>>>> gets stuck in this spill chain with our dataset it's all over @ that point
>>>> because it will spill and spill and spill and spill and spill. If I give
>>>> the shuffle enough memory it won't- irrespective of the number of
>>>> partitions we have (i've done everything from repartition(500) to
>>>> repartition(2500)). It's not a matter of running out of memory on a single
>>>> node because the data is skewed. It's more a matter of the shuffle buffer
>>>> filling up and needing to spill. I think what may be happening is that it
>>>> gets to a point where it's spending more time reading/writing from disk
>>>> while doing the spills than it is actually processing any data. I can tell
>>>> this because I can see that the spills sometimes get up into the 10's to
>>>> 100's of TB where the input data was maybe 100gb at most. Unfortunately my
>>>> code is on a private internal network and I'm not able to share it.
>>>>
>>>> On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman <ig...@gmail.com>
>>>> wrote:
>>>>
>>>>> so can you provide code snippets: especially it's interesting to see
>>>>> what are your transformation chain, how many partitions are there on each
>>>>> side of shuffle operation
>>>>>
>>>>> the question is why it can't fit stuff in memory when you are
>>>>> shuffling - maybe your partitioner on "reduce" side is not configured
>>>>> properly? I mean if map side is ok, and you just reducing by key or
>>>>> something it should be ok, so some detail is missing...skewed data?
>>>>> aggregate by key?
>>>>>
>>>>> On 6 February 2016 at 20:13, Corey Nolet <cj...@gmail.com> wrote:
>>>>>
>>>>>> Igor,
>>>>>>
>>>>>> Thank you for the response but unfortunately, the problem I'm
>>>>>> referring to goes beyond this. I have set the shuffle memory fraction to be
>>>>>> 90% and set the cache memory to be 0. Repartitioning the RDD helped a tad
>>>>>> on the map side but didn't do much for the spilling when there was no
>>>>>> longer any memory left for the shuffle. Also the new auto-memory management
>>>>>> doesn't seem like it'll have too much of an effect after i've already given
>>>>>> most the memory i've allocated to the shuffle. The problem I'm having is
>>>>>> most specifically related to the shuffle performance declining by several
>>>>>> orders of magnitude when it needs to spill multiple times (it ends up
>>>>>> spilling several hundred times for me when it can't fit stuff into memory).
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <ig...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> usually you can solve this by 2 steps
>>>>>>> make rdd to have more partitions
>>>>>>> play with shuffle memory fraction
>>>>>>>
>>>>>>> in spark 1.6 cache vs shuffle memory fractions are adjusted
>>>>>>> automatically
>>>>>>>
>>>>>>> On 5 February 2016 at 23:07, Corey Nolet <cj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I just recently had a discovery that my jobs were taking several
>>>>>>>> hours to complete because of excess shuffle spills. What I found was that
>>>>>>>> when I hit the high point where I didn't have enough memory for the
>>>>>>>> shuffles to store all of their file consolidations at once, it could spill
>>>>>>>> so many times that it causes my job's runtime to increase by orders of
>>>>>>>> magnitude (and sometimes fail altogether).
>>>>>>>>
>>>>>>>> I've played with all the tuning parameters I can find. To speed the
>>>>>>>> shuffles up, I tuned the akka threads to different values. I also tuned the
>>>>>>>> shuffle buffering a tad (both up and down).
>>>>>>>>
>>>>>>>> I feel like I see a weak point here. The mappers are sharing memory
>>>>>>>> space with reducers and the shuffles need enough memory to consolidate and
>>>>>>>> pull otherwise they will need to spill and spill and spill. What i've
>>>>>>>> noticed about my jobs is that this is a difference between them taking 30
>>>>>>>> minutes and 4 hours or more. Same job- just different memory tuning.
>>>>>>>>
>>>>>>>> I've found that, as a result of the spilling, I'm better off not
>>>>>>>> caching any data in memory and lowering my storage fraction to 0 and still
>>>>>>>> hoping I was able to give my shuffles enough memory that my data doesn't
>>>>>>>> continuously spill. Is this the way it's supposed to be? It makes it hard
>>>>>>>> because it seems like it forces the memory limits on my job- otherwise it
>>>>>>>> could take orders of magnitude longer to execute.
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Shuffle memory woes

Posted by Corey Nolet <cj...@gmail.com>.
Charles,

Thank you for chiming in and I'm glad someone else is experiencing this too
and not just me. I know very well how the Spark shuffles work and I've done
deep dive presentations @ Spark meetups in the past. This problem is
something that goes beyond that and, I believe, it exposes a fundamental
paradigm flaw in the design of Spark, unfortunately. Good thing is, I think
it can be fixed.

Also- in regards to how much data actually gets shuffled- believe it or not
this problem can take a 30-40 minute job and make it run for 4 or more
hours. If I let the job run for 4+ hours the amount of data being shuffled
for this particular dataset will be 100 or more TB. Usually, however, I end
up killing the job long before that point because I realize it should not
be taking this long. The particular dataset we're doing is not for
real-time exploration. These are very large joins we're doing for jobs that
we run a few times a day.

On Sun, Feb 7, 2016 at 9:56 PM, Charles Chao <xp...@gmail.com> wrote:

>  "The dataset is 100gb at most, the spills can up to 10T-100T"
>
> -- I have had the same experiences, although not to this extreme (the
> spills were < 10T while the input was ~ 100s gb) and haven't found any
> solution yet. I don't believe this is related to input data format. in my
> case, I got my input data by loading from Hive tables.
>
> On Sun, Feb 7, 2016 at 6:28 AM, Sea <26...@qq.com> wrote:
>
>> Hi,Corey:
>>    "The dataset is 100gb at most, the spills can up to 10T-100T", Are
>> your input files lzo format, and you use sc.text() ? If memory is not
>> enough, spark will spill 3-4x of input data to disk.
>>
>>
>> ------------------ Original Message ------------------
>> *From:* "Corey Nolet";<cj...@gmail.com>;
>> *Sent:* Sunday, February 7, 2016, 8:56 PM
>> *To:* "Igor Berman"<ig...@gmail.com>;
>> *Cc:* "user"<us...@spark.apache.org>;
>> *Subject:* Re: Shuffle memory woes
>>
>> As for the second part of your questions- we have a fairly complex join
>> process which requires a ton of stage orchestration from our driver. I've
>> written some code to be able to walk down our DAG tree and execute siblings
>> in the tree concurrently where possible (forcing cache to disk on children
>> that have multiple children themselves so that they can be run
>> concurrently). Ultimately, we have seen significant speedup in our jobs by
>> keeping tasks as busy as possible processing concurrent stages. Funny
>> enough though, the stage that is causing problems with shuffling for us has
>> a lot of children and doesn't even run concurrently with any other stages
>> so I ruled out the concurrency of the stages as a culprit for the
>> shuffling problem we're seeing.
>>
>> On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <cj...@gmail.com> wrote:
>>
>>> Igor,
>>>
>>> I don't think the question is "why can't it fit stuff in memory". I know
>>> why it can't fit stuff in memory- because it's a large dataset that needs
>>> to have a reduceByKey() run on it. My understanding is that when it doesn't
>>> fit into memory it needs to spill in order to consolidate intermediary
>>> files into a single file. The more data you need to run through this, the
>>> more it will need to spill. My finding is that once it gets stuck in this
>>> spill chain with our dataset it's all over @ that point because it will
>>> spill and spill and spill and spill and spill. If I give the shuffle enough
>>> memory it won't- irrespective of the number of partitions we have (i've
>>> done everything from repartition(500) to repartition(2500)). It's not a
>>> matter of running out of memory on a single node because the data is
>>> skewed. It's more a matter of the shuffle buffer filling up and needing to
>>> spill. I think what may be happening is that it gets to a point where it's
>>> spending more time reading/writing from disk while doing the spills than it
>>> is actually processing any data. I can tell this because I can see that the
>>> spills sometimes get up into the 10's to 100's of TB where the input data
>>> was maybe 100gb at most. Unfortunately my code is on a private internal
>>> network and I'm not able to share it.
>>>
>>> On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman <ig...@gmail.com>
>>> wrote:
>>>
>>>> so can you provide code snippets: especially it's interesting to see
>>>> what are your transformation chain, how many partitions are there on each
>>>> side of shuffle operation
>>>>
>>>> the question is why it can't fit stuff in memory when you are shuffling
>>>> - maybe your partitioner on "reduce" side is not configured properly? I
>>>> mean if map side is ok, and you just reducing by key or something it should
>>>> be ok, so some detail is missing...skewed data? aggregate by key?
>>>>
>>>> On 6 February 2016 at 20:13, Corey Nolet <cj...@gmail.com> wrote:
>>>>
>>>>> Igor,
>>>>>
>>>>> Thank you for the response but unfortunately, the problem I'm
>>>>> referring to goes beyond this. I have set the shuffle memory fraction to be
>>>>> 90% and set the cache memory to be 0. Repartitioning the RDD helped a tad
>>>>> on the map side but didn't do much for the spilling when there was no
>>>>> longer any memory left for the shuffle. Also the new auto-memory management
>>>>> doesn't seem like it'll have too much of an effect after i've already given
>>>>> most the memory i've allocated to the shuffle. The problem I'm having is
>>>>> most specifically related to the shuffle performance declining by several
>>>>> orders of magnitude when it needs to spill multiple times (it ends up
>>>>> spilling several hundred times for me when it can't fit stuff into memory).
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <ig...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> usually you can solve this by 2 steps
>>>>>> make rdd to have more partitions
>>>>>> play with shuffle memory fraction
>>>>>>
>>>>>> in spark 1.6 cache vs shuffle memory fractions are adjusted
>>>>>> automatically
>>>>>>
>>>>>> On 5 February 2016 at 23:07, Corey Nolet <cj...@gmail.com> wrote:
>>>>>>
>>>>>>> I just recently had a discovery that my jobs were taking several
>>>>>>> hours to complete because of excess shuffle spills. What I found was that
>>>>>>> when I hit the high point where I didn't have enough memory for the
>>>>>>> shuffles to store all of their file consolidations at once, it could spill
>>>>>>> so many times that it causes my job's runtime to increase by orders of
>>>>>>> magnitude (and sometimes fail altogether).
>>>>>>>
>>>>>>> I've played with all the tuning parameters I can find. To speed the
>>>>>>> shuffles up, I tuned the akka threads to different values. I also tuned the
>>>>>>> shuffle buffering a tad (both up and down).
>>>>>>>
>>>>>>> I feel like I see a weak point here. The mappers are sharing memory
>>>>>>> space with reducers and the shuffles need enough memory to consolidate and
>>>>>>> pull otherwise they will need to spill and spill and spill. What i've
>>>>>>> noticed about my jobs is that this is a difference between them taking 30
>>>>>>> minutes and 4 hours or more. Same job- just different memory tuning.
>>>>>>>
>>>>>>> I've found that, as a result of the spilling, I'm better off not
>>>>>>> caching any data in memory and lowering my storage fraction to 0 and still
>>>>>>> hoping I was able to give my shuffles enough memory that my data doesn't
>>>>>>> continuously spill. Is this the way it's supposed to be? It makes it hard
>>>>>>> because it seems like it forces the memory limits on my job- otherwise it
>>>>>>> could take orders of magnitude longer to execute.
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Shuffle memory woes

Posted by Charles Chao <xp...@gmail.com>.
 "The dataset is 100gb at most, the spills can up to 10T-100T"

-- I have had the same experiences, although not to this extreme (the
spills were < 10T while the input was ~ 100s gb) and haven't found any
solution yet. I don't believe this is related to input data format. In my
case, I got my input data by loading from Hive tables.

On Sun, Feb 7, 2016 at 6:28 AM, Sea <26...@qq.com> wrote:

> Hi,Corey:
>    "The dataset is 100gb at most, the spills can up to 10T-100T", Are your
> input files lzo format, and you use sc.text() ? If memory is not enough,
> spark will spill 3-4x of input data to disk.
>
>
> ------------------ Original Message ------------------
> *From:* "Corey Nolet";<cj...@gmail.com>;
> *Sent:* Sunday, February 7, 2016, 8:56 PM
> *To:* "Igor Berman"<ig...@gmail.com>;
> *Cc:* "user"<us...@spark.apache.org>;
> *Subject:* Re: Shuffle memory woes
>
> As for the second part of your questions- we have a fairly complex join
> process which requires a ton of stage orchestration from our driver. I've
> written some code to be able to walk down our DAG tree and execute siblings
> in the tree concurrently where possible (forcing cache to disk on children
> that have multiple children themselves so that they can be run
> concurrently). Ultimately, we have seen significant speedup in our jobs by
> keeping tasks as busy as possible processing concurrent stages. Funny
> enough though, the stage that is causing problems with shuffling for us has
> a lot of children and doesn't even run concurrently with any other stages
> so I ruled out the concurrency of the stages as a culprit for the
> shuffling problem we're seeing.
>
> On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <cj...@gmail.com> wrote:
>
>> Igor,
>>
>> I don't think the question is "why can't it fit stuff in memory". I know
>> why it can't fit stuff in memory- because it's a large dataset that needs
>> to have a reduceByKey() run on it. My understanding is that when it doesn't
>> fit into memory it needs to spill in order to consolidate intermediary
>> files into a single file. The more data you need to run through this, the
>> more it will need to spill. My finding is that once it gets stuck in this
>> spill chain with our dataset it's all over @ that point because it will
>> spill and spill and spill and spill and spill. If I give the shuffle enough
>> memory it won't- irrespective of the number of partitions we have (I've
>> done everything from repartition(500) to repartition(2500)). It's not a
>> matter of running out of memory on a single node because the data is
>> skewed. It's more a matter of the shuffle buffer filling up and needing to
>> spill. I think what may be happening is that it gets to a point where it's
>> spending more time reading/writing from disk while doing the spills than it
>> is actually processing any data. I can tell this because I can see that the
>> spills sometimes get up into the 10's to 100's of TB where the input data
>> was maybe 100gb at most. Unfortunately my code is on a
>> private internal network and I'm not able to share it.
>>
>> On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman <ig...@gmail.com>
>> wrote:
>>
>>> So can you provide code snippets? It would be especially interesting to
>>> see what your transformation chain is and how many partitions there are
>>> on each side of the shuffle operation.
>>>
>>> The question is why it can't fit stuff in memory when you are shuffling
>>> - maybe your partitioner on the "reduce" side is not configured properly?
>>> I mean, if the map side is OK and you are just reducing by key or
>>> something, it should be OK, so some detail is missing... skewed data?
>>> aggregate by key?
>>>
>>> On 6 February 2016 at 20:13, Corey Nolet <cj...@gmail.com> wrote:
>>>
>>>> Igor,
>>>>
>>>> Thank you for the response but unfortunately, the problem I'm referring
>>>> to goes beyond this. I have set the shuffle memory fraction to be 90% and
>>>> set the cache memory to be 0. Repartitioning the RDD helped a tad on the
>>>> map side but didn't do much for the spilling when there was no longer any
>>>> memory left for the shuffle. Also the new auto-memory management doesn't
>>>> seem like it'll have too much of an effect after I've already given most
>>>> of the memory I've allocated to the shuffle. The problem I'm having is most
>>>> specifically related to the shuffle performance declining by several orders
>>>> of magnitude when it needs to spill multiple times (it ends up spilling
>>>> several hundred times for me when it can't fit stuff into memory).
>>>>
>>>>
>>>>
>>>> On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <ig...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> Usually you can solve this in 2 steps:
>>>>> 1. make the RDD have more partitions
>>>>> 2. play with the shuffle memory fraction
>>>>>
>>>>> in spark 1.6 cache vs shuffle memory fractions are adjusted
>>>>> automatically
>>>>>
>>>>> On 5 February 2016 at 23:07, Corey Nolet <cj...@gmail.com> wrote:
>>>>>
>>>>>> I just recently had a discovery that my jobs were taking several
>>>>>> hours to complete because of excess shuffle spills. What I found was that
>>>>>> when I hit the high point where I didn't have enough memory for the
>>>>>> shuffles to store all of their file consolidations at once, it could spill
>>>>>> so many times that it causes my job's runtime to increase by orders of
>>>>>> magnitude (and sometimes fail altogether).
>>>>>>
>>>>>> I've played with all the tuning parameters I can find. To speed the
>>>>>> shuffles up, I tuned the akka threads to different values. I also tuned the
>>>>>> shuffle buffering a tad (both up and down).
>>>>>>
>>>>>> I feel like I see a weak point here. The mappers are sharing memory
>>>>>> space with reducers and the shuffles need enough memory to consolidate and
>>>>>> pull; otherwise they will need to spill and spill and spill. What I've
>>>>>> noticed about my jobs is that this is the difference between them taking 30
>>>>>> minutes and 4 hours or more. Same job- just different memory tuning.
>>>>>>
>>>>>> I've found that, as a result of the spilling, I'm better off not
>>>>>> caching any data in memory and lowering my storage fraction to 0 and still
>>>>>> hoping I was able to give my shuffles enough memory that my data doesn't
>>>>>> continuously spill. Is this the way it's supposed to be? It makes it hard
>>>>>> because it seems like it forces the memory limits on my job- otherwise it
>>>>>> could take orders of magnitude longer to execute.
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>