You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Kevin Burton <bu...@spinn3r.com> on 2016/09/11 01:04:18 UTC

Selecting the top 100 records per group by?

I'm trying to figure out a way to group by and return the top 100 records
in that group.

Something like:

SELECT TOP(100, user_id) FROM posts GROUP BY user_id;

But I can't really figure out the best way to do this...

There is a FIRST and LAST aggregate function but this only returns one
column.

I could do something like:

SELECT * FROM posts WHERE user_id IN ( /* select top users here */ ) LIMIT
100;

But that limit is applied for ALL the records. Not each individual user.

The only other thing I can think of is to do a manual map reduce and then
have the reducer only return the top 100 each time...

Would LOVE some advice here...

-- 

We’re hiring if you know of any awesome Java Devops or Linux Operations
Engineers!

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile
<https://plus.google.com/102718274791889610666/posts>

Re: Selecting the top 100 records per group by?

Posted by Mich Talebzadeh <mi...@gmail.com>.
You can of course do this using FP.

val wSpec = Window.partitionBy('price).orderBy(desc("price"))
df2.filter('security > "
").select(dense_rank().over(wSpec).as("rank"),'TIMECREATED, 'SECURITY,
substring('PRICE,1,7)).filter('rank<=10).show


HTH


Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 11 September 2016 at 07:15, Mich Talebzadeh <mi...@gmail.com>
wrote:

> DENSE_RANK will give you ordering and sequence within a particular column.
> This is Hive
>
>  var sqltext = """
>      | SELECT RANK, timecreated,security, price
>      |      FROM (
>      |            SELECT timecreated,security, price,
>      |       DENSE_RANK() OVER (ORDER BY price DESC ) AS RANK
>      |      FROM test.prices
>      |           ) tmp
>      |      WHERE rank <= 10
>      | """
> sql(sqltext).collect.foreach(println)
>
> [1,2016-09-09 16:55:44,Esso,99.995]
> [1,2016-09-09 21:22:52,AVIVA,99.995]
> [1,2016-09-09 21:22:52,Barclays,99.995]
> [1,2016-09-09 21:24:28,JPM,99.995]
> [1,2016-09-09 21:30:38,Microsoft,99.995]
> [1,2016-09-09 21:31:12,UNILEVER,99.995]
> [2,2016-09-09 16:54:14,BP,99.99]
> [2,2016-09-09 16:54:36,Tate & Lyle,99.99]
> [2,2016-09-09 16:56:28,EASYJET,99.99]
> [2,2016-09-09 16:59:28,IBM,99.99]
> [2,2016-09-09 20:16:10,EXPERIAN,99.99]
> [2,2016-09-09 22:25:20,Microsoft,99.99]
> [2,2016-09-09 22:53:49,Tate & Lyle,99.99]
> [3,2016-09-09 15:31:06,UNILEVER,99.985]
>
> HTH
>
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 11 September 2016 at 04:32, Kevin Burton <bu...@spinn3r.com> wrote:
>
>> Looks like you can do it with dense_rank functions.
>>
>> https://databricks.com/blog/2015/07/15/introducing-window-fu
>> nctions-in-spark-sql.html
>>
>> I setup some basic records and seems like it did the right thing.
>>
>> Now time to throw 50TB and 100 spark nodes at this problem and see what
>> happens :)
>>
>> On Sat, Sep 10, 2016 at 7:42 PM, Kevin Burton <bu...@spinn3r.com> wrote:
>>
>>> Ah.. might actually. I'll have to mess around with that.
>>>
>>> On Sat, Sep 10, 2016 at 6:06 PM, Karl Higley <km...@gmail.com> wrote:
>>>
>>>> Would `topByKey` help?
>>>>
>>>> https://github.com/apache/spark/blob/master/mllib/src/main/s
>>>> cala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala#L42
>>>>
>>>> Best,
>>>> Karl
>>>>
>>>> On Sat, Sep 10, 2016 at 9:04 PM Kevin Burton <bu...@spinn3r.com>
>>>> wrote:
>>>>
>>>>> I'm trying to figure out a way to group by and return the top 100
>>>>> records in that group.
>>>>>
>>>>> Something like:
>>>>>
>>>>> SELECT TOP(100, user_id) FROM posts GROUP BY user_id;
>>>>>
>>>>> But I can't really figure out the best way to do this...
>>>>>
>>>>> There is a FIRST and LAST aggregate function but this only returns one
>>>>> column.
>>>>>
>>>>> I could do something like:
>>>>>
>>>>> SELECT * FROM posts WHERE user_id IN ( /* select top users here */ )
>>>>> LIMIT 100;
>>>>>
>>>>> But that limit is applied for ALL the records. Not each individual
>>>>> user.
>>>>>
>>>>> The only other thing I can think of is to do a manual map reduce and
>>>>> then have the reducer only return the top 100 each time...
>>>>>
>>>>> Would LOVE some advice here...
>>>>>
>>>>> --
>>>>>
>>>>> We’re hiring if you know of any awesome Java Devops or Linux
>>>>> Operations Engineers!
>>>>>
>>>>> Founder/CEO Spinn3r.com
>>>>> Location: *San Francisco, CA*
>>>>> blog: http://burtonator.wordpress.com
>>>>> … or check out my Google+ profile
>>>>> <https://plus.google.com/102718274791889610666/posts>
>>>>>
>>>>>
>>>
>>>
>>> --
>>>
>>> We’re hiring if you know of any awesome Java Devops or Linux Operations
>>> Engineers!
>>>
>>> Founder/CEO Spinn3r.com
>>> Location: *San Francisco, CA*
>>> blog: http://burtonator.wordpress.com
>>> … or check out my Google+ profile
>>> <https://plus.google.com/102718274791889610666/posts>
>>>
>>>
>>
>>
>> --
>>
>> We’re hiring if you know of any awesome Java Devops or Linux Operations
>> Engineers!
>>
>> Founder/CEO Spinn3r.com
>> Location: *San Francisco, CA*
>> blog: http://burtonator.wordpress.com
>> … or check out my Google+ profile
>> <https://plus.google.com/102718274791889610666/posts>
>>
>>
>

Re: Selecting the top 100 records per group by?

Posted by Mich Talebzadeh <mi...@gmail.com>.
DENSE_RANK will give you ordering and sequence within a particular column.
This is Hive

 var sqltext = """
     | SELECT RANK, timecreated,security, price
     |      FROM (
     |            SELECT timecreated,security, price,
     |       DENSE_RANK() OVER (ORDER BY price DESC ) AS RANK
     |      FROM test.prices
     |           ) tmp
     |      WHERE rank <= 10
     | """
sql(sqltext).collect.foreach(println)

[1,2016-09-09 16:55:44,Esso,99.995]
[1,2016-09-09 21:22:52,AVIVA,99.995]
[1,2016-09-09 21:22:52,Barclays,99.995]
[1,2016-09-09 21:24:28,JPM,99.995]
[1,2016-09-09 21:30:38,Microsoft,99.995]
[1,2016-09-09 21:31:12,UNILEVER,99.995]
[2,2016-09-09 16:54:14,BP,99.99]
[2,2016-09-09 16:54:36,Tate & Lyle,99.99]
[2,2016-09-09 16:56:28,EASYJET,99.99]
[2,2016-09-09 16:59:28,IBM,99.99]
[2,2016-09-09 20:16:10,EXPERIAN,99.99]
[2,2016-09-09 22:25:20,Microsoft,99.99]
[2,2016-09-09 22:53:49,Tate & Lyle,99.99]
[3,2016-09-09 15:31:06,UNILEVER,99.985]

HTH







Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 11 September 2016 at 04:32, Kevin Burton <bu...@spinn3r.com> wrote:

> Looks like you can do it with dense_rank functions.
>
> https://databricks.com/blog/2015/07/15/introducing-window-
> functions-in-spark-sql.html
>
> I setup some basic records and seems like it did the right thing.
>
> Now time to throw 50TB and 100 spark nodes at this problem and see what
> happens :)
>
> On Sat, Sep 10, 2016 at 7:42 PM, Kevin Burton <bu...@spinn3r.com> wrote:
>
>> Ah.. might actually. I'll have to mess around with that.
>>
>> On Sat, Sep 10, 2016 at 6:06 PM, Karl Higley <km...@gmail.com> wrote:
>>
>>> Would `topByKey` help?
>>>
>>> https://github.com/apache/spark/blob/master/mllib/src/main/s
>>> cala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala#L42
>>>
>>> Best,
>>> Karl
>>>
>>> On Sat, Sep 10, 2016 at 9:04 PM Kevin Burton <bu...@spinn3r.com> wrote:
>>>
>>>> I'm trying to figure out a way to group by and return the top 100
>>>> records in that group.
>>>>
>>>> Something like:
>>>>
>>>> SELECT TOP(100, user_id) FROM posts GROUP BY user_id;
>>>>
>>>> But I can't really figure out the best way to do this...
>>>>
>>>> There is a FIRST and LAST aggregate function but this only returns one
>>>> column.
>>>>
>>>> I could do something like:
>>>>
>>>> SELECT * FROM posts WHERE user_id IN ( /* select top users here */ )
>>>> LIMIT 100;
>>>>
>>>> But that limit is applied for ALL the records. Not each individual
>>>> user.
>>>>
>>>> The only other thing I can think of is to do a manual map reduce and
>>>> then have the reducer only return the top 100 each time...
>>>>
>>>> Would LOVE some advice here...
>>>>
>>>> --
>>>>
>>>> We’re hiring if you know of any awesome Java Devops or Linux Operations
>>>> Engineers!
>>>>
>>>> Founder/CEO Spinn3r.com
>>>> Location: *San Francisco, CA*
>>>> blog: http://burtonator.wordpress.com
>>>> … or check out my Google+ profile
>>>> <https://plus.google.com/102718274791889610666/posts>
>>>>
>>>>
>>
>>
>> --
>>
>> We’re hiring if you know of any awesome Java Devops or Linux Operations
>> Engineers!
>>
>> Founder/CEO Spinn3r.com
>> Location: *San Francisco, CA*
>> blog: http://burtonator.wordpress.com
>> … or check out my Google+ profile
>> <https://plus.google.com/102718274791889610666/posts>
>>
>>
>
>
> --
>
> We’re hiring if you know of any awesome Java Devops or Linux Operations
> Engineers!
>
> Founder/CEO Spinn3r.com
> Location: *San Francisco, CA*
> blog: http://burtonator.wordpress.com
> … or check out my Google+ profile
> <https://plus.google.com/102718274791889610666/posts>
>
>

