Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2019/10/08 05:42:15 UTC

[jira] [Resolved] (SPARK-24025) Join of bucketed and non-bucketed tables can give two exchanges and sorts for non-bucketed side

     [ https://issues.apache.org/jira/browse/SPARK-24025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-24025.
----------------------------------
    Resolution: Incomplete

> Join of bucketed and non-bucketed tables can give two exchanges and sorts for non-bucketed side
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24025
>                 URL: https://issues.apache.org/jira/browse/SPARK-24025
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0, 2.3.1
>         Environment: {code:java}
> ./bin/spark-shell --version
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.3.0
>       /_/
> Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_162
> Branch master
> Compiled by user sameera on 2018-02-22T19:24:29Z
> Revision a0d7949896e70f427e7f3942ff340c9484ff0aab
> Url git@github.com:sameeragarwal/spark.git
> Type --help for more information.{code}
>            Reporter: Jacek Laskowski
>            Priority: Major
>              Labels: bulk-closed
>         Attachments: join-jira.png
>
>
> While exploring bucketing, I found that the following join of a non-bucketed table with a bucketed table ends up with two exchanges and two sorts in the physical plan on the non-bucketed side of the join.
> {code}
> // Make sure that you don't end up with a BroadcastHashJoin and a BroadcastExchange
> // Disable auto broadcasting
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
> val bucketedTableName = "bucketed_4_id"
> val large = spark.range(1000000)
> large.write
>   .bucketBy(4, "id")
>   .sortBy("id")
>   .mode("overwrite")
>   .saveAsTable(bucketedTableName)
> // Describe the table and include bucketing spec only
> val descSQL = sql(s"DESC FORMATTED $bucketedTableName").filter($"col_name".contains("Bucket") || $"col_name" === "Sort Columns")
> scala> descSQL.show(truncate = false)
> +--------------+---------+-------+
> |col_name      |data_type|comment|
> +--------------+---------+-------+
> |Num Buckets   |4        |       |
> |Bucket Columns|[`id`]   |       |
> |Sort Columns  |[`id`]   |       |
> +--------------+---------+-------+
> val bucketedTable = spark.table(bucketedTableName)
> val t1 = spark.range(4)
>   .repartition(2, $"id")  // Use just 2 partitions
>   .sortWithinPartitions("id") // sort partitions
> val q = t1.join(bucketedTable, "id")
> // Note two exchanges and sorts
> scala> q.explain
> == Physical Plan ==
> *(5) Project [id#79L]
> +- *(5) SortMergeJoin [id#79L], [id#77L], Inner
>    :- *(3) Sort [id#79L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(id#79L, 4)
>    :     +- *(2) Sort [id#79L ASC NULLS FIRST], false, 0
>    :        +- Exchange hashpartitioning(id#79L, 2)
>    :           +- *(1) Range (0, 4, step=1, splits=8)
>    +- *(4) Sort [id#77L ASC NULLS FIRST], false, 0
>       +- *(4) Project [id#77L]
>          +- *(4) Filter isnotnull(id#77L)
>             +- *(4) FileScan parquet default.bucketed_4_id[id#77L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
> q.foreach(_ => ())
> {code}
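> The exchange and sort operators can also be counted programmatically instead of being read off the explain output. This is only a minimal sketch for spark-shell: it assumes the query q defined above and the Spark 2.3 physical operator classes ShuffleExchangeExec and SortExec (the exchange class may be named differently in other releases). The names plan, exchanges and sorts are illustrative.
> {code}
> import org.apache.spark.sql.execution.SortExec
> import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
> // Executed physical plan of the join query defined above
> val plan = q.queryExecution.executedPlan
> // Collect every shuffle exchange and every sort operator in the plan tree
> val exchanges = plan.collect { case e: ShuffleExchangeExec => e }
> val sorts = plan.collect { case s: SortExec => s }
> println(s"exchanges: ${exchanges.size}, sorts: ${sorts.size}")
> {code}
> The plan printed above contains two Exchange nodes and three Sort nodes, and two of each sit on the non-bucketed side.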



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
