Posted to issues@spark.apache.org by "Natang (JIRA)" <ji...@apache.org> on 2019/02/21 19:04:00 UTC

[jira] [Created] (SPARK-26959) Join of two tables, bucketed the same way, on bucket columns and one or more other columns should not need a shuffle

Natang created SPARK-26959:
------------------------------

             Summary: Join of two tables, bucketed the same way, on bucket columns and one or more other columns should not need a shuffle
                 Key: SPARK-26959
                 URL: https://issues.apache.org/jira/browse/SPARK-26959
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.2.1
            Reporter: Natang


_When two tables that are bucketed the same way are joined on the bucket columns and one or more other columns, Spark should be able to perform the join without a shuffle._

Consider the example below. There are two tables, 'join_left_table' and 'join_right_table', each bucketed by 'col1' into 4 buckets. When these tables are joined on 'col1' and 'col2', Spark should be able to do the join without a shuffle: all entries for a given value of 'col1' are in the same bucket in both tables, irrespective of the value of 'col2'.
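A minimal sketch of why the co-location holds, assuming (as in current Spark) that the bucket id is computed as pmod(murmur3_hash(bucket columns), numBuckets); the built-in hash() function uses the same Murmur3 hash, so it can reproduce the bucket id:

{noformat}
import org.apache.spark.sql.functions.{hash, lit, pmod}

// Sketch (spark-shell, so the toDF/$ implicits are in scope): recompute
// the bucket id a row would get for 4 buckets on col1. Assumption:
// bucket id = pmod(murmur3_hash(col1), numBuckets), which is how Spark's
// HashPartitioning assigns partition ids; hash() is the same Murmur3.
val demo = Seq((7, 10), (7, 20), (7, 99)).toDF("col1", "col2")
demo.withColumn("bucket_id", pmod(hash($"col1"), lit(4))).show()
// All three rows get the same bucket_id: it depends on col1 only,
// so col2 cannot move a row into a different bucket.
{noformat}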

----
{noformat}
// Run in spark-shell, where spark, sc, and the toDF/$ implicits are in scope.
def randomInt1to100 = scala.util.Random.nextInt(100) + 1

val left = sc.parallelize(
  Seq.fill(100){(randomInt1to100, randomInt1to100, randomInt1to100)}
).toDF("col1", "col2", "col3")

val right = sc.parallelize(
  Seq.fill(100){(randomInt1to100, randomInt1to100, randomInt1to100)}
).toDF("col1", "col2", "col3")


import org.apache.spark.sql.SaveMode

// Bucket both tables the same way: 4 buckets on col1, each bucket
// sorted by (col1, col2).
left.write
    .bucketBy(4, "col1")
    .sortBy("col1", "col2")
    .mode(SaveMode.Overwrite)
    .saveAsTable("join_left_table")

right.write
    .bucketBy(4, "col1")
    .sortBy("col1", "col2")
    .mode(SaveMode.Overwrite)
    .saveAsTable("join_right_table")


val left_table = spark.read.table("join_left_table")
val right_table = spark.read.table("join_right_table")

// Disable broadcast joins so both plans below use SortMergeJoin.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

// Join on the bucket column only: the plan contains no Exchange.
val join_on_col1 = left_table.join(
        right_table,
        Seq("col1"))

join_on_col1.explain

### BEGIN Output
join_on_col1: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 3 more fields]
== Physical Plan ==
*Project [col1#250, col2#251, col3#252, col2#258, col3#259]
+- *SortMergeJoin [col1#250], [col1#257], Inner
   :- *Sort [col1#250 ASC NULLS FIRST], false, 0
   :  +- *Project [col1#250, col2#251, col3#252]
   :     +- *Filter isnotnull(col1#250)
   :        +- *FileScan parquet default.join_left_table[col1#250,col2#251,col3#252] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_left_table], PartitionFilters: [], PushedFilters: [IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
   +- *Sort [col1#257 ASC NULLS FIRST], false, 0
      +- *Project [col1#257, col2#258, col3#259]
         +- *Filter isnotnull(col1#257)
            +- *FileScan parquet default.join_right_table[col1#257,col2#258,col3#259] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_right_table], PartitionFilters: [], PushedFilters: [IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
### END Output

// Join on the bucket column plus col2: both sides are shuffled on
// (col1, col2), even though bucketing by col1 already co-locates
// all matching rows.
val join_on_col1_col2 = left_table.join(
        right_table,
        Seq("col1", "col2"))

join_on_col1_col2.explain

### BEGIN Output
join_on_col1_col2: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 2 more fields]
== Physical Plan ==
*Project [col1#250, col2#251, col3#252, col3#259]
+- *SortMergeJoin [col1#250, col2#251], [col1#257, col2#258], Inner
   :- *Sort [col1#250 ASC NULLS FIRST, col2#251 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(col1#250, col2#251, 200)
   :     +- *Project [col1#250, col2#251, col3#252]
   :        +- *Filter (isnotnull(col2#251) && isnotnull(col1#250))
   :           +- *FileScan parquet default.join_left_table[col1#250,col2#251,col3#252] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_left_table], PartitionFilters: [], PushedFilters: [IsNotNull(col2), IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
   +- *Sort [col1#257 ASC NULLS FIRST, col2#258 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col1#257, col2#258, 200)
         +- *Project [col1#257, col2#258, col3#259]
            +- *Filter (isnotnull(col2#258) && isnotnull(col1#257))
               +- *FileScan parquet default.join_right_table[col1#257,col2#258,col3#259] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_right_table], PartitionFilters: [], PushedFilters: [IsNotNull(col2), IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
### END Output
{noformat}
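The Exchange appears because SortMergeJoin requires both inputs to be partitioned on the full set of join keys, hashpartitioning(col1, col2, 200), and the tables' bucketing on 'col1' alone is not treated as satisfying that requirement. For contrast, a sketch (assuming the same session and tables as above) of the looser requirement Spark already applies to aggregation: grouping by (col1, col2) runs shuffle-free, because data clustered by 'col1' is by construction also clustered with respect to any key set containing 'col1', which is exactly the property the join above fails to exploit.

{noformat}
// Sketch, same session/tables as above: an aggregation keyed on
// (col1, col2) only needs the data clustered by a subset of the
// grouping keys, which hashpartitioning(col1) provides.
val agg_on_col1_col2 = left_table.groupBy("col1", "col2").count()
agg_on_col1_col2.explain
// Expected: HashAggregate nodes directly over the FileScan, no Exchange.
{noformat}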
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org