RE: Selecting the top 100 records per group by?

Posted by "Mendelson, Assaf" <As...@rsa.com>.
You can also create a custom aggregation function. It might provide better performance than dense_rank.

Consider the following example to collect everything as list:
class CollectListFunction[T](val colType: DataType) extends UserDefinedAggregateFunction {

  def inputSchema: StructType =
    new StructType().add("inputCol", colType)

  def bufferSchema: StructType =
    new StructType().add("outputCol", ArrayType(colType))

  def dataType: DataType = ArrayType(colType)

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, new mutable.ArrayBuffer[T])
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val list = buffer.getSeq[T](0)
    if (!input.isNullAt(0)) {
      val sales = input.getAs[T](0)
      buffer.update(0, list:+sales)
    }
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0, buffer1.getSeq[T](0) ++ buffer2.getSeq[T](0))
  }

  def evaluate(buffer: Row): Any = {
    buffer.getSeq[T](0)
  }
}

All you would need to do is modify it to contain only the top 100…

From: burtonator2011@gmail.com<ma...@gmail.com> [mailto:burtonator2011@gmail.com] On Behalf Of Kevin Burton
Sent: Sunday, September 11, 2016 6:33 AM
To: Karl Higley
Cc: user@spark.apache.org<ma...@spark.apache.org>
Subject: Re: Selecting the top 100 records per group by?

