Posted to issues@spark.apache.org by "Andy Van Yperen-De Deyne (Jira)" <ji...@apache.org> on 2020/08/31 17:27:00 UTC

[jira] [Commented] (SPARK-18067) SortMergeJoin adds shuffle if join predicates have non partitioned columns

    [ https://issues.apache.org/jira/browse/SPARK-18067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17187897#comment-17187897 ] 

Andy Van Yperen-De Deyne commented on SPARK-18067:
--------------------------------------------------

I am hitting this performance issue with Spark 3.0.0.
This issue was marked as resolved on 21 May 2019, but the corresponding PR on GitHub is closed. Is there any chance this fix will be merged into the master branch? Is there a timeline for that?

(If this is not the right place to ask, I apologize.)

> SortMergeJoin adds shuffle if join predicates have non partitioned columns
> --------------------------------------------------------------------------
>
>                 Key: SPARK-18067
>                 URL: https://issues.apache.org/jira/browse/SPARK-18067
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 1.6.1
>            Reporter: Paul Jones
>            Priority: Minor
>              Labels: bulk-closed
>
> Basic setup
> {code}
> scala> case class Data1(key: String, value1: Int)
> scala> case class Data2(key: String, value2: Int)
> scala> val partition1 = sc.parallelize(1 to 100000).map(x => Data1(s"$x", x))
>     .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
> scala> val partition2 = sc.parallelize(1 to 100000).map(x => Data2(s"$x", x))
>     .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
> {code}
> Join on key
> {code}
> scala> partition1.join(partition2, "key").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- SortMergeJoin [key#0], [key#12]
>    :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
>    +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
> {code}
> And we get a super efficient join with no shuffle.
> But if we add a filter, our join gets less efficient and we end up with a shuffle.
> {code}
> scala> partition1.join(partition2, "key").filter($"value1" === $"value2").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- SortMergeJoin [value1#1,key#0], [value2#13,key#12]
>    :- Sort [value1#1 ASC,key#0 ASC], false, 0
>    :  +- TungstenExchange hashpartitioning(value1#1,key#0,200), None
>    :     +- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
>    +- Sort [value2#13 ASC,key#12 ASC], false, 0
>       +- TungstenExchange hashpartitioning(value2#13,key#12,200), None
>          +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
> {code}
> And we can avoid the shuffle if we use a filter statement that can't be pushed into the join.
> {code}
> scala> partition1.join(partition2, "key").filter($"value1" >= $"value2").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- Filter (value1#1 >= value2#13)
>    +- SortMergeJoin [key#0], [key#12]
>       :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
>       +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
> {code}
> What's the best way to avoid the filter pushdown here?
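The premise of the issue is that both inputs are already hash-partitioned on `key`, so rows that match on the composite join key `(key, value)` are already co-located, and the Exchange in the `value1 === value2` plan is not needed for correctness. Below is a toy model of that property in plain Scala, not Spark's planner. `CoPartitionDemo`, `partitionOf`, `joinLocally`, `joinGlobally` and `numPartitions = 8` are hypothetical names chosen for illustration, and `String.hashCode` with `Math.floorMod` stands in for the Murmur3 hash that Spark's `HashPartitioning` actually uses.

```scala
// Toy model in plain Scala (no Spark). All names here are hypothetical.
// Assumption: String.hashCode + Math.floorMod stands in for Spark's
// Murmur3-based HashPartitioning; any deterministic hash of `key` works.

final case class Data1(key: String, value1: Int)
final case class Data2(key: String, value2: Int)

object CoPartitionDemo {
  val numPartitions = 8

  // The partition id depends on `key` alone, as after repartition(col("key")).
  def partitionOf(key: String): Int = Math.floorMod(key.hashCode, numPartitions)

  // Evaluate the full predicate (key == key && value1 == value2) separately
  // inside each partition, never moving a row to another partition.
  // A nested loop stands in for the per-partition sort-merge step.
  def joinLocally(left: Seq[Data1], right: Seq[Data2]): Seq[(String, Int, Int)] = {
    val leftParts  = left.groupBy(d => partitionOf(d.key))
    val rightParts = right.groupBy(d => partitionOf(d.key))
    (0 until numPartitions).flatMap { p =>
      for {
        l <- leftParts.getOrElse(p, Seq.empty[Data1])
        r <- rightParts.getOrElse(p, Seq.empty[Data2])
        if l.key == r.key && l.value1 == r.value2
      } yield (l.key, l.value1, r.value2)
    }
  }

  // Reference answer: one global nested-loop join, no partitioning at all.
  def joinGlobally(left: Seq[Data1], right: Seq[Data2]): Seq[(String, Int, Int)] =
    for {
      l <- left
      r <- right
      if l.key == r.key && l.value1 == r.value2
    } yield (l.key, l.value1, r.value2)

  def main(args: Array[String]): Unit = {
    val left  = (1 to 1000).map(x => Data1(s"$x", x))
    // Every third row gets a mismatched value so the equality predicate matters.
    val right = (1 to 1000).map(x => Data2(s"$x", if (x % 3 == 0) x + 1 else x))
    val local = joinLocally(left, right).sorted
    println(local == joinGlobally(left, right).sorted) // true
    println(local.size)                                 // 667
  }
}
```

Because each row's partition depends only on `key`, the per-partition join returns exactly the same rows as the global join even though it also checks `value1 == value2`: partitioning on a subset of the join keys is enough to co-locate matching rows. Whether a given Spark version's planner exploits this is not modeled here.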



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
