Posted to issues@spark.apache.org by "Jacek Laskowski (JIRA)" <ji...@apache.org> on 2018/04/19 11:28:00 UTC
[jira] [Created] (SPARK-24025) Join of bucketed and non-bucketed tables can give two exchanges and sorts for non-bucketed side
Jacek Laskowski created SPARK-24025:
---------------------------------------
Summary: Join of bucketed and non-bucketed tables can give two exchanges and sorts for non-bucketed side
Key: SPARK-24025
URL: https://issues.apache.org/jira/browse/SPARK-24025
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.3.0, 2.3.1
Environment: {code:java}
./bin/spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/
Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_162
Branch master
Compiled by user sameera on 2018-02-22T19:24:29Z
Revision a0d7949896e70f427e7f3942ff340c9484ff0aab
Url git@github.com:sameeragarwal/spark.git
Type --help for more information.
{code}
Reporter: Jacek Laskowski
While exploring bucketing, I found that the following join of a non-bucketed table with a bucketed table ends up with two exchanges and two sorts on the non-bucketed side of the physical plan.
{code}
// Disable auto broadcasting so that you don't end up with a BroadcastHashJoin and a BroadcastExchange
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val bucketedTableName = "bucketed_4_id"
val large = spark.range(1000000)
large.write
  .bucketBy(4, "id")
  .sortBy("id")
  .mode("overwrite")
  .saveAsTable(bucketedTableName)
// Describe the table and include bucketing spec only
val descSQL = sql(s"DESC FORMATTED $bucketedTableName").filter($"col_name".contains("Bucket") || $"col_name" === "Sort Columns")
scala> descSQL.show(truncate = false)
+--------------+---------+-------+
|col_name      |data_type|comment|
+--------------+---------+-------+
|Num Buckets   |4        |       |
|Bucket Columns|[`id`]   |       |
|Sort Columns  |[`id`]   |       |
+--------------+---------+-------+
val bucketedTable = spark.table(bucketedTableName)
val t1 = spark.range(4)
  .repartition(2, $"id")      // Use just 2 partitions
  .sortWithinPartitions("id") // Sort rows within each partition
val q = t1.join(bucketedTable, "id")
// Note the two exchanges and two sorts on the non-bucketed (t1) side
scala> q.explain
== Physical Plan ==
*(5) Project [id#79L]
+- *(5) SortMergeJoin [id#79L], [id#77L], Inner
   :- *(3) Sort [id#79L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#79L, 4)
   :     +- *(2) Sort [id#79L ASC NULLS FIRST], false, 0
   :        +- Exchange hashpartitioning(id#79L, 2)
   :           +- *(1) Range (0, 4, step=1, splits=8)
   +- *(4) Sort [id#77L ASC NULLS FIRST], false, 0
      +- *(4) Project [id#77L]
         +- *(4) Filter isnotnull(id#77L)
            +- *(4) FileScan parquet default.bucketed_4_id[id#77L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
// Execute the query (the rows are discarded)
q.foreach(_ => ())
{code}
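To narrow down what triggers the extra exchange, the plans of a few variants could be compared programmatically instead of by reading {{explain}} output. What follows is an untested sketch. It assumes spark-shell on Spark 2.3.x, where the physical shuffle operator is {{org.apache.spark.sql.execution.exchange.ShuffleExchangeExec}}. The helper {{numShuffles}} and the names {{withSort}}, {{withoutSort}} and {{matchingBuckets}} are hypothetical and are not part of the report. The sketch counts the Exchange nodes in {{queryExecution.executedPlan}} for three cases: the query above, the same query without {{sortWithinPartitions}}, and a variant that uses as many partitions as there are buckets. No expected counts are claimed here.

{code:java}
// Untested sketch for spark-shell on Spark 2.3.x (`spark` is predefined by the shell).
// numShuffles, withSort, withoutSort and matchingBuckets are hypothetical names.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import spark.implicits._

// Same setup as the report: no broadcast joins, table bucketed and sorted by id into 4 buckets
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.range(1000000).write.bucketBy(4, "id").sortBy("id").mode("overwrite").saveAsTable("bucketed_4_id")
val bucketedTable = spark.table("bucketed_4_id")

// Counts the shuffle exchanges in the physical plan (the one explain prints).
// Accessing executedPlan only plans the query; it does not run it.
def numShuffles(df: DataFrame): Int =
  df.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e }.size

// The query from the report: 2 hash partitions, then sorted within partitions
val withSort = spark.range(4).repartition(2, $"id").sortWithinPartitions("id").join(bucketedTable, "id")

// Variant: the same query without sortWithinPartitions
val withoutSort = spark.range(4).repartition(2, $"id").join(bucketedTable, "id")

// Variant: as many hash partitions as the table has buckets (4)
val matchingBuckets = spark.range(4).repartition(4, $"id").sortWithinPartitions("id").join(bucketedTable, "id")

Seq("withSort" -> withSort, "withoutSort" -> withoutSort, "matchingBuckets" -> matchingBuckets).foreach {
  case (name, query) => println(s"$name: ${numShuffles(query)} exchange(s)")
}
{code}

On Spark 3.x with adaptive query execution enabled, {{executedPlan}} is wrapped in an adaptive node, so this simple {{collect}} may not see the exchanges. That caveat is likewise unverified here.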
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)