Posted to issues@spark.apache.org by "Hao Ren (JIRA)" <ji...@apache.org> on 2015/06/08 14:38:00 UTC

[jira] [Comment Edited] (SPARK-8102) Big performance difference when joining 3 tables in different order

    [ https://issues.apache.org/jira/browse/SPARK-8102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14577106#comment-14577106 ] 

Hao Ren edited comment on SPARK-8102 at 6/8/15 12:37 PM:
---------------------------------------------------------

[~yhuai]

Here are the physical plans for the two queries:

Query 1:
{code}
Project [period#20L,categoryName#0,regionName#10,action#15,list_id#16L]
 ShuffledHashJoin [refCategoryID#3,regionCode#9], [category#17,region#18], BuildRight
  Exchange (HashPartitioning [refCategoryID#3,regionCode#9], 12)
   Project [regionName#10,categoryName#0,refCategoryID#3,regionCode#9]
    CartesianProduct
     Project [categoryName#0,refCategoryID#3]
      PhysicalRDD [categoryName#0,familyName#1,parentRefCategoryID#2,refCategoryID#3], MapPartitionsRDD[5] at mapPartitions at SQLContext.scala:439
     Project [regionName#10,regionCode#9]
      PhysicalRDD [cityName#4,countryCode#5,countryName#6,dptCode#7,dptName#8,regionCode#9,regionName#10,zipCode#11], MapPartitionsRDD[11] at mapPartitions at SQLContext.scala:439
  Exchange (HashPartitioning [category#17,region#18], 12)
   Project [timestamp_sec#13L AS period#20L,category#17,region#18,action#15,list_id#16L]
    PhysicalRDD [syslog#12,timestamp_sec#13L,timestamp_usec#14,action#15,list_id#16L,category#17,region#18,expiration_time#19], MapPartitionsRDD[16] at map at SQLContext.scala:394
{code}

Query 2:
{code}
Project [period#20L,categoryName#0,regionName#10,action#15,list_id#16L]
 ShuffledHashJoin [region#18], [regionCode#9], BuildRight
  Exchange (HashPartitioning [region#18], 12)
   Project [categoryName#0,list_id#16L,period#20L,action#15,region#18]
    ShuffledHashJoin [refCategoryID#3], [category#17], BuildRight
     Exchange (HashPartitioning [refCategoryID#3], 12)
      Project [categoryName#0,refCategoryID#3]
       PhysicalRDD [categoryName#0,familyName#1,parentRefCategoryID#2,refCategoryID#3], MapPartitionsRDD[5] at mapPartitions at SQLContext.scala:439
     Exchange (HashPartitioning [category#17], 12)
      Project [timestamp_sec#13L AS period#20L,category#17,region#18,action#15,list_id#16L]
       PhysicalRDD [syslog#12,timestamp_sec#13L,timestamp_usec#14,action#15,list_id#16L,category#17,region#18,expiration_time#19], MapPartitionsRDD[16] at map at SQLContext.scala:394
  Exchange (HashPartitioning [regionCode#9], 12)
   Project [regionName#10,regionCode#9]
    PhysicalRDD [cityName#4,countryCode#5,countryName#6,dptCode#7,dptName#8,regionCode#9,regionName#10,zipCode#11], MapPartitionsRDD[11] at mapPartitions at SQLContext.scala:439
{code}

And they are indeed different.
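The key difference is the CartesianProduct in query 1's plan: the two dimension tables share no join predicate, so the planner crosses them first and then hash-joins the fact table against a build side of |t_category| × |t_zipcode| rows, while query 2's plan runs two independent hash joins with small build sides. A toy plain-Python sketch of the two strategies (made-up sizes, purely illustrative):

```python
# Toy illustration of the two physical plans above (hypothetical row counts).
categories = [(i, "cat%d" % i) for i in range(100)]        # (refCategoryID, categoryName)
regions = [(i, "reg%d" % i) for i in range(1000)]          # (regionCode, regionName)
facts = [(t, t % 100, t % 1000) for t in range(10000)]     # (period, category, region)

# Plan 1: CartesianProduct of the dimensions, then one hash join on the pair key.
cross = [(c, r) for c in categories for r in regions]      # 100 * 1000 = 100000 rows
build1 = {(c[0], r[0]): (c[1], r[1]) for c, r in cross}
out1 = [(p,) + build1[(cat, reg)] for p, cat, reg in facts if (cat, reg) in build1]

# Plan 2: two successive hash joins, each build side is just one small table.
cat_map = dict(categories)                                  # 100 entries
reg_map = dict(regions)                                     # 1000 entries
out2 = [(p, cat_map[cat], reg_map[reg]) for p, cat, reg in facts
        if cat in cat_map and reg in reg_map]

assert sorted(out1) == sorted(out2)                         # same rows either way
print(len(cross), len(categories) + len(regions))          # prints "100000 1100"
```

Both plans produce the same rows, but plan 1 materializes 100000 intermediate rows where plan 2 builds hash tables over only 1100, which mirrors the gap between the two EXPLAIN outputs.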


> Big performance difference when joining 3 tables in different order
> -------------------------------------------------------------------
>
>                 Key: SPARK-8102
>                 URL: https://issues.apache.org/jira/browse/SPARK-8102
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.3.1
>         Environment: spark in local mode
>            Reporter: Hao Ren
>
> Given 3 tables loaded from CSV files: 
> (table name => size)
> *click_meter_site_grouped* =>10 687 455 bytes
> *t_zipcode* => 2 738 954 bytes
> *t_category* => 2 182 bytes
> When joining the 3 tables, I noticed a large performance difference depending on the order in which they are joined.
> Here are the SQL queries to compare:
> {code}
> -- snippet 1
> SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
> FROM t_category c, t_zipcode z, click_meter_site_grouped g
> WHERE c.refCategoryID = g.category AND z.regionCode = g.region
> {code}
> {code}
> -- snippet 2
> SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
> FROM t_category c, click_meter_site_grouped g, t_zipcode z
> WHERE c.refCategoryID = g.category AND z.regionCode = g.region
> {code}
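The two snippets are semantically identical, so any speed gap comes from the physical plan, not the result. As a quick sanity check outside Spark (using Python's stdlib sqlite3 with made-up toy data standing in for the three CSV tables), both FROM-clause orders return the same rows:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
# Toy stand-ins for the three tables (hypothetical data, not the real CSVs).
cur.execute("CREATE TABLE t_category (refCategoryID INT, categoryName TEXT)")
cur.execute("CREATE TABLE t_zipcode (regionCode INT, regionName TEXT)")
cur.execute("CREATE TABLE g (period INT, category INT, region INT,"
            " action TEXT, list_id INT, cnt INT)")
cur.executemany("INSERT INTO t_category VALUES (?, ?)", [(1, "cars"), (2, "homes")])
cur.executemany("INSERT INTO t_zipcode VALUES (?, ?)", [(10, "north"), (20, "south")])
cur.executemany("INSERT INTO g VALUES (?, ?, ?, ?, ?, ?)",
                [(100, 1, 10, "view", 7, 3), (200, 2, 20, "click", 8, 5)])

q1 = """SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
        FROM t_category c, t_zipcode z, g
        WHERE c.refCategoryID = g.category AND z.regionCode = g.region"""
q2 = """SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
        FROM t_category c, g, t_zipcode z
        WHERE c.refCategoryID = g.category AND z.regionCode = g.region"""
r1 = sorted(cur.execute(q1).fetchall())
r2 = sorted(cur.execute(q2).fetchall())
assert r1 == r2  # identical result set; only the join plan can differ
```

(sqlite3 here is only a stand-in to show the queries are equivalent; its optimizer behaves nothing like Spark SQL's.)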
> As you can see, the largest table *click_meter_site_grouped* is the last table in the FROM clause in the first snippet, while it is in the middle of the table list in the second one.
> Snippet 2 runs three times faster than snippet 1 (8 seconds vs 24 seconds).
> Since this data is only a sample of a much larger data set, the performance issue will be even worse on the original data.
> After checking the logs, we found something strange in snippet 1's log:
> 15/06/04 15:32:03 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:04 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:04 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> [... the same "Input split" line repeats dozens of times between 15:32:03 and 15:32:20 ...]
> 15/06/04 15:32:20 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> It seems that *t_zipcode* is loaded 56 times! For snippet 2, everything is fine: all three tables are loaded only once.
> Spark SQL can automatically broadcast a table in a join when its size is below *spark.sql.autoBroadcastJoinThreshold*. I am not sure whether this repeated loading is caused by auto broadcast.
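For reference, the default value of spark.sql.autoBroadcastJoinThreshold is 10 MB (10485760 bytes), so if the on-disk CSV sizes above roughly reflect Spark's size estimates (which they may not for CSV-backed tables), only the two small tables would be eligible for auto-broadcast:

```python
# Default spark.sql.autoBroadcastJoinThreshold, in bytes (10 MB).
AUTO_BROADCAST_THRESHOLD = 10 * 1024 * 1024  # 10485760

# Table sizes from the issue description, in bytes.
table_sizes = {
    "click_meter_site_grouped": 10687455,
    "t_zipcode": 2738954,
    "t_category": 2182,
}

# Tables at or below the threshold are candidates for auto-broadcast.
broadcastable = {name for name, size in table_sizes.items()
                 if size <= AUTO_BROADCAST_THRESHOLD}
print(broadcastable)  # t_zipcode and t_category only; the fact table is just over
```

Note that both plans above actually use ShuffledHashJoin rather than a broadcast join, which suggests no broadcast occurred here, possibly because Spark 1.3 lacks size statistics for these CSV-derived tables.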



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org