Posted to user@spark.apache.org by Mario Pastorelli <ma...@teralytics.ch> on 2015/03/30 11:06:50 UTC
Problem with groupBy and OOM when just writing the group in a file
We are experiencing some problems with the groupBy operation when it is
used to group together data that will be written to the same file. The
operation we want to perform is the following: given some data with a
timestamp, we want to sort it by timestamp, group it by hour and write
one file per hour. One could do something like
rdd.groupBy(hour).foreach { case (hour, group) =>
  val writer = writerForHour(hour)
  group.toSeq.sortBy(hour).foreach(writer.write)
  writer.close()
}
but this loads all the data for one hour into memory and easily runs out
of memory. Originally we thought the problem was the toSeq, which makes
strict the iterable that you obtain as the value from the groupBy, but
apparently it is not: we removed the toSeq.sortBy(hour) but we still
get OOM when the data in a group is huge.
I saw that there has been a discussion on the ML about groupBy requiring
everything to stay in memory at
http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-td11427.html#a11487
but I found no solution to my problem.
So my questions are the following:
1) is this groupBy problem still present in Spark 1.3?
2) why does groupBy require everything to stay in memory? In my
ignorance, I was convinced that groupBy worked with lazy Iterators
instead of a strict Iterable; I think this is how mapPartitions works.
The operation after the groupBy would then decide whether the iterator
should be strict or not, so groupBy.foreach would be lazy and every
record could be passed directly to the foreach function without waiting
for the others. Is this not possible for some reason?
3) is there another way to do what I want to do? Keep in mind that I
can't repartition, because the number of partitions depends on the
number of years/days/hours. One solution could be to work at the minute
level when there is too much data, but we still want to create one file
per hour.
Thanks!
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org
Re: Problem with groupBy and OOM when just writing the group in a file
Posted by Mario Pastorelli <ma...@teralytics.ch>.
It worked, thank you.
On 30.03.2015 11:58, Sean Owen wrote:
> The behavior is the same. I am not sure it's a problem as much as a
> design decision. It does not require everything to stay in memory, but
> only the values for one key at a time. Have a look at how the preceding
> shuffle works.
>
> Consider repartitionAndSortWithinPartitions to *partition* by hour and
> then sort by time. Then you encounter your data for an hour in order
> in an Iterator with mapPartitions.
>
Re: Problem with groupBy and OOM when just writing the group in a file
Posted by Sean Owen <so...@cloudera.com>.
The behavior is the same. I am not sure it's a problem as much as a
design decision. It does not require everything to stay in memory, but
only the values for one key at a time. Have a look at how the preceding
shuffle works.
Consider repartitionAndSortWithinPartitions to *partition* by hour and
then sort by time. Then you encounter your data for an hour in order
in an Iterator with mapPartitions.
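The per-partition pass this suggests can be sketched as follows. After
repartitionAndSortWithinPartitions, each partition delivers its records
already grouped by hour and sorted within it, so one streaming pass with a
rolling writer suffices and no group is ever materialized. This is a minimal,
Spark-free sketch of what the mapPartitions function would do; Event, hourOf
and the in-memory "files" are illustrative stand-ins for the real record type
and file writers, and timestamps are assumed to be epoch seconds.

```scala
import scala.collection.mutable

// Illustrative record type; in the real job this is your data class.
case class Event(timestamp: Long)

// Hour bucket, assuming timestamps in epoch seconds.
def hourOf(e: Event): Long = e.timestamp / 3600L

// One pass over an iterator already sorted by (hour, timestamp), as
// mapPartitions would receive it after repartitionAndSortWithinPartitions:
// "close" the previous file and "open" a new one whenever the hour changes.
// Files are modeled as in-memory buffers so the sketch is self-contained.
def writeByHour(sorted: Iterator[Event]): Seq[(Long, Seq[Long])] = {
  val files = mutable.ArrayBuffer.empty[(Long, Seq[Long])]
  var currentHour = Long.MinValue
  var buffer = mutable.ArrayBuffer.empty[Long]
  for (e <- sorted) {
    val h = hourOf(e)
    if (h != currentHour) {
      if (buffer.nonEmpty) files += ((currentHour, buffer.toSeq)) // close file
      buffer = mutable.ArrayBuffer.empty[Long]                    // open next
      currentHour = h
    }
    buffer += e.timestamp
  }
  if (buffer.nonEmpty) files += ((currentHour, buffer.toSeq))
  files.toSeq
}
```

Only one hour's writer is open at any moment and only one record is held at a
time, which is why this avoids the OOM that groupBy hits when a group is huge.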