Posted to dev@spark.apache.org by "Ranjan, Abhinav" <ab...@gmail.com> on 2020/04/01 16:04:35 UTC

Re: Need to order iterator values in spark dataframe

Enrico,

The below solution works but there is a little glitch.

It is working fine in spark-shell but failing for skewed keys 
when run via spark-submit.

Looking at the execution plan, the partitioning is the same for both 
the repartition and the groupByKey, and is driven by the value of 
"spark.sql.shuffle.partitions"

like: Exchange hashpartitioning(value#143, 200)

Any ideas on why skewed keys give wrong output here, while the same 
code gives correct results in spark-shell?
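
For reference, a minimal way to see this (assuming ds is the Dataset 
from Enrico's snippet below; plan output abbreviated):

ds.repartition($"key")
  .sortWithinPartitions($"code")
  .explain()
// both the repartition and the groupByKey aggregation show up as
//   Exchange hashpartitioning(..., 200)
// where 200 is the default of spark.sql.shuffle.partitions; it can be
// changed with e.g.:
spark.conf.set("spark.sql.shuffle.partitions", "400")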


--Abhinav

On 26/03/20 10:54 pm, Enrico Minack wrote:
>
> Abhinav,
>
> you can repartition by your key, then sortWithinPartitions, and then 
> groupByKey. Since the data are already hash-partitioned by key, Spark 
> should not shuffle the data again and hence should not change the sort 
> within each partition:
>
> ds.repartition($"key").sortWithinPartitions($"code").groupByKey(_.key)
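>
> To put it together with your mapGroups, a rough sketch (untested; it 
> uses your key_class from below and assumes spark.implicits._ is 
> imported):
>
> ds.as[key_class]
>   .repartition($"key")
>   .sortWithinPartitions($"code")
>   .groupByKey(_.key)
>   .mapGroups { (key, rows) =>
>     // rows should now arrive sorted by code within each group
>     rows.map(row => "SUCCESS").toList   // placeholder for your logic
>   }
>   .toDF("status")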
>
> Enrico
>
>
> Am 26.03.20 um 17:53 schrieb Ranjan, Abhinav:
>>
>> Hi,
>>
>> I have a dataframe which has data like:
>>
>> key | code | code_value
>> ----+------+-----------
>> 1   | c1   | 11
>> 1   | c2   | 12
>> 1   | c2   | 9
>> 1   | c3   | 12
>> 1   | c2   | 13
>> 1   | c2   | 14
>> 1   | c4   | 12
>> 1   | c2   | 15
>> 1   | c1   | 12
>>
>>
>> I need to group the data by key and then apply some custom logic to 
>> each of the values I got by grouping. So I did this:
>>
>> Let's suppose it is in a dataframe df.
>>
>> case class key_class(key: String, code: String, code_value: String)
>>
>>
>> df
>>   .as[key_class]
>>   .groupByKey(_.key)
>>   .mapGroups { (x, groupedValues) =>
>>     val status = groupedValues.map { row =>
>>       // do some custom logic on row
>>       "SUCCESS"
>>     }.toList
>>     status   // return the per-row statuses for this key
>>   }
>>   .toDF("status")
>>
>>
>> The issue with the above approach is that the values I get after 
>> applying groupByKey are not sorted/ordered. I want the values to be 
>> sorted by the column 'code'.
>>
>> There is a way to do this:
>>
>> 1. get them in a list and then sort ==> this will result in an OOM 
>> if the iterator is too big.
>>
>> 2. I think I could somehow apply a secondary sort, but the problem 
>> with that approach is that I have to keep track of the key change 
>> (see the rough sketch after point 4).
>>
>> 3. sortWithinPartitions cannot be applied on its own because the 
>> groupBy will mess up the order.
>>
>> 4. Another approach is:
>>
>> df
>>   .as[key_class]
>>   .sort("key", "code")
>>   .map { row =>
>>     // do stuff here
>>     "SUCCESS"
>>   }
>>
>> but here also I have to keep track of the key change within the map 
>> function, and sometimes this also overflows if the keys are skewed.
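>>
>> To illustrate the key-change tracking from points 2 and 4, a rough, 
>> untested sketch ("SUCCESS" is a placeholder for the custom logic):
>>
>> df.as[key_class]
>>   .repartition($"key")
>>   .sortWithinPartitions($"key", $"code")
>>   .mapPartitions { rows =>
>>     var currentKey: String = null
>>     rows.map { row =>
>>       if (row.key != currentKey) {
>>         currentKey = row.key   // a new group starts here
>>       }
>>       "SUCCESS"
>>     }
>>   }
>>   .toDF("status")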
>>
>>
>> So is there any way in which I can get the values sorted after 
>> grouping them by a key?
>>
>> Thanks,
>>
>> Abhinav
>>
>