Posted to user@spark.apache.org by Roch Denis <rd...@exostatic.com> on 2014/07/18 07:14:04 UTC

Last step of processing is using too much memory.

Hello,

I have an issue where my Spark code is using too much memory in the final
step (a count for testing purposes; it will write the result to a DB once it
works). I'm really not sure how I can break down the last step to use less
RAM.

So, basically my data is log lines and each log line has a session id. I
want to group by session to reconstruct the events of a session for BI
purposes.

So my steps are (see the sketch after the list):

-Load the log lines
-Do a map to create a K,V for each log line
-Do a groupByKey.
-Do a final map on the log lines to rebuild my session.
-Do a count to trigger everything.
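
In code, the pipeline is roughly this (parse_line() and build_session() are
placeholder names, not my actual code; the real loading goes through
MongoDB):

# Minimal sketch of the pipeline described above.
from pyspark import SparkContext

sc = SparkContext(appName="SessionRebuild")

lines = sc.textFile("hdfs:///logs/")          # load the log lines
pairs = lines.map(parse_line)                 # -> (session_id, event)
grouped = pairs.groupByKey()                  # all events per session
sessions = grouped.map(lambda kv: build_session(kv[0], list(kv[1])))
print(sessions.count())                       # trigger everything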

That did not work at all. I let it run for 35 minutes, and all it was doing
was disk reads and writes: all the CPUs were blocked on I/O wait and I had
1% free memory.

So, I thought that I could help by reading my log lines in chunks of
1,200,000 lines and THEN doing a groupByKey on each subset. After everything
was done, I would just combine all my RDDs with "+" (union) and do a final
groupByKey pass. The result was still the same: heavy disk swapping, 1%
memory left, and all the CPUs doing I/O wait.

It looks like this (see the sketch after the list):
-Load subset
-Do a map to create a K,V for each log line
-Do a groupByKey.
-Add all the subset rdd together.
-Do a final groupByKey.
-Do a count.
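
Roughly, in code (load_chunk(), parse_line() and total_lines are placeholder
names; note that after the first groupByKey the values are per-chunk groups
of events):

# Sketch of the chunked variant described above.
chunk_size = 1200000
partials = []
for start in range(0, total_lines, chunk_size):
    chunk = load_chunk(start, chunk_size)     # placeholder chunked loader
    pairs = chunk.map(parse_line)             # -> (session_id, event)
    partials.append(pairs.groupByKey())       # group within the subset

combined = partials[0]
for rdd in partials[1:]:
    combined = combined + rdd                 # "+" on RDDs is union()

final = combined.groupByKey()                 # regroup across all subsets
print(final.count())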

I can post the code if it would help, but there's a lot of code confusing
the issue that's used to extract the logs from MongoDB with a flatMap.


This is the memory usage of each process; it's an issue because I only have
12 GB of RAM on that machine:
     VIRT    RES    SHR S  %CPU     TIME+ COMMAND
 3378712 2.646g    700 D   0.3   0:21.30 python
 3377568 2.566g    700 D   0.0   0:20.80 python
 3374984 2.485g    700 D   0.0   0:20.29 python
 3375588 2.449g    700 D   0.3   0:20.62 python
 3495560 206908   3920 S   1.3   0:45.36 java

If I look at memory and swap with "free", it's the same thing: there's
almost no memory left in buffers/cache to reclaim.

              total       used       free     shared    buffers     cached
Mem:      12305524   12159320     146204         20       1072      29036
-/+ buffers/cache:   12129212     176312
Swap:      5857276    3885296    1971980


In the screenshot below, you can see the step it's stuck on. The substeps
come in groups of 4 because I break each chunk down into blocks of 4.
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n10134/issue.png>

Re: Last step of processing is using too much memory.

Posted by Davies Liu <da...@databricks.com>.
When you do groupBy(), Spark tries to load all of the data into memory for
best performance, so you should specify the number of partitions carefully.

In Spark master or the upcoming 1.1 release, PySpark can do an external
groupBy(), meaning it will spill the data to disk when there is not enough
memory to hold it all. That will also help in this case.
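
For example, something like this (the value 64 is only illustrative; pick a
count based on your data size and number of cores):

# Raise the default parallelism used by wide operations such as
# groupBy(); 64 partitions here is only an illustrative value.
from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.default.parallelism", "64")
sc = SparkContext(conf=conf)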

On Fri, Jul 18, 2014 at 1:56 AM, Roch Denis <rd...@exostatic.com> wrote:
> Well, for what it's worth, I found the issue after spending the whole night
> running experiments ;).
>
> Basically, I needed to give a higher number of partitions to the groupByKey.
> I was simply using the default, which generated only 4 partitions, and so
> the whole thing blew up.

Re: Last step of processing is using too much memory.

Posted by Roch Denis <rd...@exostatic.com>.
Well, for what it's worth, I found the issue after spending the whole night
running experiments ;).

Basically, I needed to give a higher number of partitions to the groupByKey.
I was simply using the default, which generated only 4 partitions, and so
the whole thing blew up.
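
Concretely, the fix amounts to passing an explicit partition count to
groupByKey (128 below is only illustrative; the right value depends on your
data size):

# Before: the default gave only 4 partitions on this machine, so
# each grouped partition was far too big to fit in memory.
grouped = pairs.groupByKey()

# After: explicitly request more, smaller partitions.
grouped = pairs.groupByKey(numPartitions=128)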



