Posted to user@spark.apache.org by Rishi Shah <ri...@gmail.com> on 2020/01/11 19:14:35 UTC
Re: High level explanation of dropDuplicates
Thanks everyone for your contribution on this topic. I wanted to check in
to see if anyone has discovered a different approach, or has an opinion on a
better way to deduplicate data using PySpark. I would really appreciate any
further insight on this.
Thanks,
-Rishi
On Wed, Jun 12, 2019 at 4:21 PM Yeikel <em...@yeikel.com> wrote:
> Nicholas, thank you for your explanation.
>
> I am also interested in the example that Rishi is asking for. I am sure
> mapPartitions may work, but as Vladimir suggests it may not be the best
> option in terms of performance.
>
> @Vladimir Prus, are you aware of any example of writing a "custom
> physical exec operator"?
>
> If anyone needs a further explanation of the follow-up question Rishi
> posted, please see the example below:
>
>
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.Row
>
> val someData = Seq(
>   Row(1, 10),
>   Row(1, 20),
>   Row(1, 11)
> )
>
> val schema = List(
>   StructField("id", IntegerType, true),
>   StructField("score", IntegerType, true)
> )
>
> val df = spark.createDataFrame(
>   spark.sparkContext.parallelize(someData),
>   StructType(schema)
> )
>
> // Goal: drop duplicates using "id" as the primary key and keep the highest "score".
>
> df.sort($"score".desc).dropDuplicates("id").show
>
> == Physical Plan ==
> *(2) HashAggregate(keys=[id#191], functions=[first(score#192, false)])
> +- Exchange hashpartitioning(id#191, 200)
>    +- *(1) HashAggregate(keys=[id#191], functions=[partial_first(score#192, false)])
>       +- *(1) Sort [score#192 DESC NULLS LAST], true, 0
>          +- Exchange rangepartitioning(score#192 DESC NULLS LAST, 200)
>             +- Scan ExistingRDD[id#191,score#192]
>
> This seems to work, but I don't know what the implications are if we use
> this approach with a bigger dataset, or what the alternatives are. From the
> explain output I can see the two Exchanges, so it may not be the best
> approach?
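
One alternative worth sketching here (it is not proposed in the thread itself) is to state "keep the highest score per id" explicitly with a window function, instead of relying on a global sort before dropDuplicates. A minimal sketch, assuming the same df as in the quoted example and spark.implicits._ in scope:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Rank rows within each id by descending score, keep only the
// top-ranked row, then drop the helper column.
val byScore = Window.partitionBy("id").orderBy($"score".desc)

df.withColumn("rn", row_number().over(byScore))
  .where($"rn" === 1)
  .drop("rn")
  .show()

This still shuffles by id, but the "highest score wins" rule is expressed directly in the query rather than depending on first() picking the intended row after the sort.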
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>
--
Regards,
Rishi Shah
Re: High level explanation of dropDuplicates
Posted by Miguel Morales <th...@gmail.com>.
I would just map to pair using the id. Then do a reduceByKey where you compare the scores and keep the highest. Then do .values and that should do it.
Sent from my iPhone
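
A minimal sketch of the RDD-level approach Miguel describes, assuming the df from the example quoted earlier in the thread (column 0 = id, column 1 = score, no null values):

// Pair each Row with its id, keep the Row with the higher score per id,
// then take the surviving Rows.
val deduped = df.rdd
  .map(row => (row.getInt(0), row))
  .reduceByKey((a, b) => if (a.getInt(1) >= b.getInt(1)) a else b)
  .values

deduped.collect().foreach(println)

Note that this drops out of the DataFrame/Catalyst layer, so the optimizer no longer applies; whether it beats the window or dropDuplicates versions would need to be measured on the real data.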