Posted to user@spark.apache.org by Yang <te...@gmail.com> on 2016/10/20 17:53:38 UTC

RDD groupBy() then random sort each group ?

In my application, I group training samples by their model_id (the input
table contains training samples for 100k different models), so each group
ends up with about 1 million training samples.

I then feed each group of samples to a small logistic regression solver
(SGD). SGD needs its input shuffled randomly (so that positive and negative
samples are evenly distributed), so right now I do something like:

my_input_rdd.groupBy(x => x.model_id).map { x =>
    val (model_id, group_of_rows) = x

    (model_id, scala.util.Random.shuffle(group_of_rows.toSeq))

}.map(x => (x._1, train_sgd(x._2)))


The issue is on the third line above: group_of_rows is an Iterable, not a
Seq, so I have to call toSeq on it explicitly before I can shuffle. That
loads the entire 1 million rows into memory, and in practice I've seen my
tasks OOM and GC time go through the roof (about 50% of total run time). I
suspect the toSeq is the reason, since a simple count() on the groupBy()
result works fine.

I am planning to shuffle my_input_rdd first, then groupBy(), and skip the
toSeq-and-shuffle step. Intuitively the input RDD is already shuffled, so
unless groupBy() does some sorting of its own, the rows in each group should
remain shuffled. But that is only an assumption, and it feels rather flimsy.

Any ideas on how to do this more reliably?

thanks!

Re: RDD groupBy() then random sort each group ?

Posted by Yang <te...@gmail.com>.
Thanks.

This is exactly what I ended up doing. Though it seemed to work, there seems
to be no guarantee that the randomness introduced by
sortWithinPartitions() would be preserved after I do a further groupBy.



On Fri, Oct 21, 2016 at 3:55 PM, Cheng Lian <li...@databricks.com> wrote:

> I think it would be much easier to use the DataFrame API for this, by doing
> a local sort using randn() as the key. For example, in Spark 2.0:
>
> val df = spark.range(100)
> val shuffled = df.repartition($"id" % 10).sortWithinPartitions(randn(42))
>
> Replace df with a DataFrame wrapping your RDD, and $"id" % 10 with the key
> to group by. You can then get the RDD from shuffled and apply whatever
> operations you need.
>
> Cheng

Re: RDD groupBy() then random sort each group ?

Posted by Cheng Lian <li...@databricks.com>.
I think it would be much easier to use the DataFrame API for this, by doing
a local sort using randn() as the key. For example, in Spark 2.0:

val df = spark.range(100)
val shuffled = df.repartition($"id" % 10).sortWithinPartitions(randn(42))

Replace df with a DataFrame wrapping your RDD, and $"id" % 10 with the
key to group by. You can then get the RDD from shuffled and apply whatever
operations you need.

Cheng






Re: RDD groupBy() then random sort each group ?

Posted by Yang <te...@gmail.com>.
Thanks, this direction seems to be in line with what I want.

What I really want is to groupBy() and then, for the rows in each group, get
an Iterator and run each element through a local function (specifically
SGD). Right now the Dataset API provides this, but it is literally an
Iterator, so I can't "reset" it, and SGD needs to be able to make multiple
passes over the data.
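
One possible way around the one-shot Iterator, sketched in plain Scala
rather than against the Dataset API: drain the iterator into an array once,
then make several passes over that array, reshuffling it in place before
each pass. Everything named here (Sample, shuffleInPlace, trainSgdSketch and
its parameters) is hypothetical and is not the train_sgd from this thread.
The sketch also assumes that a single group fits in memory as one array.

```scala
import scala.util.Random

object MultiPassSgdSketch {
  // Hypothetical sample type; the thread does not show the real row schema.
  final case class Sample(features: Array[Double], label: Double)

  // In-place Fisher-Yates shuffle: no second copy of the group is made.
  def shuffleInPlace[A](xs: Array[A], rng: Random): Unit = {
    var i = xs.length - 1
    while (i > 0) {
      val j = rng.nextInt(i + 1) // uniform index in [0, i]
      val tmp = xs(i); xs(i) = xs(j); xs(j) = tmp
      i -= 1
    }
  }

  private def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // Drain the iterator once, then run `epochs` passes of logistic
  // regression SGD over the buffered array, reshuffling before each pass.
  def trainSgdSketch(rows: Iterator[Sample], dim: Int, epochs: Int,
                     lr: Double, seed: Long): Array[Double] = {
    val buf = rows.toArray // the only point where the iterator is consumed
    val rng = new Random(seed)
    val w = new Array[Double](dim)
    for (_ <- 0 until epochs) {
      shuffleInPlace(buf, rng)
      for (s <- buf) {
        var z = 0.0
        var i = 0
        while (i < dim) { z += w(i) * s.features(i); i += 1 }
        val err = sigmoid(z) - s.label // gradient of log loss w.r.t. z
        i = 0
        while (i < dim) { w(i) -= lr * err * s.features(i); i += 1 }
      }
    }
    w
  }
}
```

Because the shuffle happens inside the training function, the order in which
rows arrive from the iterator no longer matters, so nothing upstream has to
preserve randomness. The cost is that one full group is held in memory while
it is being trained on.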



On Sat, Oct 22, 2016 at 1:22 PM, Koert Kuipers <ko...@tresata.com> wrote:

> groupBy always materializes the entire group (on disk or in memory) which
> is why you should avoid it for large groups.
>
> The key is to never materialize the grouped and shuffled data.
>
> To see one approach to do this take a look at
> https://github.com/tresata/spark-sorted
>
> It's basically a combination of smart partitioning and secondary sort.

Re: RDD groupBy() then random sort each group ?

Posted by Koert Kuipers <ko...@tresata.com>.
groupBy always materializes the entire group (on disk or in memory) which
is why you should avoid it for large groups.

The key is to never materialize the grouped and shuffled data.

To see one approach to do this take a look at
https://github.com/tresata/spark-sorted

It's basically a combination of smart partitioning and secondary sort.
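
To make the secondary-sort idea a bit more concrete, here is a rough local
sketch in plain Scala. It does not use Spark or the spark-sorted API (which
is not shown in this thread), and the names SecondarySortSketch,
sortWithRandomTag and runsByKey are made up for illustration. Each row gets
a random tag, rows are sorted by (key, tag) so that equal keys become
adjacent and are randomly ordered among themselves, and the sorted stream is
then walked one run of equal keys at a time.

```scala
import scala.util.Random

object SecondarySortSketch {
  // Tag each (key, value) with a random number and sort by (key, tag).
  // Locally this is a plain sort; on a cluster the equivalent ordering
  // would come from partitioning on the key and sorting within partitions.
  def sortWithRandomTag[K: Ordering, V](rows: Seq[(K, V)], seed: Long): Seq[(K, V)] = {
    val rng = new Random(seed)
    rows
      .map { case (k, v) => ((k, rng.nextLong()), v) }
      .sortBy(_._1)
      .map { case ((k, _), v) => (k, v) }
  }

  // Walk an iterator in which equal keys are adjacent and emit one group at
  // a time, so only the current group is buffered, never all groups at once.
  def runsByKey[K, V](sorted: Iterator[(K, V)]): Iterator[(K, Vector[V])] =
    new Iterator[(K, Vector[V])] {
      private val in = sorted.buffered
      def hasNext: Boolean = in.hasNext
      def next(): (K, Vector[V]) = {
        val key = in.head._1
        val group = Vector.newBuilder[V]
        while (in.hasNext && in.head._1 == key) group += in.next()._2
        (key, group.result())
      }
    }
}
```

Note that this sketch still buffers one group at a time, which is weaker
than never materializing a group at all. On a cluster, a function like
runsByKey could in principle be applied to the iterator that mapPartitions
provides, as long as each partition is sorted by the key first and by the
random tag second, so that no further groupBy is needed.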
