Posted to issues@spark.apache.org by "Tejas Patil (JIRA)" <ji...@apache.org> on 2017/01/08 02:54:58 UTC
[jira] [Updated] (SPARK-19122) Unnecessary shuffle+sort added if
join predicates ordering differ from bucketing and sorting order
[ https://issues.apache.org/jira/browse/SPARK-19122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tejas Patil updated SPARK-19122:
--------------------------------
Description:
`table1` and `table2` are both bucketed and sorted on columns `j` and `k`, in that order.
They are generated as follows:
{code}
val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table1")
df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table2")
{code}
If the join predicates are specified in the query in the *same* order as the bucketing and sort columns, the plan contains no shuffle and no sort:
{code}
scala> hc.sql("SELECT * FROM table1 a JOIN table2 b ON a.j=b.j AND a.k=b.k").explain(true)
== Physical Plan ==
*SortMergeJoin [j#61, k#62], [j#100, k#101], Inner
:- *Project [i#60, j#61, k#62]
:  +- *Filter (isnotnull(k#62) && isnotnull(j#61))
:     +- *FileScan orc default.table1[i#60,j#61,k#62] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/table1], PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Project [i#99, j#100, k#101]
   +- *Filter (isnotnull(j#100) && isnotnull(k#101))
      +- *FileScan orc default.table2[i#99,j#100,k#101] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/table2], PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
{code}
The same query with the join predicates in a *different* order from the bucketing and sort columns introduces an extra shuffle (`Exchange`) and sort on both sides of the join:
{code}
scala> hc.sql("SELECT * FROM table1 a JOIN table2 b ON a.k=b.k AND a.j=b.j ").explain(true)
== Physical Plan ==
*SortMergeJoin [k#62, j#61], [k#101, j#100], Inner
:- *Sort [k#62 ASC NULLS FIRST, j#61 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(k#62, j#61, 200)
:     +- *Project [i#60, j#61, k#62]
:        +- *Filter (isnotnull(k#62) && isnotnull(j#61))
:           +- *FileScan orc default.table1[i#60,j#61,k#62] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/table1], PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Sort [k#101 ASC NULLS FIRST, j#100 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(k#101, j#100, 200)
      +- *Project [i#99, j#100, k#101]
         +- *Filter (isnotnull(j#100) && isnotnull(k#101))
            +- *FileScan orc default.table2[i#99,j#100,k#101] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/table2], PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
{code}
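Since `a.j=b.j AND a.k=b.k` and `a.k=b.k AND a.j=b.j` are logically the same condition, the order of the join keys could in principle be normalized to match the bucketing order before the planner decides whether a shuffle is needed. The following is only a rough sketch of that idea in plain Scala, not Spark's actual planner code: the object `JoinKeyReordering`, the method `reorder`, and the use of plain strings for column names are all hypothetical and chosen for illustration.

```scala
// Hypothetical sketch, NOT Spark's implementation: reorder equi-join key pairs
// so that the left-side keys follow the order of the table's bucketing columns.
object JoinKeyReordering {

  /** Returns the (leftKeys, rightKeys) pair reordered to follow `bucketCols`,
    * or None when the left keys are not a permutation of the bucketing columns
    * (in which case this sketch makes no claim about avoiding a shuffle).
    */
  def reorder(
      leftKeys: Seq[String],
      rightKeys: Seq[String],
      bucketCols: Seq[String]): Option[(Seq[String], Seq[String])] = {
    require(leftKeys.length == rightKeys.length, "each left key needs a matching right key")

    // The left keys must be exactly the bucketing columns, in any order, without repeats.
    val isPermutation =
      leftKeys.length == bucketCols.length &&
        leftKeys.distinct.length == leftKeys.length &&
        leftKeys.toSet == bucketCols.toSet

    if (!isPermutation) None
    else {
      // Remember which right key each left key is compared with, then
      // emit the pairs in bucketing-column order.
      val rightFor = leftKeys.zip(rightKeys).toMap
      Some((bucketCols, bucketCols.map(rightFor)))
    }
  }
}
```

For the second query above, `JoinKeyReordering.reorder(Seq("a.k", "a.j"), Seq("b.k", "b.j"), Seq("a.j", "a.k"))` yields `Some((Seq("a.j", "a.k"), Seq("b.j", "b.k")))`, which is the key order of the first query, whose plan has no `Exchange` or `Sort`.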
> Unnecessary shuffle+sort added if join predicates ordering differ from bucketing and sorting order
> --------------------------------------------------------------------------------------------------
>
> Key: SPARK-19122
> URL: https://issues.apache.org/jira/browse/SPARK-19122
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.2, 2.1.0
> Reporter: Tejas Patil