Posted to user@spark.apache.org by Mario Pastorelli <ma...@teralytics.ch> on 2015/03/30 11:06:50 UTC

Problem with groupBy and OOM when just writing the group in a file

We are experiencing some problems with the groupBy operation when it is 
used to group together data that will be written to the same file. The 
operation we want to do is the following: given some data with a 
timestamp, we want to sort it by timestamp, group it by hour and write 
one file per hour. One could do something like

rdd.groupBy(hour).foreach { case (h, group) =>
     // one writer per hour; timestamp extracts the record's time, like hour
     val writer = writerForHour(h)
     // toSeq materializes the whole hour in memory before sorting it
     group.toSeq.sortBy(timestamp).foreach(writer.write)
     writer.close()
}

but this will load all the data for one hour in memory and easily go 
out of memory. Originally we thought the problem was the toSeq, which 
makes the Iterable you get as the value from the groupBy strict, but 
apparently it is not. We removed the toSeq.sortBy call but we still get 
OOM when the data in a group is huge.
I saw that there has been a discussion on the ML about groupBy having 
to keep everything in memory at 
http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-td11427.html#a11487 
but I found no solution to my problem.

So my questions are the following:

1) Is this groupBy problem still present in Spark 1.3?
2) Why does groupBy require everything to stay in memory? In my 
ignorance, I was convinced that groupBy worked with lazy Iterators 
instead of a strict Iterable, which I think is how mapPartitions works 
(see the signatures sketched after these questions). The operation after 
the groupBy would then decide whether the iterator should be strict or 
not, so groupBy.foreach would be lazy and every record coming out of the 
shuffle could be passed directly to the foreach without waiting for the 
others. Is this not possible for some reason?
3) Is there another way to do what I want to do? Keep in mind that I 
can't repartition, because the number of partitions depends dynamically 
on the number of years/days/hours. One solution could be to work at the 
minute level when there is too much data, but we still want to create 
one file per hour.
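
For reference, these are the (simplified) signatures of the two RDD 
methods I have in mind, with implicit parameters and default arguments 
omitted:

// groupBy hands downstream code one strict Iterable per key
def groupBy[K](f: T => K): RDD[(K, Iterable[T])]

// mapPartitions hands the function a lazy Iterator over the partition,
// so records can be consumed one at a time
def mapPartitions[U](f: Iterator[T] => Iterator[U]): RDD[U]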

Thanks!



Re: Problem with groupBy and OOM when just writing the group in a file

Posted by Mario Pastorelli <ma...@teralytics.ch>.
It worked, thank you.

On 30.03.2015 11:58, Sean Owen wrote:
> The behavior is the same. I am not sure it's a problem so much as a
> design decision. It does not require everything to stay in memory, only
> the values for one key at a time. Have a look at how the preceding
> shuffle works.
>
> Consider repartitionAndSortWithinPartitions to *partition* by hour and
> then sort by time. Then you encounter your data for an hour in order
> in an Iterator with mapPartitions.




Re: Problem with groupBy and OOM when just writing the group in a file

Posted by Sean Owen <so...@cloudera.com>.
The behavior is the same. I am not sure it's a problem so much as a
design decision. It does not require everything to stay in memory, only
the values for one key at a time. Have a look at how the preceding
shuffle works.

Consider repartitionAndSortWithinPartitions to *partition* by hour and
then sort by time. Then you encounter your data for an hour in order
in an Iterator with mapPartitions.
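
Roughly, something like the sketch below (untested; the Event type, the 
hour bucketing, the file naming and the partition count are illustrative 
stand-ins, and foreachPartition is used as the side-effecting counterpart 
of mapPartitions since the goal here is writing files):

import java.io.{BufferedWriter, FileWriter}

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

object HourlyWriter extends Serializable {

  case class Event(time: Long, payload: String)

  // epoch millis -> hour bucket
  def eventHour(e: Event): Long = e.time / 3600000L

  def writerForHour(h: Long): BufferedWriter =
    new BufferedWriter(new FileWriter(s"events-$h.txt"))

  // Partitions on the hour component of the (hour, time) key only, so all
  // records of one hour land in the same partition; the implicit ordering
  // on the full key then sorts them by time within that partition.
  class HourPartitioner(override val numPartitions: Int) extends Partitioner {
    override def getPartition(key: Any): Int = key match {
      case (h: Long, _) =>
        (((h % numPartitions) + numPartitions) % numPartitions).toInt
    }
  }

  def writeOneFilePerHour(rdd: RDD[Event], numPartitions: Int): Unit =
    rdd
      .map(e => ((eventHour(e), e.time), e))   // composite key: (hour, time)
      .repartitionAndSortWithinPartitions(new HourPartitioner(numPartitions))
      .foreachPartition { iter =>
        // iter is a lazy Iterator over sorted (key, event) pairs, so records
        // stream to one writer per hour without materializing a whole group.
        var current: Option[(Long, BufferedWriter)] = None
        for (((h, _), event) <- iter) {
          if (!current.exists(_._1 == h)) {    // hour changed: roll the file
            current.foreach(_._2.close())
            current = Some((h, writerForHour(h)))
          }
          current.foreach(_._2.write(event.payload + "\n"))
        }
        current.foreach(_._2.close())
      }
}

The key idea is a secondary sort: key by (hour, timestamp), partition on 
the hour alone, and let the shuffle deliver each hour's records already 
sorted, so no per-hour group ever has to be materialized in memory.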


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org