Posted to dev@spark.apache.org by Stephen Haberman <st...@gmail.com> on 2014/02/18 00:17:49 UTC

oome from large map output status

Hey,

I tracked an OOME on our 0.9 standalone master down to the master
making a large byte[] for the output statuses (serializeMapStatuses),
and it getting copied once/executor.

In our case, an RDD had 9,000 partitions, so 9,000 x 9,000 = ~81 million
map/reduce pairs; with the output status using 1 byte per compressed
size, that's ~81mb of raw data, which gzipped down to a 49mb byte[].
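
As a quick back-of-envelope sketch (plain Scala, numbers taken from the
case above; assumes one compressed-size byte per map/reduce pair, as
described):

    val partitions = 9000L
    val statusBytes = partitions * partitions  // 81,000,000 bytes, i.e. ~81mb raw
    // In our case this gzipped down to a byte[] of ~49mb.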

However, it's sent via an Akka message, so a) 49mb is over the default
Akka frame size of 10mb (we'd already upped ours), and b) the byte[]
gets copied into a new byte[] for each slave/executor asking for it.
Plus a few more copies seem to happen in the Netty/NIO stack.

AFAICT, we basically ended up with 70 of these ~50mb byte[]s in RAM,
for a total of ~3.5gb.
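
(For reference, the frame size setting we'd upped is spark.akka.frameSize,
which is in mb; a minimal sketch of raising it at job setup, with the app
name and value made up for illustration:)

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("big-shuffle-job")          // example name only
      .set("spark.akka.frameSize", "64")      // value in mb; default is 10 in 0.9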

So, a few things:

1) Obviously we should not have an RDD with 9k partitions. I'll have the
job author fix that and then we should be fine.

2) That said, since this can get large easily (even if through user
error), perhaps a broadcast variable (or something like it) should be
used instead of sending the raw bytes through Akka itself? (A rough
sketch of the idea is below.)
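
(To illustrate the general mechanism at the user level only - not the
actual MapOutputTracker internals - here is a minimal, hypothetical
sketch of pushing a large byte[] to executors once via a broadcast
variable, rather than shipping a fresh copy over Akka per executor:)

    import org.apache.spark.{SparkConf, SparkContext}

    // Local master just for the sketch.
    val sc = new SparkContext(
      new SparkConf().setAppName("broadcast-sketch").setMaster("local[2]"))

    // Stand-in for the ~49mb of serialized map output statuses.
    val serializedStatuses: Array[Byte] = new Array[Byte](49 * 1024 * 1024)
    val statusesBc = sc.broadcast(serializedStatuses)

    sc.parallelize(1 to 100).map { i =>
      // Each task reads the broadcast value locally instead of having the
      // master serialize and send it to every executor that asks.
      statusesBc.value.length + i
    }.count()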

IANAE, so perhaps optimizing our degenerate case is not worth it, but I
thought I would at least share what we ran into.

Thanks,
Stephen

Re: oome from large map output status

Posted by Mridul Muralidharan <mr...@gmail.com>.
There is nothing wrong with 9k partitions - I actually use a much
higher count :-) [1]
I have not really seen this interesting issue you mentioned - I should
investigate more; thanks for the note!


Regards,
Mridul

[1] I do use an insanely high frame size anyway - and my workers/master
run with 8g; maybe that is why I have not seen it yet ...
