Posted to issues@spark.apache.org by "Paul Jones (JIRA)" <ji...@apache.org> on 2016/10/23 22:59:58 UTC

[jira] [Updated] (SPARK-18067) Adding filter after SortMergeJoin creates unnecessary shuffle

     [ https://issues.apache.org/jira/browse/SPARK-18067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Paul Jones updated SPARK-18067:
-------------------------------
    Description: 
Basic setup

{code}
scala> case class Data1(key: String, value1: Int)
scala> case class Data2(key: String, value2: Int)

scala> val partition1 = sc.parallelize(1 to 100000).map(x => Data1(s"$x", x))
    .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
scala> val partition2 = sc.parallelize(1 to 100000).map(x => Data2(s"$x", x))
    .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
{code}

Join on key
{code}
scala> partition1.join(partition2, "key").explain
== Physical Plan ==
Project [key#0,value1#1,value2#13]
+- SortMergeJoin [key#0], [key#12]
   :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
   +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
{code}

The plan contains no exchange, so we get an efficient join with no shuffle.

But if we add an equality filter on the value columns, the predicate is pushed into the join keys, and both sides are shuffled and re-sorted.
{code}
scala> partition1.join(partition2, "key").filter($"value1" === $"value2").explain
== Physical Plan ==
Project [key#0,value1#1,value2#13]
+- SortMergeJoin [value1#1,key#0], [value2#13,key#12]
   :- Sort [value1#1 ASC,key#0 ASC], false, 0
   :  +- TungstenExchange hashpartitioning(value1#1,key#0,200), None
   :     +- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
   +- Sort [value2#13 ASC,key#12 ASC], false, 0
      +- TungstenExchange hashpartitioning(value2#13,key#12,200), None
         +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
{code}
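The plan above hashes on (value, key), whereas the cached data was hashed on key alone. As a rough illustration of why those two layouts differ, here is a toy model in plain Scala (no Spark). The object name, the helper functions, and the use of Scala's built-in hashCode are all assumptions made for this sketch; Spark's internal hash function is different, so the exact partition ids will not match a real job.

```scala
// Toy model only: Scala's hashCode stands in for Spark's internal hash function.
object PartitioningSketch {
  // Matches the 200 in hashpartitioning(..., 200) in the plan above.
  val numPartitions = 200

  // Non-negative modulo, so negative hash codes still map to a valid partition id.
  def nonNegativeMod(hash: Int, n: Int): Int = ((hash % n) + n) % n

  // Partition id when rows are hashed on the key column alone.
  def byKey(key: String): Int = nonNegativeMod(key.hashCode, numPartitions)

  // Partition id when rows are hashed on (value, key) together.
  def byValueAndKey(value: Int, key: String): Int =
    nonNegativeMod((value, key).hashCode, numPartitions)

  // Same shape as the data in the report: key is the string form of the value.
  val rows: Seq[(String, Int)] = (1 to 100000).map(x => (x.toString, x))

  // Number of rows whose partition id changes when the hash columns change.
  val moved: Int = rows.count { case (k, v) => byKey(k) != byValueAndKey(v, k) }
}

// Usage: how many rows would land in a different partition under the new layout.
println(s"rows that change partition: ${PartitioningSketch.moved}")
```

Every matching pair of rows shares a key, so the key-only layout already places them in the same partition. In this toy model the (value, key) layout only rearranges rows to match the new hash columns, which is why the exchange in the plan looks unnecessary.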

We can avoid the shuffle if we use a filter that can't be pushed into the join.
{code}
scala> partition1.join(partition2, "key").filter($"value1" >= $"value2").explain
== Physical Plan ==
Project [key#0,value1#1,value2#13]
+- Filter (value1#1 >= value2#13)
   +- SortMergeJoin [key#0], [key#12]
      :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
      +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
{code}

What's the best way to avoid the filter pushdown here?

> Adding filter after SortMergeJoin creates unnecessary shuffle
> -------------------------------------------------------------
>
>                 Key: SPARK-18067
>                 URL: https://issues.apache.org/jira/browse/SPARK-18067
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.1
>            Reporter: Paul Jones
>            Priority: Minor
