Posted to issues@spark.apache.org by "Yuming Wang (JIRA)" <ji...@apache.org> on 2019/02/24 13:50:00 UTC
[jira] [Commented] (SPARK-26959) Join of two tables, bucketed the same way, on bucket columns and one or more other columns should not need a shuffle
[ https://issues.apache.org/jira/browse/SPARK-26959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16776247#comment-16776247 ]
Yuming Wang commented on SPARK-26959:
-------------------------------------
Duplicate of [SPARK-24087|https://issues.apache.org/jira/browse/SPARK-24087]?
> Join of two tables, bucketed the same way, on bucket columns and one or more other columns should not need a shuffle
> --------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-26959
> URL: https://issues.apache.org/jira/browse/SPARK-26959
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.2.1, 2.4.0
> Reporter: Natang
> Priority: Major
>
> _When two tables, that are bucketed the same way, are joined using bucket columns and one or more other columns, Spark should be able to perform the join without doing a shuffle._
> Consider the example below. There are two tables, 'join_left_table' and 'join_right_table', each bucketed by 'col1' into 4 buckets. When these tables are joined on 'col1' and 'col2', Spark should be able to perform the join without a shuffle: all entries for a given value of 'col1' are in the same bucket in both tables, irrespective of the values of 'col2'. A sketch of this invariant follows.
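>
> A minimal sketch of why the invariant holds, assuming bucket assignment is a hash of the bucket columns modulo the bucket count (Spark uses Murmur3 via HashPartitioning; the stand-in hash below is illustrative only):
>
> {noformat}
> // Stand-in for Spark's bucket assignment. The exact hash is not the
> // point; what matters is that it reads col1 and nothing else.
> def bucketId(col1: Int, numBuckets: Int): Int =
>   ((col1.hashCode % numBuckets) + numBuckets) % numBuckets
>
> // Two rows that agree on col1 receive the same bucket id in both
> // tables, whatever their col2 values, so a join on (col1, col2) only
> // subdivides groups that are already co-located on disk.
> val rowA = (42, 7)   // (col1, col2)
> val rowB = (42, 99)  // same col1, different col2
> assert(bucketId(rowA._1, 4) == bucketId(rowB._1, 4))  // same bucket
> {noformat}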
>
> ----
>
>
> {noformat}
> def randomInt1to100 = scala.util.Random.nextInt(100) + 1
>
> // Two 100-row DataFrames of random (col1, col2, col3) triples in [1, 100]
> val left = sc.parallelize(
>   Seq.fill(100){ (randomInt1to100, randomInt1to100, randomInt1to100) }
> ).toDF("col1", "col2", "col3")
> val right = sc.parallelize(
>   Seq.fill(100){ (randomInt1to100, randomInt1to100, randomInt1to100) }
> ).toDF("col1", "col2", "col3")
>
> import org.apache.spark.sql.SaveMode
>
> // Write both tables bucketed by col1 into 4 buckets, sorted by (col1, col2)
> left.write
>   .bucketBy(4, "col1")
>   .sortBy("col1", "col2")
>   .mode(SaveMode.Overwrite)
>   .saveAsTable("join_left_table")
>
> right.write
>   .bucketBy(4, "col1")
>   .sortBy("col1", "col2")
>   .mode(SaveMode.Overwrite)
>   .saveAsTable("join_right_table")
>
> val left_table = spark.read.table("join_left_table")
> val right_table = spark.read.table("join_right_table")
>
> // Disable broadcast joins so the planner must pick a sort-merge join
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
>
> // Joining on the bucket column alone: no shuffle in the plan
> val join_on_col1 = left_table.join(
>   right_table,
>   Seq("col1"))
> join_on_col1.explain
> ### BEGIN Output
> join_on_col1: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 3 more fields]
> == Physical Plan ==
> *Project [col1#250, col2#251, col3#252, col2#258, col3#259]
> +- *SortMergeJoin [col1#250], [col1#257], Inner
>    :- *Sort [col1#250 ASC NULLS FIRST], false, 0
>    :  +- *Project [col1#250, col2#251, col3#252]
>    :     +- *Filter isnotnull(col1#250)
>    :        +- *FileScan parquet default.join_left_table[col1#250,col2#251,col3#252] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_left_table], PartitionFilters: [], PushedFilters: [IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
>    +- *Sort [col1#257 ASC NULLS FIRST], false, 0
>       +- *Project [col1#257, col2#258, col3#259]
>          +- *Filter isnotnull(col1#257)
>             +- *FileScan parquet default.join_right_table[col1#257,col2#258,col3#259] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_right_table], PartitionFilters: [], PushedFilters: [IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
> ### END Output
> // Joining on the bucket column plus col2: a shuffle appears on both sides
> val join_on_col1_col2 = left_table.join(
>   right_table,
>   Seq("col1", "col2"))
> join_on_col1_col2.explain
> ### BEGIN Output
> join_on_col1_col2: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 2 more fields]
> == Physical Plan ==
> *Project [col1#250, col2#251, col3#252, col3#259]
> +- *SortMergeJoin [col1#250, col2#251], [col1#257, col2#258], Inner
>    :- *Sort [col1#250 ASC NULLS FIRST, col2#251 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(col1#250, col2#251, 200)
>    :     +- *Project [col1#250, col2#251, col3#252]
>    :        +- *Filter (isnotnull(col2#251) && isnotnull(col1#250))
>    :           +- *FileScan parquet default.join_left_table[col1#250,col2#251,col3#252] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_left_table], PartitionFilters: [], PushedFilters: [IsNotNull(col2), IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
>    +- *Sort [col1#257 ASC NULLS FIRST, col2#258 ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(col1#257, col2#258, 200)
>          +- *Project [col1#257, col2#258, col3#259]
>             +- *Filter (isnotnull(col2#258) && isnotnull(col1#257))
>                +- *FileScan parquet default.join_right_table[col1#257,col2#258,col3#259] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_right_table], PartitionFilters: [], PushedFilters: [IsNotNull(col2), IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
> ### END Output{noformat}
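>
> The extra shuffle can also be detected programmatically instead of reading the plan text. A minimal sketch, assuming Spark 2.3+ where the shuffle operator is ShuffleExchangeExec (it is named ShuffleExchange in 2.2):
>
> {noformat}
> import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
>
> // Collect the shuffle operators from the executed plan. The join on
> // col1 alone yields an empty list; the join on (col1, col2) currently
> // yields one Exchange per side, despite both tables being bucketed.
> val shuffles = join_on_col1_col2.queryExecution.executedPlan.collect {
>   case e: ShuffleExchangeExec => e
> }
> println(s"shuffle exchanges: ${shuffles.size}")
> {noformat}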
>
>