Posted to dev@spark.apache.org by "Ranjan, Abhinav" <ab...@gmail.com> on 2020/04/01 16:04:35 UTC
Re: Need to order iterator values in spark dataframe
Enrico,
The below solution works, but there is a small glitch: it works fine in
spark-shell but fails for skewed keys when run via spark-submit.
Looking at the execution plan, the partition count is the same for both
repartition and groupByKey, and is driven by the value
"spark.sql.shuffle.partitions",
e.g.: Exchange hashpartitioning(value#143, 200)
Any ideas on why skewed keys give wrong output with spark-submit while
the same code gives correct results in spark-shell?
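For reference, a sketch of how I am comparing the two runs (hedged: it assumes a SparkSession `spark`, a Dataset `ds` of the key_class rows below, and spark.implicits._ imported; 200 is just the default partition count, not a value confirmed anywhere in this thread):

```scala
// Pin the shuffle partition count explicitly so spark-shell and
// spark-submit resolve to the same value, then compare the physical plans.
spark.conf.set("spark.sql.shuffle.partitions", "200")

ds.repartition($"key")
  .sortWithinPartitions($"code")
  .groupByKey(_.key)
  .mapGroups((key, rows) => (key, rows.map(_.code).toSeq))
  .explain(true)  // look for a single Exchange hashpartitioning(key, 200)
```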
--Abhinav
On 26/03/20 10:54 pm, Enrico Minack wrote:
>
> Abhinav,
>
> you can repartition by your key, then sortWithinPartitions, and then
> groupByKey. Since the data are already hash-partitioned by key, Spark
> should not shuffle the data again and hence should not change the sort
> within each partition:
>
> ds.repartition($"key").sortWithinPartitions($"code").groupBy($"key")
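> Expanded into a self-contained sketch (illustrative class name, app name,
> and sample rows, not from a tested job in this thread):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch of repartition + sortWithinPartitions + groupByKey.
// KeyClass and the sample rows are hypothetical stand-ins for the thread's data.
case class KeyClass(key: String, code: String, code_value: String)

object SortedGroups {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("sorted-groups").getOrCreate()
    import spark.implicits._

    val ds = Seq(
      KeyClass("1", "c2", "12"),
      KeyClass("1", "c1", "11"),
      KeyClass("1", "c3", "12")
    ).toDS()

    ds.repartition($"key")            // co-locate all rows of a key
      .sortWithinPartitions($"code")  // order rows by code inside each partition
      .groupByKey(_.key)              // should reuse the existing partitioning
      .mapGroups { (key, rows) =>
        // rows arrive in code order if the partition sort survives the grouping
        (key, rows.map(_.code).mkString(","))
      }
      .show(false)

    spark.stop()
  }
}
```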
>
> Enrico
>
>
> Am 26.03.20 um 17:53 schrieb Ranjan, Abhinav:
>>
>> Hi,
>>
>> I have a dataframe which has data like:
>>
>> key | code | code_value
>> 1 | c1 | 11
>> 1 | c2 | 12
>> 1 | c2 | 9
>> 1 | c3 | 12
>> 1 | c2 | 13
>> 1 | c2 | 14
>> 1 | c4 | 12
>> 1 | c2 | 15
>> 1 | c1 | 12
>>
>>
>> I need to group the data based on key and then apply some custom
>> logic to every value in each group. So I did this:
>>
>> Let's suppose it is in a dataframe df.
>>
>> case class key_class(key: String, code: String, code_value: String)
>>
>>
>> df
>>   .as[key_class]
>>   .groupByKey(_.key)
>>   .mapGroups { (key, groupedValues) =>
>>     val status = groupedValues.map { row =>
>>       // do some custom logic on row
>>       "SUCCESS"
>>     }.toList
>>     status
>>   }.toDF("status")
>>
>>
>> The issue with the above approach is that the values I get after
>> applying groupByKey are not sorted/ordered. I want the values to be
>> sorted by the column 'code'.
>>
>> There is a way to do this:
>>
>> 1. Collect them into a list and then sort ==> this will result in OOM
>> if the iterator is too big.
>>
>> 2. Somehow apply a secondary sort, but the problem with that approach
>> is that I have to keep track of the key change.
>>
>> 3. sortWithinPartitions cannot be applied because groupBy will mess
>> up the order.
>>
>> 4. Another approach is:
>>
>> df
>>   .as[key_class]
>>   .sort("key", "code")
>>   .map {
>>     // do stuff here
>>   }
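>> The "keep track of the key change" part could look roughly like this
>> (a hedged sketch only; a global sort range-partitions the data, so a
>> key can in principle straddle a partition boundary, which is one reason
>> this approach is fragile):

```scala
// Hypothetical sketch of option 4: rows are globally sorted by (key, code),
// then consecutive rows sharing a key are grouped within each partition
// without buffering more than one key's worth of data at a time.
df.as[key_class]
  .sort("key", "code")
  .mapPartitions { rows =>
    val buffered = rows.buffered
    new Iterator[(String, List[String])] {
      def hasNext: Boolean = buffered.hasNext
      def next(): (String, List[String]) = {
        val currentKey = buffered.head.key
        val codes = scala.collection.mutable.ListBuffer.empty[String]
        while (buffered.hasNext && buffered.head.key == currentKey)
          codes += buffered.next().code
        (currentKey, codes.toList)  // one group per consecutive run of a key
      }
    }
  }
```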
>>
>> but here also I have to keep track of the key change within the map
>> function, and sometimes this also overflows if the keys are skewed.
>>
>>
>> So is there any way in which I can get the values sorted after
>> grouping them by a key?
>>
>> Thanks,
>>
>> Abhinav
>>
>