Posted to issues@spark.apache.org by "Natang (JIRA)" <ji...@apache.org> on 2019/02/21 19:04:00 UTC
[jira] [Created] (SPARK-26959) Join of two tables, bucketed the same way, on bucket columns and one or more other columns should not need a shuffle
Natang created SPARK-26959:
------------------------------
Summary: Join of two tables, bucketed the same way, on bucket columns and one or more other columns should not need a shuffle
Key: SPARK-26959
URL: https://issues.apache.org/jira/browse/SPARK-26959
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.2.1
Reporter: Natang
_When two tables that are bucketed the same way are joined on the bucket columns plus one or more other columns, Spark should be able to perform the join without a shuffle._
Consider the example below. There are two tables, 'join_left_table' and 'join_right_table', both bucketed by 'col1' into 4 buckets. When these tables are joined on 'col1' and 'col2', Spark should be able to perform the join without a shuffle: all rows with a given value of 'col1' land in the same bucket in both tables, irrespective of the value of 'col2'. Any pair of rows that matches on ('col1', 'col2') also matches on 'col1', so the two rows are already co-located.
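The co-location argument can be checked outside Spark with a small plain-Scala sketch. It assumes a hypothetical `bucketOf` function (a floor-mod of `col1`'s hash code) as a stand-in for Spark's real bucket-id computation, which is based on a Murmur3 hash of the bucket columns; the object and method names are likewise illustrative. The sketch joins each left bucket only with the same-numbered right bucket, so no row moves between buckets, and compares that result with an ordinary join on ('col1', 'col2').

```scala
import scala.util.Random

object BucketedJoinSketch {
  type Row = (Int, Int, Int) // (col1, col2, col3)

  val numBuckets = 4

  // Hypothetical stand-in for Spark's bucket id: depends on col1 only,
  // which is the property the argument relies on.
  def bucketOf(col1: Int): Int = java.lang.Math.floorMod(col1.hashCode, numBuckets)

  // Plain nested-loop inner join on (col1, col2).
  def joinOnCol1Col2(left: Seq[Row], right: Seq[Row]): Seq[(Int, Int, Int, Int)] =
    for {
      (l1, l2, l3) <- left
      (r1, r2, r3) <- right
      if l1 == r1 && l2 == r2
    } yield (l1, l2, l3, r3)

  // Join bucket b of `left` only with bucket b of `right` (no shuffle),
  // then concatenate the per-bucket results.
  def bucketWiseJoin(left: Seq[Row], right: Seq[Row]): Seq[(Int, Int, Int, Int)] = {
    val leftBuckets = left.groupBy(r => bucketOf(r._1))
    val rightBuckets = right.groupBy(r => bucketOf(r._1))
    (0 until numBuckets).flatMap { b =>
      joinOnCol1Col2(leftBuckets.getOrElse(b, Seq.empty), rightBuckets.getOrElse(b, Seq.empty))
    }
  }

  def main(args: Array[String]): Unit = {
    val rnd = new Random(7)
    def randomInt1to100 = rnd.nextInt(100) + 1
    val left = Seq.fill(100)((randomInt1to100, randomInt1to100, randomInt1to100))
    val right = Seq.fill(100)((randomInt1to100, randomInt1to100, randomInt1to100))

    val global = joinOnCol1Col2(left, right).sorted
    val bucketWise = bucketWiseJoin(left, right).sorted
    println(s"global=${global.size} bucketWise=${bucketWise.size} equal=${global == bucketWise}")
  }
}
```

Because `bucketOf` looks only at `col1`, two rows that agree on ('col1', 'col2') always get the same bucket id, so the bucket-wise result equals the global one for any input and any deterministic function of `col1`.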
----
{noformat}
// Run in spark-shell, where `sc` and `spark` are predefined.
import org.apache.spark.sql.SaveMode
import spark.implicits._ // for .toDF (already imported in spark-shell)

// Two 100-row DataFrames of random ints in [1, 100]
def randomInt1to100 = scala.util.Random.nextInt(100)+1
val left = sc.parallelize(
  Seq.fill(100){(randomInt1to100,randomInt1to100,randomInt1to100)}
).toDF("col1", "col2", "col3")
val right = sc.parallelize(
  Seq.fill(100){(randomInt1to100,randomInt1to100,randomInt1to100)}
).toDF("col1", "col2", "col3")

// Write both tables bucketed by col1 into 4 buckets, sorted by (col1, col2)
left.write
  .bucketBy(4,"col1")
  .sortBy("col1", "col2")
  .mode(SaveMode.Overwrite)
  .saveAsTable("join_left_table")
right.write
  .bucketBy(4,"col1")
  .sortBy("col1", "col2")
  .mode(SaveMode.Overwrite)
  .saveAsTable("join_right_table")
val left_table = spark.read.table("join_left_table")
val right_table = spark.read.table("join_right_table")

// Disable broadcast joins so the planner chooses a sort-merge join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

// Join on the bucket column only: no Exchange appears in the plan
val join_on_col1=left_table.join(
  right_table,
  Seq("col1"))
join_on_col1.explain
### BEGIN Output
join_on_col1: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 3 more fields]
== Physical Plan ==
*Project [col1#250, col2#251, col3#252, col2#258, col3#259]
+- *SortMergeJoin [col1#250], [col1#257], Inner
   :- *Sort [col1#250 ASC NULLS FIRST], false, 0
   :  +- *Project [col1#250, col2#251, col3#252]
   :     +- *Filter isnotnull(col1#250)
   :        +- *FileScan parquet default.join_left_table[col1#250,col2#251,col3#252] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_left_table], PartitionFilters: [], PushedFilters: [IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
   +- *Sort [col1#257 ASC NULLS FIRST], false, 0
      +- *Project [col1#257, col2#258, col3#259]
         +- *Filter isnotnull(col1#257)
            +- *FileScan parquet default.join_right_table[col1#257,col2#258,col3#259] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_right_table], PartitionFilters: [], PushedFilters: [IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
### END Output
// Join on the bucket column plus col2: an Exchange (shuffle) appears on both sides
val join_on_col1_col2=left_table.join(
  right_table,
  Seq("col1","col2"))
join_on_col1_col2.explain
### BEGIN Output
join_on_col1_col2: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 2 more fields]
== Physical Plan ==
*Project [col1#250, col2#251, col3#252, col3#259]
+- *SortMergeJoin [col1#250, col2#251], [col1#257, col2#258], Inner
   :- *Sort [col1#250 ASC NULLS FIRST, col2#251 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(col1#250, col2#251, 200)
   :     +- *Project [col1#250, col2#251, col3#252]
   :        +- *Filter (isnotnull(col2#251) && isnotnull(col1#250))
   :           +- *FileScan parquet default.join_left_table[col1#250,col2#251,col3#252] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_left_table], PartitionFilters: [], PushedFilters: [IsNotNull(col2), IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
   +- *Sort [col1#257 ASC NULLS FIRST, col2#258 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col1#257, col2#258, 200)
         +- *Project [col1#257, col2#258, col3#259]
            +- *Filter (isnotnull(col2#258) && isnotnull(col1#257))
               +- *FileScan parquet default.join_right_table[col1#257,col2#258,col3#259] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_right_table], PartitionFilters: [], PushedFilters: [IsNotNull(col2), IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
### END Output{noformat}
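In the second plan both sides are re-hashed on (col1, col2) even though each table is already hash-bucketed on col1. The condition this issue asks for can be modelled in a few lines of plain Scala. This is a hypothetical sketch, not Spark's planner code: `Bucketing` is a simplified stand-in invented here, and the "exact" rule is only one rule that would be consistent with the shuffle seen above. Under the proposed "subset" rule, an existing bucketing can serve the join if its bucket columns are a non-empty subset of the join keys, both sides have the same number of buckets, and the bucket columns sit at the same positions in the left and right join-key lists.

```scala
object CoPartitionSketch {
  // Hypothetical, simplified description of how one side of a join is bucketed.
  final case class Bucketing(columns: Seq[String], numBuckets: Int)

  // Positions of the bucket columns within the join-key list, or None if there
  // are no bucket columns or any bucket column is not a join key.
  def bucketKeyPositions(b: Bucketing, joinKeys: Seq[String]): Option[Seq[Int]] = {
    val positions = b.columns.map(c => joinKeys.indexOf(c))
    if (b.columns.nonEmpty && positions.forall(_ >= 0)) Some(positions) else None
  }

  // Exact rule: bucket columns must equal the join keys on both sides.
  def canSkipShuffleExact(l: Bucketing, lKeys: Seq[String], r: Bucketing, rKeys: Seq[String]): Boolean =
    l.numBuckets == r.numBuckets && l.columns == lKeys && r.columns == rKeys

  // Subset rule: bucket columns are a subset of the join keys and line up
  // positionally, so matching rows share a bucket id on both sides.
  def canSkipShuffleSubset(l: Bucketing, lKeys: Seq[String], r: Bucketing, rKeys: Seq[String]): Boolean =
    l.numBuckets == r.numBuckets && (for {
      lp <- bucketKeyPositions(l, lKeys)
      rp <- bucketKeyPositions(r, rKeys)
    } yield lp == rp).getOrElse(false)

  def main(args: Array[String]): Unit = {
    val left = Bucketing(Seq("col1"), 4)
    val right = Bucketing(Seq("col1"), 4)
    val keys = Seq("col1", "col2")
    println(canSkipShuffleExact(left, keys, right, keys))  // false
    println(canSkipShuffleSubset(left, keys, right, keys)) // true
  }
}
```

The positional check matters: if the left table were bucketed by col1 and the right by col2, both would still be subsets of the join keys, but matching rows would not necessarily share a bucket id, so the subset rule rejects that case.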
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)