You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Mitesh (JIRA)" <ji...@apache.org> on 2019/02/06 06:55:00 UTC

[jira] [Comment Edited] (SPARK-19981) Sort-Merge join inserts shuffles when joining dataframes with aliased columns

    [ https://issues.apache.org/jira/browse/SPARK-19981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16761535#comment-16761535 ] 

Mitesh edited comment on SPARK-19981 at 2/6/19 6:54 AM:
--------------------------------------------------------

Ping any updates here? This still is an issue in 2.3.2. 

Also maybe a dupe of SPARK-19468


was (Author: masterddt):
Ping any updates here? This still is an issue in 2.3.2.

> Sort-Merge join inserts shuffles when joining dataframes with aliased columns
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-19981
>                 URL: https://issues.apache.org/jira/browse/SPARK-19981
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.2
>            Reporter: Allen George
>            Priority: Major
>
> Performing a sort-merge join with two dataframes - each of which has the join column aliased - causes Spark to insert an unnecessary shuffle.
> Consider the scala test code below, which should be equivalent to the following SQL.
> {code:SQL}
> SELECT * FROM
>   (SELECT number AS aliased from df1) t1
> LEFT JOIN
>   (SELECT number AS aliased from df2) t2
> ON t1.aliased = t2.aliased
> {code}
> {code:scala}
> private case class OneItem(number: Long)
> private case class TwoItem(number: Long, value: String)
> test("join with aliases should not trigger shuffle") {
>   val df1 = sqlContext.createDataFrame(
>     Seq(
>       OneItem(0),
>       OneItem(2),
>       OneItem(4)
>     )
>   )
>   val partitionedDf1 = df1.repartition(10, col("number"))
>   partitionedDf1.createOrReplaceTempView("df1")
>   partitionedDf1.cache() partitionedDf1.count()
>   
>   val df2 = sqlContext.createDataFrame(
>     Seq(
>       TwoItem(0, "zero"),
>       TwoItem(2, "two"),
>       TwoItem(4, "four")
>     )
>   )
>   val partitionedDf2 = df2.repartition(10, col("number"))
>   partitionedDf2.createOrReplaceTempView("df2")
>   partitionedDf2.cache() partitionedDf2.count()
>   
>   val fromDf1 = sqlContext.sql("SELECT number from df1")
>   val fromDf2 = sqlContext.sql("SELECT number from df2")
>   val aliasedDf1 = fromDf1.select(col(fromDf1.columns.head) as "aliased")
>   val aliasedDf2 = fromDf2.select(col(fromDf2.columns.head) as "aliased")
>   aliasedDf1.join(aliasedDf2, Seq("aliased"), "left_outer") }
> {code}
> Both the SQL and the Scala code generate a query-plan where an extra exchange is inserted before performing the sort-merge join. This exchange changes the partitioning from {{HashPartitioning("number", 10)}} for each frame being joined into {{HashPartitioning("aliased", 5)}}. I would have expected that since it's a simple column aliasing, and both frames have exactly the same partitioning that the initial frames.
> {noformat} 
> *Project [args=[aliased#267L]][outPart=PartitioningCollection(5, hashpartitioning(aliased#267L, 5)%NONNULL,hashpartitioning(aliased#270L, 5)%NONNULL)][outOrder=List(aliased#267L ASC%NONNULL)][output=List(aliased#267:bigint%NONNULL)]
> +- *SortMergeJoin [args=[aliased#267L], [aliased#270L], Inner][outPart=PartitioningCollection(5, hashpartitioning(aliased#267L, 5)%NONNULL,hashpartitioning(aliased#270L, 5)%NONNULL)][outOrder=List(aliased#267L ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL, aliased#270:bigint%NONNULL)]
>    :- *Sort [args=[aliased#267L ASC], false, 0][outPart=HashPartitioning(5, aliased#267:bigint%NONNULL)][outOrder=List(aliased#267L ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
>    :  +- Exchange [args=hashpartitioning(aliased#267L, 5)%NONNULL][outPart=HashPartitioning(5, aliased#267:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
>    :     +- *Project [args=[number#198L AS aliased#267L]][outPart=HashPartitioning(10, number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
>    :        +- InMemoryTableScan [args=[number#198L]][outPart=HashPartitioning(10, number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(number#198:bigint%NONNULL)]
>    :           :  +- InMemoryRelation [number#198L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), false[Statistics(24,false)][output=List(number#198:bigint%NONNULL)]
>    :           :     :  +- Exchange [args=hashpartitioning(number#198L, 10)%NONNULL][outPart=HashPartitioning(10, number#198:bigint%NONNULL)][outOrder=List()][output=List(number#198:bigint%NONNULL)]
>    :           :     :     +- LocalTableScan [args=[number#198L]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(number#198:bigint%NONNULL)]
>    +- *Sort [args=[aliased#270L ASC], false, 0][outPart=HashPartitioning(5, aliased#270:bigint%NONNULL)][outOrder=List(aliased#270L ASC%NONNULL)][output=ArrayBuffer(aliased#270:bigint%NONNULL)]
>       +- Exchange [args=hashpartitioning(aliased#270L, 5)%NONNULL][outPart=HashPartitioning(5, aliased#270:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#270:bigint%NONNULL)]
>          +- *Project [args=[number#223L AS aliased#270L]][outPart=HashPartitioning(10, number#223:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#270:bigint%NONNULL)]
>             +- InMemoryTableScan [args=[number#223L]][outPart=HashPartitioning(10, number#223:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(number#223:bigint%NONNULL)]
>                :  +- InMemoryRelation [number#223L, value#224], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), false[Statistics(47,false)][output=List(number#223:bigint%NONNULL, value#224:string%NULL)]
>                :     :  +- Exchange [args=hashpartitioning(number#223L, 10)%NONNULL][outPart=HashPartitioning(10, number#223:bigint%NONNULL)][outOrder=List()][output=List(number#223:bigint%NONNULL, value#224:string%NULL)]
>                :     :     +- LocalTableScan [args=[number#223L, value#224]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(number#223:bigint%NONNULL, value#224:string%NULL)]
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org