You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Rishi Shah <ri...@gmail.com> on 2020/01/11 19:14:35 UTC

Re: High level explanation of dropDuplicates

Thanks everyone for your contribution on this topic, I wanted to check-in
to see if anyone has discovered a different or have an opinion on better
approach to deduplicating data using pyspark. Would really appreciate any
further insight on this.

Thanks,
-Rishi

On Wed, Jun 12, 2019 at 4:21 PM Yeikel <em...@yeikel.com> wrote:

> Nicholas , thank you for your explanation.
>
> I am also interested in the example that Rishi is asking for.  I am sure
> mapPartitions may work , but as Vladimir suggests it may not be the best
> option in terms of performance.
>
> @Vladimir Prus , are you aware of any example about writing a  "custom
> physical exec operator"?
>
> If anyone needs a further explanation for the follow up  question Rishi
> posted , please see the example below :
>
>
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.Row
>
>
> val someData = Seq(
>   Row(1, 10),
>   Row(1, 20),
>   Row(1, 11)
> )
>
> val schema = List(
>   StructField("id", IntegerType, true),
>   StructField("score", IntegerType, true)
> )
>
> val df = spark.createDataFrame(
>   spark.sparkContext.parallelize(someData),
>   StructType(schema)
> )
>
> // Goal : Drop duplicates using the "id" as the primary key and keep the
> highest "score".
>
> df.sort($"score".desc).dropDuplicates("id").show
>
> == Physical Plan ==
> *(2) HashAggregate(keys=[id#191], functions=[first(score#192, false)])
> +- Exchange hashpartitioning(id#191, 200)
>    +- *(1) HashAggregate(keys=[id#191], functions=[partial_first(score#192,
> false)])
>       +- *(1) Sort [score#192 DESC NULLS LAST], true, 0
>          +- Exchange rangepartitioning(score#192 DESC NULLS LAST, 200)
>             +- Scan ExistingRDD[id#191,score#192]
>
> This seems to work , but I don't know what are the implications if we use
> this approach with a bigger dataset or what are the alternatives. From the
> explain output I can see the two Exchanges , so it may not be the best
> approach?
>
>
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>

-- 
Regards,

Rishi Shah

Re: High level explanation of dropDuplicates

Posted by Miguel Morales <th...@gmail.com>.
I would just map to pair using the id. Then do a reduceByKey where you compare the scores and keep the highest. Then do .values and that should do it.

Sent from my iPhone

> On Jan 11, 2020, at 11:14 AM, Rishi Shah <ri...@gmail.com> wrote:
> 
> 
> Thanks everyone for your contribution on this topic, I wanted to check-in to see if anyone has discovered a different or have an opinion on better approach to deduplicating data using pyspark. Would really appreciate any further insight on this.
> 
> Thanks,
> -Rishi
> 
>> On Wed, Jun 12, 2019 at 4:21 PM Yeikel <em...@yeikel.com> wrote:
>> Nicholas , thank you for your explanation. 
>> 
>> I am also interested in the example that Rishi is asking for.  I am sure
>> mapPartitions may work , but as Vladimir suggests it may not be the best
>> option in terms of performance. 
>> 
>> @Vladimir Prus , are you aware of any example about writing a  "custom
>> physical exec operator"? 
>> 
>> If anyone needs a further explanation for the follow up  question Rishi 
>> posted , please see the example below : 
>> 
>> 
>> import org.apache.spark.sql.types._
>> import org.apache.spark.sql.Row
>> 
>> 
>> val someData = Seq(
>>   Row(1, 10),
>>   Row(1, 20),
>>   Row(1, 11)
>> )
>> 
>> val schema = List(
>>   StructField("id", IntegerType, true),
>>   StructField("score", IntegerType, true)
>> )
>> 
>> val df = spark.createDataFrame(
>>   spark.sparkContext.parallelize(someData),
>>   StructType(schema)
>> )
>> 
>> // Goal : Drop duplicates using the "id" as the primary key and keep the
>> highest "score".
>> 
>> df.sort($"score".desc).dropDuplicates("id").show
>> 
>> == Physical Plan ==
>> *(2) HashAggregate(keys=[id#191], functions=[first(score#192, false)])
>> +- Exchange hashpartitioning(id#191, 200)
>>    +- *(1) HashAggregate(keys=[id#191], functions=[partial_first(score#192,
>> false)])
>>       +- *(1) Sort [score#192 DESC NULLS LAST], true, 0
>>          +- Exchange rangepartitioning(score#192 DESC NULLS LAST, 200)
>>             +- Scan ExistingRDD[id#191,score#192]
>> 
>> This seems to work , but I don't know what are the implications if we use
>> this approach with a bigger dataset or what are the alternatives. From the
>> explain output I can see the two Exchanges , so it may not be the best
>> approach? 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>> 
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>> 
> 
> 
> -- 
> Regards,
> 
> Rishi Shah