Posted to user@spark.apache.org by lu...@sina.com on 2016/07/07 11:26:07 UTC

Reply: Re: how to select first 50 value of each group after group by?

Hi Anton,

I checked the docs you mentioned and wrote code accordingly, but I hit an exception: "org.apache.spark.sql.AnalysisException: Window function row_number does not take a frame specification.;"

By my understanding, the row_number API seems to give a global row number to every row across all frames. If that is wrong, please correct me.

I checked all the window function APIs at http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$, and row_number() seems to be the only one that might match. I am not quite sure about it.

Here is my code:

    val hc = new org.apache.spark.sql.hive.HiveContext(sc)
    val hivetable = hc.sql("select * from house_sale_pv_location")
    val overLocation = Window.partitionBy(hivetable.col("location")).rowsBetween(0, 49)
    val sortedDF = hivetable.withColumn("rowNumber", row_number().over(overLocation))
    sortedDF.registerTempTable("sortedDF")
    val top50 = hc.sql("select id,location from sortedDF where rowNumber<=50")
    top50.registerTempTable("top50")
    hc.sql("select * from top50 where location=30").collect.foreach(println)

Here, hivetable is the DF I mentioned, with 3 columns "id, pv, location", which is already sorted by pv in descending order, so I didn't call orderBy in the 3rd line of my code. I just want the first 50 rows of each frame, based on their physical position.

To Tal:

I tried the rank API, but it is not what I want, because rows that have the same pv are given the same rank. What I expect is the first 50 rows of each frame. The attached file shows what I got by using rank. Thank you anyway; I learned what rank can provide from your advice.
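
The difference between rank and row_number on tied pv values can be seen in a small sketch. This is only an illustration, not code from the thread: the object name RankVsRowNumber and the helper withRankAndRowNumber are hypothetical, and it assumes a DataFrame with the id, pv and location columns described above, with id unique inside a location so that it can serve as a tie-breaker.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rank, row_number}

// Hypothetical helper, for illustration only.
object RankVsRowNumber {
  // Adds a "rank" and a "rowNumber" column, both computed per location.
  def withRankAndRowNumber(df: DataFrame): DataFrame = {
    // rank() is ordered by pv only, so rows with equal pv share a rank.
    val byPv = Window.partitionBy(col("location")).orderBy(col("pv").desc)
    // row_number() gets id as a secondary key (assumed unique per location)
    // so that the numbering of tied rows is deterministic.
    val byPvThenId =
      Window.partitionBy(col("location")).orderBy(col("pv").desc, col("id"))

    df.withColumn("rank", rank().over(byPv))
      .withColumn("rowNumber", row_number().over(byPvThenId))
  }
}
```

For pv values 10, 10, 8 in one location, rank gives 1, 1, 3 while row_number gives 1, 2, 3. A filter such as rank <= 50 can therefore return more than 50 rows when ties straddle the cut-off, whereas rowNumber <= 50 returns at most 50 rows per location.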

--------------------------------

 

Thanks & best regards!
San.Luo

----- Original Message -----
From: Anton Okolnychyi <an...@gmail.com>
To: user <us...@spark.apache.org>
Cc: luohui20001@sina.com
Subject: Re: how to select first 50 value of each group after group by?
Date: 2016-07-06 23:22

The following resources should be useful:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-windows.html

The last link should have the exact solution.

2016-07-06 16:55 GMT+02:00 Tal Grynbaum <ta...@gmail.com>:
You can use the rank window function to rank each row in the group, and then filter the rows with rank <= 50.

On Wed, Jul 6, 2016, 14:07  <lu...@sina.com> wrote:
Hi there,

I have a DF with 3 columns: id, pv, location (the rows are already grouped by location and sorted by pv in descending order). I want to get the first 50 id values for each location. I checked the APIs of DataFrame, GroupedData, and pairRDD, and found no match. Is there a way to do this naturally? Any info will be appreciated.


--------------------------------

 

Thanks & best regards!
San.Luo





Re: Re: how to select first 50 value of each group after group by?

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi,

Your statement below:

    sortedDF.registerTempTable("sortedDF")
    val top50 = hc.sql("select id,location from sortedDF where rowNumber<=50")

runs against a Hive table. Try:

    sortedDF.registerTempTable("sortedDF")
    val top50 = hc.sql("select id,location from sortedDF LIMIT 50")

rowNumber is not supported:

hive>     select * from oraclehadoop.sales where rowNumber<=50;
FAILED: SemanticException [Error 10004]: Line 1:43 Invalid table alias or
column reference 'rowNumber': (possible column names are: prod_id, cust_id,
time_id, channel_id, promo_id, quantity_sold, amount_sold, year, month)
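
For comparison, a per-group limit can also be written in SQL with row_number() in a subquery. Note that LIMIT 50 caps the whole result at 50 rows rather than 50 rows per location. The following is a sketch under assumptions, not code from the thread: the object name TopNPerGroupSql and the helper top50PerLocation are hypothetical, and the named table is assumed to have the id, pv and location columns from the original question.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical helper, for illustration only.
object TopNPerGroupSql {
  // sqlContext may be a HiveContext, as in the thread. tableName must already
  // be visible to it (a Hive table or a registered temporary table).
  def top50PerLocation(sqlContext: SQLContext, tableName: String): DataFrame =
    sqlContext.sql(
      s"""SELECT id, location
         |FROM (
         |  SELECT id, location,
         |         row_number() OVER (PARTITION BY location ORDER BY pv DESC) AS rowNumber
         |  FROM $tableName
         |) ranked
         |WHERE rowNumber <= 50""".stripMargin)
}
```

With the thread's names this would be called as TopNPerGroupSql.top50PerLocation(hc, "house_sale_pv_location"). In Spark 1.x, as far as I know, window functions in SQL need a HiveContext rather than a plain SQLContext.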

HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com






Re: Re: how to select first 50 value of each group after group by?

Posted by Anton Okolnychyi <an...@gmail.com>.
Hi,

I can try to guess what is wrong, but I might be incorrect.

You should be careful with window frames (you define them via the
rowsBetween() method).

In my understanding, all window functions can be divided into 2 groups:
- functions defined by the
org.apache.spark.sql.catalyst.expressions.WindowFunction trait ("true"
window functions)
- all other supported functions that are marked as window functions by
providing a window specification.

The main distinction is that functions from the first group might have a
predefined internal frame. That's exactly your case.
Both row_number() and rank() functions are from the first group (i.e. they
have predefined internal frames).

To make your case work, you have 2 options:
- remove your own frame specification (i.e. rowsBetween(0, 49)) and use only
Window.partitionBy(hivetable.col("location"))
- explicitly state the correct window frames, for instance
rowsBetween(Long.MinValue, 0) for rank() and row_number().
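
The first option might look roughly like the sketch below. It is not code from the thread: the object name TopNPerGroup and the helper topNPerLocation are hypothetical. It also adds an explicit orderBy, which goes beyond the two options above, on the assumption that row order inside a partition should not be relied on after the shuffle that partitionBy triggers; id is assumed unique per location and is used only as a tie-breaker.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Hypothetical helper, for illustration only.
object TopNPerGroup {
  // Keeps the first n rows of each location, ordered by pv descending.
  def topNPerLocation(df: DataFrame, n: Int): DataFrame = {
    // No rowsBetween(...) here: row_number() uses its own predefined frame.
    val byLocation = Window
      .partitionBy(col("location"))
      .orderBy(col("pv").desc, col("id")) // id breaks ties between equal pv values

    df.withColumn("rowNumber", row_number().over(byLocation))
      .where(col("rowNumber") <= n)
      .drop("rowNumber")
  }
}
```

With the thread's names this would be called as TopNPerGroup.topNPerLocation(hivetable, 50), and the result still has the id, pv and location columns.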

By the way, there is not much documentation on how Spark resolves window
frames. For that reason, I created a small pull request that can help:
https://github.com/apache/spark/pull/14050
It would be nice if someone experienced could take a look at it, since it is
based only on my own analysis.
