Posted to user@spark.apache.org by Dana Ram Meghwal <da...@saavn.com> on 2017/02/24 07:08:08 UTC
Fwd: Duplicate Rank for within same partitions
---------- Forwarded message ----------
From: Dana Ram Meghwal <da...@saavn.com>
Date: Thu, Feb 23, 2017 at 10:40 PM
Subject: Duplicate Rank for within same partitions
To: user-help@spark.apache.org
Hey guys,
I am new to Spark. I am trying to write a Spark script that involves
finding the rank of records within the same data partitions (I will make
this clear in a moment).
I have a table with the following columns, and the example data looks like
this (there are around 20 million records for each combination of date, hour,
language, and item_type):
Id, language, date, hour, item_type, score
1 hindi 20170220 00 song 10
2 hindi 20170220 00 song 12
3 hindi 20170220 00 song 15
.
.
.
till 20 million
4 english 20170220 00 song 9
5 english 20170220 00 song 18
6 english 20170220 00 song 12
.
.
.
till 20 million
Now I want to rank them within each (language, date, hour, item_type) group,
so the final result will look like this:
Id, language, date, hour, item_type, score rank
1 hindi 20170220 00 song 10 1
2 hindi 20170220 00 song 12 2
3 hindi 20170220 00 song 15 3
4 english 20170220 00 song 9 1
6 english 20170220 00 song 12 2
5 english 20170220 00 song 18 3
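The ranking I am after can be sketched in plain Python (a toy illustration of the intended semantics, not the Spark code; rank 1 goes to the lowest score, matching the expected table above):

```python
from itertools import groupby

# Toy rows mirroring the example data:
# (id, language, date, hour, item_type, score)
rows = [
    (1, "hindi",   "20170220", "00", "song", 10),
    (2, "hindi",   "20170220", "00", "song", 12),
    (3, "hindi",   "20170220", "00", "song", 15),
    (4, "english", "20170220", "00", "song", 9),
    (5, "english", "20170220", "00", "song", 18),
    (6, "english", "20170220", "00", "song", 12),
]

def rank_rows(rows):
    """Append a 1-based rank to each row, restarting within every
    (language, date, hour, item_type) group, ordered by score."""
    key = lambda r: (r[1], r[2], r[3], r[4])
    ranked = []
    for _, group in groupby(sorted(rows, key=key), key=key):
        for rank, row in enumerate(sorted(group, key=lambda r: r[5]), start=1):
            ranked.append(row + (rank,))
    return ranked

for row in sorted(rank_rows(rows)):
    print(row)
```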
To get this I use the row_number window function in Spark. The code looks
like the following:
1. converting the RDD to a DataFrame
rdd_with_final_score_df = spark.read.json(rdd_with_final_score).repartition(1)
2. setting the window specification
w = Window.partitionBy("dt", "hour", "language", "item_type", "time_zone").orderBy(rdd_with_final_score_df.score.cast("float").desc())
3. calculating ranks after repartitioning to 1 partition
rdd_with_final_score_df_rank_df = rdd_with_final_score_df.repartition(1).withColumn('rank', row_number().over(w))
Now the number of rows in "rdd_with_final_score" is so high that this RDD
is distributed across machines in the cluster.
I am getting a result, but I am seeing duplicate ranks within the same
window partition,
for e.g.
Id, language, date, hour, item_type, score rank
1 hindi 20170220 00 song 10 1
2 hindi 20170220 00 song 12 2
3 hindi 20170220 00 song 15 1
Here record 1 and record 3 have the same rank, but it is expected that they
should have different ranks; rank should be unique for different score
values.
Is it the case that rank is calculated separately for each partition of the
RDD, and the results are then merged, so that multiple rows get the same rank?
It would be very helpful if you could help me understand what is going on
here and how to solve it. I thought repartitioning would work, but it did
not.
I tried to use rowsBetween and rangeBetween, but they gave this error:
pyspark.sql.utils.AnalysisException: u'Window Frame ROWS BETWEEN 1
PRECEDING AND 1 FOLLOWING must match the required frame ROWS BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW;'
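For context on that error: ranking functions such as row_number() are only defined over the fixed frame UNBOUNDED PRECEDING to CURRENT ROW, which is why Spark rejects an explicit rowsBetween on them; a sliding frame like ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING only makes sense for aggregates such as avg(). A rough plain-Python sketch of the two frame shapes (an illustration, not Spark code):

```python
scores = [10, 12, 15, 9, 18]

# Default frame for ranking: UNBOUNDED PRECEDING .. CURRENT ROW.
# row_number() is simply the size of that growing frame.
row_numbers = [len(scores[: i + 1]) for i in range(len(scores))]

# Sliding frame ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING,
# which is only meaningful for an aggregate like a moving average.
def moving_avg(values, i):
    frame = values[max(0, i - 1): i + 2]
    return sum(frame) / len(frame)

moving_avgs = [moving_avg(scores, i) for i in range(len(scores))]
print(row_numbers)   # [1, 2, 3, 4, 5]
print(moving_avgs)
```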
--
Dana Ram Meghwal
Software Engineer
danaram@saavn.com
Re: Duplicate Rank for within same partitions
Posted by Yong Zhang <ja...@hotmail.com>.
What you described is not clear here.
Do you want to rank your data partitioned by (date, hour, language, item_type, time_zone) and sorted by score,
or partitioned by (date, hour) and sorted by language, item_type, time_zone, and score?
If you mean the first one, then your Spark code looks right, but the example you gave didn't include "time_zone", which may be the reason the rank starts from 1 again.
In a Spark window specification, partitionBy lists the columns you want to group by, and orderBy decides the ordering within each partition. Both can take multiple columns.
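To illustrate the point about the extra partition column, here is a toy plain-Python sketch (the "IST"/"UTC" time_zone values are hypothetical): partitioning by an extra column whose values differ splits one group into several, and the rank restarts at 1 in each, producing exactly the duplicate ranks you observed.

```python
from itertools import groupby

# Same hindi rows as in the question, plus a hypothetical
# time_zone column that varies across rows.
rows = [
    ("hindi", "IST", 10),
    ("hindi", "IST", 12),
    ("hindi", "UTC", 15),
]

def ranks(rows, key):
    """Map each row to its 1-based rank by score within its key group."""
    out = {}
    for _, group in groupby(sorted(rows, key=key), key=key):
        for rank, row in enumerate(sorted(group, key=lambda r: r[-1]), start=1):
            out[row] = rank
    return out

# Partition by language only: one group, ranks 1, 2, 3.
print(ranks(rows, key=lambda r: r[0]))
# Partition by (language, time_zone): the UTC row starts over at rank 1,
# just like the duplicate ranks seen in the question.
print(ranks(rows, key=lambda r: (r[0], r[1])))
```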
Yong