Posted to user@spark.apache.org by Julien Carme <ju...@gmail.com> on 2014/09/13 11:46:33 UTC

ReduceByKey performance optimisation

Hello,

I am facing performance issues with reduceByKey. I know that this topic
has already been covered, but I did not really find answers to my question.

I am using reduceByKey to remove entries with identical keys, with
(a, b) => a as the reduce function. It seems to be a relatively straightforward
use of reduceByKey, but performance on moderately big RDDs (some tens of
millions of lines) is very low, far from what you can reach with
single-machine computing packages like R, for example.

I have read in other threads on the topic that reduceByKey always shuffles
the whole data set. Is that true? Does it mean that custom partitioning
could not help? In my case, I could relatively easily guarantee that two
identical keys would always be in the same partition, so one option would be
to use mapPartitions and reimplement the reduce locally, but I would like to
know if there are simpler / more elegant alternatives.
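
To make this concrete, here is a minimal sketch of what I am doing (the Record
type and key field are just illustrative placeholders, not my real schema):

import org.apache.spark.SparkContext._   // pair RDD implicits (Spark 1.x)
import org.apache.spark.rdd.RDD

// Illustrative stand-in for my real objects.
case class Record(key: String, payload: String)

def dedupByKey(records: RDD[Record]): RDD[Record] =
  records
    .map(r => r.key -> r)        // key each object by the field of interest
    .reduceByKey((a, _) => a)    // keep one arbitrary representative per key
    .values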

Thanks for your help,

Re: ReduceByKey performance optimisation

Posted by Julien Carme <ju...@gmail.com>.
OK, mapPartitions seems to be the way to go. Thanks for the help!
On 13 Sep 2014 16:41, "Sean Owen" <so...@cloudera.com> wrote:

> This is more concise:
>
> x.groupBy(_.fieldtobekey).values.map(_.head)
>
> ... but I doubt it's faster.
>
> If all objects with the same fieldtobekey are within the same
> partition, then yes I imagine your biggest speedup comes from
> exploiting that. How about ...
>
> x.mapPartitions(_.map(obj => (obj.fieldtobekey, obj)).toMap.values.iterator)
>
> This does require that all keys, plus a representative object for each,
> fit in memory.
> I bet you can make it faster than this example too.

Re: ReduceByKey performance optimisation

Posted by Sean Owen <so...@cloudera.com>.
This is more concise:

x.groupBy(_.fieldtobekey).values.map(_.head)

... but I doubt it's faster.

If all objects with the same fieldtobekey are within the same
partition, then yes I imagine your biggest speedup comes from
exploiting that. How about ...

x.mapPartitions(_.map(obj => (obj.fieldtobekey, obj)).toMap.values.iterator)

This does require that all keys, plus a representative object for each,
fit in memory.
I bet you can make it faster than this example too.
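
For example, something along these lines (an untested sketch, assuming
fieldtobekey is a String) keeps only the set of keys per partition in memory
rather than a whole Map of representative objects:

x.mapPartitions { iter =>
  // Keys already emitted from this partition.
  val seen = scala.collection.mutable.HashSet.empty[String]
  // HashSet#add returns false for a duplicate, so only the first object per key is kept.
  iter.filter(obj => seen.add(obj.fieldtobekey))
}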


On Sat, Sep 13, 2014 at 1:15 PM, Gary Malouf <ma...@gmail.com> wrote:
> You need something like:
>
> val x: RDD[MyAwesomeObject]
>
> x.map(obj => obj.fieldtobekey -> obj).reduceByKey { case (l, _) => l }
>
> Does that make sense?

Re: ReduceByKey performance optimisation

Posted by Gary Malouf <ma...@gmail.com>.
You need something like:

val x: RDD[MyAwesomeObject]

x.map(obj => obj.fieldtobekey -> obj).reduceByKey { case (l, _) => l }

Does that make sense?
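
On the custom partitioning question: my understanding (worth double-checking
on your Spark version) is that if the pair RDD already carries the same
Partitioner you pass to reduceByKey, the reduction runs within each partition
with no further shuffle. A hedged sketch, with an illustrative partition count:

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(200)   // illustrative number of partitions

// One shuffle up front to co-locate identical keys, then cache the keyed RDD.
val keyed = x.map(obj => obj.fieldtobekey -> obj)
  .partitionBy(partitioner)
  .cache()

// `keyed` already uses `partitioner`, so this reduce should not shuffle again.
val deduped = keyed.reduceByKey(partitioner, (l, _) => l).values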


On Sat, Sep 13, 2014 at 7:28 AM, Julien Carme <ju...@gmail.com>
wrote:

> I need to remove objects with duplicate keys, but I need the whole object.
> Objects which have the same key are not necessarily equal, though (but I can
> drop any of the ones that have an identical key).

Re: ReduceByKey performance optimisation

Posted by Julien Carme <ju...@gmail.com>.
I need to remove objects with duplicate keys, but I need the whole object.
Objects which have the same key are not necessarily equal, though (but I can
drop any of the ones that have an identical key).

2014-09-13 12:50 GMT+02:00 Sean Owen <so...@cloudera.com>:

> If you are just looking for distinct keys, .keys.distinct() should be
> much better.

Re: ReduceByKey performance optimisation

Posted by Sean Owen <so...@cloudera.com>.
If you are just looking for distinct keys, .keys.distinct() should be
much better.
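
That is, assuming the objects are first keyed by the field of interest (names
as used elsewhere in this thread), a sketch would just be:

val keyed = x.map(obj => obj.fieldtobekey -> obj)   // pair RDD keyed by the field of interest
val distinctKeys = keyed.keys.distinct()            // only the keys are carried through the shuffle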

On Sat, Sep 13, 2014 at 10:46 AM, Julien Carme <ju...@gmail.com> wrote:
> Hello,
>
> I am facing performance issues with reduceByKey. I know that this topic has
> already been covered, but I did not really find answers to my question.
>
> I am using reduceByKey to remove entries with identical keys, with
> (a, b) => a as the reduce function. It seems to be a relatively straightforward
> use of reduceByKey, but performance on moderately big RDDs (some tens of
> millions of lines) is very low, far from what you can reach with single-machine
> computing packages like R, for example.
>
> I have read in other threads on the topic that reduceByKey always shuffles
> the whole data set. Is that true? Does it mean that custom partitioning
> could not help? In my case, I could relatively easily guarantee that two
> identical keys would always be in the same partition, so one option would be
> to use mapPartitions and reimplement the reduce locally, but I would like to
> know if there are simpler / more elegant alternatives.
>
> Thanks for your help,

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org