Looks like you can do it with dense_rank functions.

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

I setup some basic records and seems like it did the right thing.

Now time to throw 50TB and 100 spark nodes at this problem and see what happens :)

On Sat, Sep 10, 2016 at 7:42 PM, Kevin Burton <bu...@spinn3r.com>> wrote:
Ah.. might actually. I'll have to mess around with that.

On Sat, Sep 10, 2016 at 6:06 PM, Karl Higley <km...@gmail.com>> wrote:
Would `topByKey` help?

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala#L42

Best,
Karl

On Sat, Sep 10, 2016 at 9:04 PM Kevin Burton <bu...@spinn3r.com>> wrote:
I'm trying to figure out a way to group by and return the top 100 records in that group.

Something like:

SELECT TOP(100, user_id) FROM posts GROUP BY user_id;

But I can't really figure out the best way to do this...

There is a FIRST and LAST aggregate function but this only returns one column.

I could do something like:

SELECT * FROM posts WHERE user_id IN ( /* select top users here */ ) LIMIT 100;

But that limit is applied for ALL the records. Not each individual user.

The only other thing I can think of is to do a manual map reduce and then have the reducer only return the top 100 each time...

Would LOVE some advice here...

--
We’re hiring if you know of any awesome Java Devops or Linux Operations Engineers!

