You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Yuming Wang (Jira)" <ji...@apache.org> on 2020/09/04 00:02:00 UTC

[jira] [Resolved] (SPARK-32767) Bucket join should work if spark.sql.shuffle.partitions larger than bucket number

     [ https://issues.apache.org/jira/browse/SPARK-32767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuming Wang resolved SPARK-32767.
---------------------------------
    Fix Version/s: 3.0.2
                   3.1.0
       Resolution: Fixed

Issue resolved by pull request 29624
[https://github.com/apache/spark/pull/29624]

> Bucket join should work if spark.sql.shuffle.partitions larger than bucket number
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-32767
>                 URL: https://issues.apache.org/jira/browse/SPARK-32767
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Yuming Wang
>            Assignee: Yuming Wang
>            Priority: Major
>             Fix For: 3.1.0, 3.0.2
>
>
> How to reproduce this issue:
> {code:scala}
>     spark.range(1000).write.bucketBy(432, "id").saveAsTable("t1")
>     spark.range(1000).write.bucketBy(34, "id").saveAsTable("t2")
>     sql("set spark.sql.shuffle.partitions=600")
>     sql("set spark.sql.autoBroadcastJoinThreshold=-1")
>     sql("select * from t1 join t2 on t1.id = t2.id").explain()
> {code}
> {noformat}
> == Physical Plan ==
> *(5) SortMergeJoin [id#26L], [id#27L], Inner
> :- *(2) Sort [id#26L ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(id#26L, 600), true, [id=#65]
> :     +- *(1) Filter isnotnull(id#26L)
> :        +- *(1) ColumnarToRow
> :           +- FileScan parquet default.t1[id#26L] Batched: true, DataFilters: [isnotnull(id#26L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32444/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 432 out of 432
> +- *(4) Sort [id#27L ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(id#27L, 600), true, [id=#74]
>       +- *(3) Filter isnotnull(id#27L)
>          +- *(3) ColumnarToRow
>             +- FileScan parquet default.t2[id#27L] Batched: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32444/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 34 out of 34
> {noformat}
> *Expected*:
> {noformat}
> == Physical Plan ==
> *(4) SortMergeJoin [id#26L], [id#27L], Inner
> :- *(1) Sort [id#26L ASC NULLS FIRST], false, 0
> :  +- *(1) Filter isnotnull(id#26L)
> :     +- *(1) ColumnarToRow
> :        +- FileScan parquet default.t1[id#26L] Batched: true, DataFilters: [isnotnull(id#26L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32444/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 432 out of 432
> +- *(3) Sort [id#27L ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(id#27L, 432), true, [id=#69]
>       +- *(2) Filter isnotnull(id#27L)
>          +- *(2) ColumnarToRow
>             +- FileScan parquet default.t2[id#27L] Batched: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32444/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 34 out of 34
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org