You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Theodore Vasiloudis <th...@gmail.com> on 2014/12/04 15:53:12 UTC

Efficient way to get top K values per key in (key, value) RDD?

Hello everyone,

I was wondering what is the most efficient way for retrieving the top K
values per key in a (key, value) RDD.

The simplest way I can think of is to do a groupByKey, sort the iterables
and then take the top K 
elements for every key.

But reduceByKey is an operation that can be very costly.

This
<http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html>  
thread seems related, where it is recommended to change the key include the 
value we want to sort on, and then perform an aggregate operation.

My use case would be to filter an RDD representing the edges of a graph (
(srcID, dstID), edgeWeight), 
so that we only retain at most top K edges according to weight for each
(srcID, dstID) key.
The graph can have multiple  edges between the same two vertices.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-way-to-get-top-K-values-per-key-in-key-value-RDD-tp20370.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Efficient way to get top K values per key in (key, value) RDD?

Posted by Xiangrui Meng <me...@gmail.com>.
This is implemented in MLlib:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala#L41.
-Xiangrui

On Wed, Jun 10, 2015 at 1:53 PM, erisa <er...@gmail.com> wrote:
> Hi,
>
> I am a Spark newbie, and trying to solve the same problem, and have
> implemented the same exact solution that sowen  is suggesting. I am using
> priorityqueues to keep trak of the top 25 sub_categories, per each category,
> and using the combineByKey function to do that.
> However I run into the following exception when I submit the spark job:
>
> ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 17)
> java.lang.UnsupportedOperationException: unsuitable as hash key
>     at
> scala.collection.mutable.PriorityQueue.hashCode(PriorityQueue.scala:226)
>
>
> From the error it looks like spark is trying to use the mutable priority
> queue as a hashkey so the error makes sense, but I don't get why it is doing
> that since the value of the RDD record is a priority queue not the key.
>
> Maybe there is a more straightforward solution to what I want to achieve, so
> any suggestion is appreciated :)
>
> Thanks,
> Erisa
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-way-to-get-top-K-values-per-key-in-key-value-RDD-tp20370p23263.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Efficient way to get top K values per key in (key, value) RDD?

Posted by erisa <er...@gmail.com>.
Hi,

I am a Spark newbie, and trying to solve the same problem, and have
implemented the same exact solution that sowen  is suggesting. I am using
priorityqueues to keep trak of the top 25 sub_categories, per each category,
and using the combineByKey function to do that. 
However I run into the following exception when I submit the spark job:

ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 17)
java.lang.UnsupportedOperationException: unsuitable as hash key
    at
scala.collection.mutable.PriorityQueue.hashCode(PriorityQueue.scala:226)


>From the error it looks like spark is trying to use the mutable priority
queue as a hashkey so the error makes sense, but I don't get why it is doing
that since the value of the RDD record is a priority queue not the key.

Maybe there is a more straightforward solution to what I want to achieve, so
any suggestion is appreciated :)

Thanks,
Erisa



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-way-to-get-top-K-values-per-key-in-key-value-RDD-tp20370p23263.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Efficient way to get top K values per key in (key, value) RDD?

Posted by Sean Owen <so...@cloudera.com>.
You probably want to use combineByKey, and create an empty min queue
for each key. Merge values into the queue if its size is < K. If >= K,
only merge the value if it exceeds the smallest element; if so add it
and remove the smallest element.

This gives you an RDD of keys mapped to collections of up to K values
each, and should be about as efficient as it gets in general.

On Thu, Dec 4, 2014 at 8:53 AM, Theodore Vasiloudis
<th...@gmail.com> wrote:
> Hello everyone,
>
> I was wondering what is the most efficient way for retrieving the top K
> values per key in a (key, value) RDD.
>
> The simplest way I can think of is to do a groupByKey, sort the iterables
> and then take the top K
> elements for every key.
>
> But reduceByKey is an operation that can be very costly.
>
> This
> <http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html>
> thread seems related, where it is recommended to change the key include the
> value we want to sort on, and then perform an aggregate operation.
>
> My use case would be to filter an RDD representing the edges of a graph (
> (srcID, dstID), edgeWeight),
> so that we only retain at most top K edges according to weight for each
> (srcID, dstID) key.
> The graph can have multiple  edges between the same two vertices.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-way-to-get-top-K-values-per-key-in-key-value-RDD-tp20370.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org