Founder/CEO Spinn3r.com<http://Spinn3r.com>
Location: San Francisco, CA
blog: http://burtonator.wordpress.com
… or check out my Google+ profile<https://plus.google.com/102718274791889610666/posts>




--
We’re hiring if you know of any awesome Java Devops or Linux Operations Engineers!

Founder/CEO Spinn3r.com<http://Spinn3r.com>
Location: San Francisco, CA
blog: http://burtonator.wordpress.com
… or check out my Google+ profile<https://plus.google.com/102718274791889610666/posts>




--
We’re hiring if you know of any awesome Java Devops or Linux Operations Engineers!

Founder/CEO Spinn3r.com<http://Spinn3r.com>
Location: San Francisco, CA
blog: http://burtonator.wordpress.com
… or check out my Google+ profile<https://plus.google.com/102718274791889610666/posts>


Re: Selecting the top 100 records per group by?

Posted by Kevin Burton <bu...@spinn3r.com>.
Looks like you can do it with dense_rank functions.

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

I setup some basic records and seems like it did the right thing.

Now time to throw 50TB and 100 spark nodes at this problem and see what
happens :)

On Sat, Sep 10, 2016 at 7:42 PM, Kevin Burton <bu...@spinn3r.com> wrote:

> Ah.. might actually. I'll have to mess around with that.
>
> On Sat, Sep 10, 2016 at 6:06 PM, Karl Higley <km...@gmail.com> wrote:
>
>> Would `topByKey` help?
>>
>> https://github.com/apache/spark/blob/master/mllib/src/main/
>> scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala#L42
>>
>> Best,
>> Karl
>>
>> On Sat, Sep 10, 2016 at 9:04 PM Kevin Burton <bu...@spinn3r.com> wrote:
>>
>>> I'm trying to figure out a way to group by and return the top 100
>>> records in that group.
>>>
>>> Something like:
>>>
>>> SELECT TOP(100, user_id) FROM posts GROUP BY user_id;
>>>
>>> But I can't really figure out the best way to do this...
>>>
>>> There is a FIRST and LAST aggregate function but this only returns one
>>> column.
>>>
>>> I could do something like:
>>>
>>> SELECT * FROM posts WHERE user_id IN ( /* select top users here */ )
>>> LIMIT 100;
>>>
>>> But that limit is applied for ALL the records. Not each individual user.
>>>
>>>
>>> The only other thing I can think of is to do a manual map reduce and
>>> then have the reducer only return the top 100 each time...
>>>
>>> Would LOVE some advice here...
>>>
>>> --
>>>
>>> We’re hiring if you know of any awesome Java Devops or Linux Operations
>>> Engineers!
>>>
>>> Founder/CEO Spinn3r.com
>>> Location: *San Francisco, CA*
>>> blog: http://burtonator.wordpress.com
>>> … or check out my Google+ profile
>>> <https://plus.google.com/102718274791889610666/posts>
>>>
>>>
>
>
> --
>
> We’re hiring if you know of any awesome Java Devops or Linux Operations
> Engineers!
>
> Founder/CEO Spinn3r.com
> Location: *San Francisco, CA*
> blog: http://burtonator.wordpress.com
> … or check out my Google+ profile
> <https://plus.google.com/102718274791889610666/posts>
>
>


