You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Koert Kuipers <ko...@tresata.com> on 2014/07/05 19:16:44 UTC

taking top k values of rdd

my initial approach to taking top k values of a rdd was using a
priority-queue monoid. along these lines:

rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) },
false).reduce(monoid.plus)

this works fine, but looking at the code for reduce it first reduces within
a partition (which doesnt help me) and then sends the results to the driver
where these again get reduced. this means that for every partition the
(potentially very bulky) priorityqueue gets shipped to the driver.

my driver is client side, not inside cluster, and i cannot change this, so
this shipping to driver of all these queues can be expensive.

is there a better way to do this? should i try to a shuffle first to reduce
the partitions to the minimal amount (since number of queues shipped is
equal to number of partitions)?

is was a way to reduce to a single item RDD, so the queues stay inside
cluster and i can retrieve the final result with RDD.first?

Re: taking top k values of rdd

Posted by Koert Kuipers <ko...@tresata.com>.
so i was thinking along these lines, assuming i start with p partitions:
1) create a priority queue of size k per partition
2) repartition to create one partition
3) reduce

i guess the worry is that in step 2 the one partition needs to hold p
priority queues of size k in memory...
the benefit is that the p priority queues do not get send to the driver
(which is not on cluster)


On Sat, Jul 5, 2014 at 1:20 PM, Koert Kuipers <ko...@tresata.com> wrote:

> i guess i could create a single priorityque per partition, then shuffle to
> a new rdd with 1 partition, and then reduce?
>
>
> On Sat, Jul 5, 2014 at 1:16 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> my initial approach to taking top k values of a rdd was using a
>> priority-queue monoid. along these lines:
>>
>> rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) },
>> false).reduce(monoid.plus)
>>
>> this works fine, but looking at the code for reduce it first reduces
>> within a partition (which doesnt help me) and then sends the results to the
>> driver where these again get reduced. this means that for every partition
>> the (potentially very bulky) priorityqueue gets shipped to the driver.
>>
>> my driver is client side, not inside cluster, and i cannot change this,
>> so this shipping to driver of all these queues can be expensive.
>>
>> is there a better way to do this? should i try to a shuffle first to
>> reduce the partitions to the minimal amount (since number of queues shipped
>> is equal to number of partitions)?
>>
>> is was a way to reduce to a single item RDD, so the queues stay inside
>> cluster and i can retrieve the final result with RDD.first?
>>
>
>

Re: taking top k values of rdd

Posted by Koert Kuipers <ko...@tresata.com>.
i guess i could create a single priorityque per partition, then shuffle to
a new rdd with 1 partition, and then reduce?


On Sat, Jul 5, 2014 at 1:16 PM, Koert Kuipers <ko...@tresata.com> wrote:

> my initial approach to taking top k values of a rdd was using a
> priority-queue monoid. along these lines:
>
> rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) },
> false).reduce(monoid.plus)
>
> this works fine, but looking at the code for reduce it first reduces
> within a partition (which doesnt help me) and then sends the results to the
> driver where these again get reduced. this means that for every partition
> the (potentially very bulky) priorityqueue gets shipped to the driver.
>
> my driver is client side, not inside cluster, and i cannot change this, so
> this shipping to driver of all these queues can be expensive.
>
> is there a better way to do this? should i try to a shuffle first to
> reduce the partitions to the minimal amount (since number of queues shipped
> is equal to number of partitions)?
>
> is was a way to reduce to a single item RDD, so the queues stay inside
> cluster and i can retrieve the final result with RDD.first?
>

Re: taking top k values of rdd

Posted by Nick Pentreath <ni...@gmail.com>.
Right. That is unavoidable unless as you say you repartition into 1 partition, which may do the trick.


When I say send the top k per partition I don't mean send the pq but the actual values. This may end up being relatively small if k and p are not too big. (I'm not sure how large serialized pq is).
—
Sent from Mailbox

On Sat, Jul 5, 2014 at 10:29 AM, Koert Kuipers <ko...@tresata.com> wrote:

