Posted to user@spark.apache.org by Christopher Brady <ch...@oracle.com> on 2016/02/12 19:13:14 UTC

coalesce and executor memory

Can anyone help me understand why using coalesce causes my executors to 
crash with out of memory? What happens during coalesce that increases 
memory usage so much?

If I do:
hadoopFile -> sample -> cache -> map -> saveAsNewAPIHadoopFile

everything works fine, but if I do:
hadoopFile -> sample -> coalesce -> cache -> map -> saveAsNewAPIHadoopFile

my executors crash with out of memory exceptions.

Is there any documentation that explains what causes the increased 
memory requirements with coalesce? It seems to be less of a problem if I 
coalesce into a larger number of partitions, but I'm not sure why this 
is. How would I estimate how much additional memory the coalesce requires?

Thanks.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: coalesce and executor memory

Posted by Koert Kuipers <ko...@tresata.com>.
sorry i meant to say:
and my way to deal with OOMs is almost always simply to increase number of
partitions. maybe there is a better way that i am not aware of.

On Sat, Feb 13, 2016 at 11:38 PM, Koert Kuipers <ko...@tresata.com> wrote:

> thats right, its the reduce operation that makes the in-memory assumption,
> not the map (although i am still suspicious that the map actually streams
> from disk to disk record by record).
>
> in reality though my experience is that if spark can not fit partitions in
> memory it doesnt work well. i get OOMs. and my to OOMs is almost always
> simply to increase number of partitions. maybe there is a better way that i
> am not aware of.
>
> On Sat, Feb 13, 2016 at 6:32 PM, Daniel Darabos <
> daniel.darabos@lynxanalytics.com> wrote:
>
>>
>> On Fri, Feb 12, 2016 at 11:10 PM, Koert Kuipers <ko...@tresata.com>
>> wrote:
>>
>>> in spark, every partition needs to fit in the memory available to the
>>> core processing it.
>>>
>>
>> That does not agree with my understanding of how it works. I think you
>> could do sc.textFile("input").coalesce(1).map(_.replace("A",
>> "B")).saveAsTextFile("output") on a 1 TB local file and it would work fine.
>> (I just tested this with a 3 GB file with a 1 GB executor.)
>>
>> RDDs are mostly implemented using iterators. For example map() operates
>> line-by-line, never pulling in the whole partition into memory. coalesce()
>> also just concatenates the iterators of the parent partitions into a new
>> iterator.
>>
>> Some operations, like reduceByKey(), need to have the whole contents of
>> the partition to work. But they typically use ExternalAppendOnlyMap, so
>> they spill to disk instead of filling up the memory.
>>
>> I know I'm not helping to answer Christopher's issue. Christopher, can
>> you perhaps give us an example that we can easily try in spark-shell to see
>> the same problem?
>>
>
>

Re: coalesce and executor memory

Posted by Koert Kuipers <ko...@tresata.com>.
thats right, its the reduce operation that makes the in-memory assumption,
not the map (although i am still suspicious that the map actually streams
from disk to disk record by record).

in reality though my experience is that if spark can not fit partitions in
memory it doesnt work well. i get OOMs. and my to OOMs is almost always
simply to increase number of partitions. maybe there is a better way that i
am not aware of.

On Sat, Feb 13, 2016 at 6:32 PM, Daniel Darabos <
daniel.darabos@lynxanalytics.com> wrote:

>
> On Fri, Feb 12, 2016 at 11:10 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> in spark, every partition needs to fit in the memory available to the
>> core processing it.
>>
>
> That does not agree with my understanding of how it works. I think you
> could do sc.textFile("input").coalesce(1).map(_.replace("A",
> "B")).saveAsTextFile("output") on a 1 TB local file and it would work fine.
> (I just tested this with a 3 GB file with a 1 GB executor.)
>
> RDDs are mostly implemented using iterators. For example map() operates
> line-by-line, never pulling in the whole partition into memory. coalesce()
> also just concatenates the iterators of the parent partitions into a new
> iterator.
>
> Some operations, like reduceByKey(), need to have the whole contents of
> the partition to work. But they typically use ExternalAppendOnlyMap, so
> they spill to disk instead of filling up the memory.
>
> I know I'm not helping to answer Christopher's issue. Christopher, can you
> perhaps give us an example that we can easily try in spark-shell to see the
> same problem?
>

Re: coalesce and executor memory

Posted by Daniel Darabos <da...@lynxanalytics.com>.
On Fri, Feb 12, 2016 at 11:10 PM, Koert Kuipers <ko...@tresata.com> wrote:

> in spark, every partition needs to fit in the memory available to the core
> processing it.
>

That does not agree with my understanding of how it works. I think you
could do sc.textFile("input").coalesce(1).map(_.replace("A",
"B")).saveAsTextFile("output") on a 1 TB local file and it would work fine.
(I just tested this with a 3 GB file with a 1 GB executor.)

RDDs are mostly implemented using iterators. For example map() operates
line-by-line, never pulling in the whole partition into memory. coalesce()
also just concatenates the iterators of the parent partitions into a new
iterator.
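
A minimal sketch of the iterator point, in plain Scala with no Spark dependency. The names (CoalesceSketch, parentPartition, produced) are hypothetical and only model the behaviour described above; this is not Spark's actual implementation. The counter shows that concatenating parent iterators and mapping over the result does not pull any records until something downstream asks for them.

```scala
object CoalesceSketch {
  // Counts how many records the (hypothetical) parent partitions have produced so far.
  var produced = 0

  // Stand-in for a parent partition: a lazy iterator over `size` records.
  def parentPartition(id: Int, size: Int): Iterator[String] =
    Iterator.tabulate(size) { i => produced += 1; s"A-$id-$i" }

  // Modeled on the description above: the coalesced partition is just the
  // parent iterators chained one after another, still lazy.
  def coalesce(parents: Seq[Iterator[String]]): Iterator[String] =
    parents.iterator.flatMap(p => p)
}
```

Chaining CoalesceSketch.coalesce(...) with .map(_.replace("A", "B")) builds another lazy iterator; records are produced one at a time as the consumer calls next(), so memory use in this model does not depend on how many parents were merged.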

Some operations, like reduceByKey(), need to have the whole contents of the
partition to work. But they typically use ExternalAppendOnlyMap, so they
spill to disk instead of filling up the memory.
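
To illustrate the spilling idea, here is a toy sketch in plain Scala. It is only loosely modeled on what ExternalAppendOnlyMap is described as doing above: the names (SpillingReduce, maxKeysInMemory) are hypothetical, the "spilled" runs are kept in an in-memory buffer as a stand-in for files on disk, and runs are sorted by key for simplicity. The point it shows is that the aggregation map never holds more than a fixed number of keys, and the final result is produced by merging sorted runs.

```scala
import scala.collection.mutable

object SpillingReduce {
  // Sums values per key while keeping at most `maxKeysInMemory` keys in the live map.
  def reduceByKey(records: Iterator[(String, Int)], maxKeysInMemory: Int): Iterator[(String, Int)] = {
    // Stand-in for spill files: each run is sorted by key and has unique keys.
    val runs = mutable.ArrayBuffer.empty[Vector[(String, Int)]]
    var current = mutable.HashMap.empty[String, Int]

    def spill(): Unit = {
      runs += current.toVector.sortBy(_._1)
      current = mutable.HashMap.empty[String, Int]
    }

    for ((k, v) <- records) {
      current(k) = current.getOrElse(k, 0) + v
      if (current.size >= maxKeysInMemory) spill()
    }
    if (current.nonEmpty) spill()

    // Merge the sorted runs: repeatedly take the smallest pending key and
    // combine its partial sums from every run that has it at its head.
    val heads = runs.map(_.iterator.buffered).toVector
    new Iterator[(String, Int)] {
      def hasNext: Boolean = heads.exists(_.hasNext)
      def next(): (String, Int) = {
        val key = heads.filter(_.hasNext).map(_.head._1).min
        var sum = 0
        for (h <- heads if h.hasNext && h.head._1 == key) sum += h.next()._2
        (key, sum)
      }
    }
  }
}
```

In this model the memory bound comes from maxKeysInMemory rather than from the partition size, which is the property being claimed for the spilling operations.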

I know I'm not helping to answer Christopher's issue. Christopher, can you
perhaps give us an example that we can easily try in spark-shell to see the
same problem?

RE: coalesce and executor memory

Posted by Silvio Fiorito <si...@granturing.com>.
Actually, rereading your email I see you're caching. But ‘cache’ uses MEMORY_ONLY. Do you see errors about losing partitions as your job is running?

Are you sure you need to cache if you're just saving to disk? Can you try the coalesce without cache?
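
A toy model of why the cache step matters here, in plain Scala with hypothetical names (CacheSketch, cachePartition, residentBytes). It assumes that a MEMORY_ONLY cached partition is held in memory as a whole, and it counts payload bytes only, ignoring JVM object overhead. It does not model Spark's behaviour of skipping a block that does not fit.

```scala
object CacheSketch {
  // Hypothetical source: `n` records of `recordBytes` bytes each, produced lazily.
  def records(n: Int, recordBytes: Int): Iterator[Array[Byte]] =
    Iterator.fill(n)(new Array[Byte](recordBytes))

  // Stand-in for caching one partition: every record is held at the same time.
  def cachePartition(partition: Iterator[Array[Byte]]): Vector[Array[Byte]] =
    partition.toVector

  // Payload bytes resident for one cached partition.
  def residentBytes(cached: Vector[Array[Byte]]): Long =
    cached.map(_.length.toLong).sum
}
```

With parent partitions of 1000 records of 100 bytes each, one cached parent holds 100,000 payload bytes, while a partition coalesced from four such parents holds 400,000. Without the cache step the same records could be streamed through map and written out one at a time.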


From: Christopher Brady<ma...@oracle.com>
Sent: Friday, February 12, 2016 8:34 PM
To: Koert Kuipers<ma...@tresata.com>; Silvio Fiorito<ma...@granturing.com>
Cc: user<ma...@spark.apache.org>
Subject: Re: coalesce and executor memory

Thank you for the responses. The map function just changes the format of the record slightly, so I don't think that would be the cause of the memory problem.

So if I have 3 cores per executor, I need to be able to fit 3 partitions per executor within whatever I specify for the executor memory? Is there a way I can programmatically find a number of partitions I can coalesce down to without running out of memory? Is there some documentation where this is explained?


On 02/12/2016 05:10 PM, Koert Kuipers wrote:
in spark, every partition needs to fit in the memory available to the core processing it.

as you coalesce you reduce number of partitions, increasing partition size. at some point the partition no longer fits in memory.

On Fri, Feb 12, 2016 at 4:50 PM, Silvio Fiorito <si...@granturing.com> wrote:
Coalesce essentially reduces parallelism, so fewer cores are getting more records. Be aware that it could also lead to loss of data locality, depending on how far you reduce. Depending on what you’re doing in the map operation, it could lead to OOM errors. Can you give more details as to what the code for the map looks like?




On 2/12/16, 1:13 PM, "Christopher Brady" <m...@oracle.com> wrote:

>Can anyone help me understand why using coalesce causes my executors to
>crash with out of memory? What happens during coalesce that increases
>memory usage so much?
>
>If I do:
>hadoopFile -> sample -> cache -> map -> saveAsNewAPIHadoopFile
>
>everything works fine, but if I do:
>hadoopFile -> sample -> coalesce -> cache -> map -> saveAsNewAPIHadoopFile
>
>my executors crash with out of memory exceptions.
>
>Is there any documentation that explains what causes the increased
>memory requirements with coalesce? It seems to be less of a problem if I
>coalesce into a larger number of partitions, but I'm not sure why this
>is. How would I estimate how much additional memory the coalesce requires?
>
>Thanks.
>



Re: coalesce and executor memory

Posted by Christopher Brady <ch...@oracle.com>.
Thank you for the responses. The map function just changes the format of 
the record slightly, so I don't think that would be the cause of the 
memory problem.

So if I have 3 cores per executor, I need to be able to fit 3 partitions 
per executor within whatever I specify for the executor memory? Is there 
a way I can programmatically find a number of partitions I can coalesce 
down to without running out of memory? Is there some documentation where 
this is explained?


On 02/12/2016 05:10 PM, Koert Kuipers wrote:
> in spark, every partition needs to fit in the memory available to the 
> core processing it.
>
> as you coalesce you reduce number of partitions, increasing partition 
> size. at some point the partition no longer fits in memory.
>
> On Fri, Feb 12, 2016 at 4:50 PM, Silvio Fiorito
> <silvio.fiorito@granturing.com> wrote:
>
>     Coalesce essentially reduces parallelism, so fewer cores are
>     getting more records. Be aware that it could also lead to loss of
>     data locality, depending on how far you reduce. Depending on what
>     you’re doing in the map operation, it could lead to OOM errors.
>     Can you give more details as to what the code for the map looks like?
>
>
>
>
>     On 2/12/16, 1:13 PM, "Christopher Brady"
>     <christopher.brady@oracle.com> wrote:
>
>     >Can anyone help me understand why using coalesce causes my executors to
>     >crash with out of memory? What happens during coalesce that increases
>     >memory usage so much?
>     >
>     >If I do:
>     >hadoopFile -> sample -> cache -> map -> saveAsNewAPIHadoopFile
>     >
>     >everything works fine, but if I do:
>     >hadoopFile -> sample -> coalesce -> cache -> map -> saveAsNewAPIHadoopFile
>     >
>     >my executors crash with out of memory exceptions.
>     >
>     >Is there any documentation that explains what causes the increased
>     >memory requirements with coalesce? It seems to be less of a problem if I
>     >coalesce into a larger number of partitions, but I'm not sure why this
>     >is. How would I estimate how much additional memory the coalesce requires?
>     >
>     >Thanks.
>     >
>
>


Re: coalesce and executor memory

Posted by Koert Kuipers <ko...@tresata.com>.
in spark, every partition needs to fit in the memory available to the core
processing it.

as you coalesce you reduce number of partitions, increasing partition size.
at some point the partition no longer fits in memory.
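
A back-of-the-envelope sketch of that rule of thumb, in plain Scala. It takes the statement above as its assumption (each partition being processed must fit in its core's share of executor memory), which is disputed elsewhere in this thread. The names and the usableFraction parameter are hypothetical, and totalBytes should be the in-memory size of the data, which is usually larger than its size on disk.

```scala
object PartitionEstimate {
  // Smallest partition count such that one partition per core fits in the
  // usable share of executor memory, under the assumptions stated above.
  def minPartitions(totalBytes: Long,
                    executorMemoryBytes: Long,
                    coresPerExecutor: Int,
                    usableFraction: Double): Int = {
    // Memory budget for the single partition a core is working on.
    val perCoreBytes = executorMemoryBytes * usableFraction / coresPerExecutor
    math.max(1, math.ceil(totalBytes / perCoreBytes).toInt)
  }
}
```

For example, with 60 GiB of data, 6 GiB executors, 3 cores per executor, and an assumed usable fraction of 0.5, each core gets 1 GiB and the estimate is 60 partitions. Treat the result as a starting point for experiments rather than a guarantee.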

On Fri, Feb 12, 2016 at 4:50 PM, Silvio Fiorito <
silvio.fiorito@granturing.com> wrote:

> Coalesce essentially reduces parallelism, so fewer cores are getting more
> records. Be aware that it could also lead to loss of data locality,
> depending on how far you reduce. Depending on what you’re doing in the map
> operation, it could lead to OOM errors. Can you give more details as to
> what the code for the map looks like?
>
>
>
>
> On 2/12/16, 1:13 PM, "Christopher Brady" <ch...@oracle.com>
> wrote:
>
> >Can anyone help me understand why using coalesce causes my executors to
> >crash with out of memory? What happens during coalesce that increases
> >memory usage so much?
> >
> >If I do:
> >hadoopFile -> sample -> cache -> map -> saveAsNewAPIHadoopFile
> >
> >everything works fine, but if I do:
> >hadoopFile -> sample -> coalesce -> cache -> map -> saveAsNewAPIHadoopFile
> >
> >my executors crash with out of memory exceptions.
> >
> >Is there any documentation that explains what causes the increased
> >memory requirements with coalesce? It seems to be less of a problem if I
> >coalesce into a larger number of partitions, but I'm not sure why this
> >is. How would I estimate how much additional memory the coalesce requires?
> >
> >Thanks.
> >
>

Re: coalesce and executor memory

Posted by Silvio Fiorito <si...@granturing.com>.
Coalesce essentially reduces parallelism, so fewer cores are getting more records. Be aware that it could also lead to loss of data locality, depending on how far you reduce. Depending on what you’re doing in the map operation, it could lead to OOM errors. Can you give more details as to what the code for the map looks like?




On 2/12/16, 1:13 PM, "Christopher Brady" <ch...@oracle.com> wrote:

>Can anyone help me understand why using coalesce causes my executors to 
>crash with out of memory? What happens during coalesce that increases 
>memory usage so much?
>
>If I do:
>hadoopFile -> sample -> cache -> map -> saveAsNewAPIHadoopFile
>
>everything works fine, but if I do:
>hadoopFile -> sample -> coalesce -> cache -> map -> saveAsNewAPIHadoopFile
>
>my executors crash with out of memory exceptions.
>
>Is there any documentation that explains what causes the increased 
>memory requirements with coalesce? It seems to be less of a problem if I 
>coalesce into a larger number of partitions, but I'm not sure why this 
>is. How would I estimate how much additional memory the coalesce requires?
>
>Thanks.
>