-- 

We’re hiring if you know of any awesome Java Devops or Linux Operations
Engineers!

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile
<https://plus.google.com/102718274791889610666/posts>

Re: Selecting the top 100 records per group by?

Posted by Kevin Burton <bu...@spinn3r.com>.
Ah.. might actually. I'll have to mess around with that.

On Sat, Sep 10, 2016 at 6:06 PM, Karl Higley <km...@gmail.com> wrote:

> Would `topByKey` help?
>
> https://github.com/apache/spark/blob/master/mllib/src/
> main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala#L42
>
> Best,
> Karl
>
> On Sat, Sep 10, 2016 at 9:04 PM Kevin Burton <bu...@spinn3r.com> wrote:
>
>> I'm trying to figure out a way to group by and return the top 100 records
>> in that group.
>>
>> Something like:
>>
>> SELECT TOP(100, user_id) FROM posts GROUP BY user_id;
>>
>> But I can't really figure out the best way to do this...
>>
>> There is a FIRST and LAST aggregate function but this only returns one
>> column.
>>
>> I could do something like:
>>
>> SELECT * FROM posts WHERE user_id IN ( /* select top users here */ )
>> LIMIT 100;
>>
>> But that limit is applied for ALL the records. Not each individual user.
>>
>> The only other thing I can think of is to do a manual map reduce and then
>> have the reducer only return the top 100 each time...
>>
>> Would LOVE some advice here...
>>
>> --
>>
>> We’re hiring if you know of any awesome Java Devops or Linux Operations
>> Engineers!
>>
>> Founder/CEO Spinn3r.com
>> Location: *San Francisco, CA*
>> blog: http://burtonator.wordpress.com
>> … or check out my Google+ profile
>> <https://plus.google.com/102718274791889610666/posts>
>>
>>


-- 

We’re hiring if you know of any awesome Java Devops or Linux Operations
Engineers!

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile
<https://plus.google.com/102718274791889610666/posts>

Re: Selecting the top 100 records per group by?

Posted by Karl Higley <km...@gmail.com>.
Would `topByKey` help?

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala#L42

Best,
Karl

On Sat, Sep 10, 2016 at 9:04 PM Kevin Burton <bu...@spinn3r.com> wrote:

> I'm trying to figure out a way to group by and return the top 100 records
> in that group.
>
> Something like:
>
> SELECT TOP(100, user_id) FROM posts GROUP BY user_id;
>
> But I can't really figure out the best way to do this...
>
> There is a FIRST and LAST aggregate function but this only returns one
> column.
>
> I could do something like:
>
> SELECT * FROM posts WHERE user_id IN ( /* select top users here */ ) LIMIT
> 100;
>
> But that limit is applied for ALL the records. Not each individual user.
>
> The only other thing I can think of is to do a manual map reduce and then
> have the reducer only return the top 100 each time...
>
> Would LOVE some advice here...
>
> --
>
> We’re hiring if you know of any awesome Java Devops or Linux Operations
> Engineers!
>
> Founder/CEO Spinn3r.com
> Location: *San Francisco, CA*
> blog: http://burtonator.wordpress.com
> … or check out my Google+ profile
> <https://plus.google.com/102718274791889610666/posts>
>
>