> hey nick,
> you are right. i didnt explain myself well and my code example was wrong...
> i am keeping a priority-queue with k items per partition (using
> com.twitter.algebird.mutable.PriorityQueueMonoid.build to limit the sizes
> of the queues).
> but this still means i am sending k items per partition to my driver, so k
> x p, while i only need k.
> thanks! koert
> On Sat, Jul 5, 2014 at 1:21 PM, Nick Pentreath <ni...@gmail.com>
> wrote:
>> To make it efficient in your case you may need to do a bit of custom code
>> to emit the top k per partition and then only send those to the driver. On
>> the driver you can just top k the combined top k from each partition
>> (assuming you have (object, count) for each top k list).
>>
>> —
>> Sent from Mailbox <https://www.dropbox.com/mailbox>
>>
>>
>> On Sat, Jul 5, 2014 at 10:17 AM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> my initial approach to taking top k values of a rdd was using a
>>> priority-queue monoid. along these lines:
>>>
>>>  rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) },
>>> false).reduce(monoid.plus)
>>>
>>> this works fine, but looking at the code for reduce it first reduces
>>> within a partition (which doesnt help me) and then sends the results to the
>>> driver where these again get reduced. this means that for every partition
>>> the (potentially very bulky) priorityqueue gets shipped to the driver.
>>>
>>> my driver is client side, not inside cluster, and i cannot change this,
>>> so this shipping to driver of all these queues can be expensive.
>>>
>>> is there a better way to do this? should i try to a shuffle first to
>>> reduce the partitions to the minimal amount (since number of queues shipped
>>> is equal to number of partitions)?
>>>
>>> is was a way to reduce to a single item RDD, so the queues stay inside
>>> cluster and i can retrieve the final result with RDD.first?
>>>
>>
>>

Re: taking top k values of rdd

Posted by Koert Kuipers <ko...@tresata.com>.
hey nick,
you are right. i didnt explain myself well and my code example was wrong...
i am keeping a priority-queue with k items per partition (using
com.twitter.algebird.mutable.PriorityQueueMonoid.build to limit the sizes
of the queues).
but this still means i am sending k items per partition to my driver, so k
x p, while i only need k.
thanks! koert



On Sat, Jul 5, 2014 at 1:21 PM, Nick Pentreath <ni...@gmail.com>
wrote:

> To make it efficient in your case you may need to do a bit of custom code
> to emit the top k per partition and then only send those to the driver. On
> the driver you can just top k the combined top k from each partition
> (assuming you have (object, count) for each top k list).
>
> —
> Sent from Mailbox <https://www.dropbox.com/mailbox>
>
>
> On Sat, Jul 5, 2014 at 10:17 AM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> my initial approach to taking top k values of a rdd was using a
>> priority-queue monoid. along these lines:
>>
>>  rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) },
>> false).reduce(monoid.plus)
>>
>> this works fine, but looking at the code for reduce it first reduces
>> within a partition (which doesnt help me) and then sends the results to the
>> driver where these again get reduced. this means that for every partition
>> the (potentially very bulky) priorityqueue gets shipped to the driver.
>>
>> my driver is client side, not inside cluster, and i cannot change this,
>> so this shipping to driver of all these queues can be expensive.
>>
>> is there a better way to do this? should i try to a shuffle first to
>> reduce the partitions to the minimal amount (since number of queues shipped
>> is equal to number of partitions)?
>>
>> is was a way to reduce to a single item RDD, so the queues stay inside
>> cluster and i can retrieve the final result with RDD.first?
>>
>
>

Re: taking top k values of rdd

Posted by Nick Pentreath <ni...@gmail.com>.
To make it efficient in your case you may need to do a bit of custom code to emit the top k per partition and then only send those to the driver. On the driver you can just top k the combined top k from each partition (assuming you have (object, count) for each top k list).

—
Sent from Mailbox

On Sat, Jul 5, 2014 at 10:17 AM, Koert Kuipers <ko...@tresata.com> wrote:

> my initial approach to taking top k values of a rdd was using a
> priority-queue monoid. along these lines:
> rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) },
> false).reduce(monoid.plus)
> this works fine, but looking at the code for reduce it first reduces within
> a partition (which doesnt help me) and then sends the results to the driver
> where these again get reduced. this means that for every partition the
> (potentially very bulky) priorityqueue gets shipped to the driver.
> my driver is client side, not inside cluster, and i cannot change this, so
> this shipping to driver of all these queues can be expensive.
> is there a better way to do this? should i try to a shuffle first to reduce
> the partitions to the minimal amount (since number of queues shipped is
> equal to number of partitions)?
> is was a way to reduce to a single item RDD, so the queues stay inside
> cluster and i can retrieve the final result with RDD.first?