Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:23:24 UTC

[jira] [Updated] (SPARK-15219) [Spark SQL] it don't support to detect runtime temporary table for enabling broadcast hash join optimization

     [ https://issues.apache.org/jira/browse/SPARK-15219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-15219:
---------------------------------
    Labels: bulk-closed  (was: )

> [Spark SQL] it don't support to detect runtime temporary table for enabling broadcast hash join optimization
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-15219
>                 URL: https://issues.apache.org/jira/browse/SPARK-15219
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Yi Zhou
>            Priority: Major
>              Labels: bulk-closed
>
> We observed an interesting difference in broadcast hash join (similar to map join in Hive) when comparing Spark SQL with Hive on the MR engine. The query below is a multi-way join over 3 tables: product_reviews and 2 run-time temporary result tables (fsr and fwr) produced by 'select' subqueries. Both 'fsr' and 'fwr' also contain a two-way join (1 table and 1 run-time temporary table). The query runs slower on Spark SQL than on Hive on MR. We investigated the difference between the two engines and found that Hive on MR runs a total of 5 map join tasks with tuned map join parameters, whereas Spark SQL runs only 2 broadcast hash join tasks, even when we set a larger broadcast hash join threshold (e.g., 1GB). From our investigation, it seems that when a join in Spark SQL involves a run-time temporary table, the engine does not detect that table as a candidate for the broadcast hash join optimization.
> Core SQL snippet:
> {code}
> INSERT INTO TABLE q19_spark_sql_power_test_0_result
> SELECT *
> FROM
> ( --wrap in additional FROM(), because Sorting/distribute by with UDTF in select clause is not allowed
>   SELECT extract_sentiment(pr.pr_item_sk, pr.pr_review_content) AS
>   (
>     item_sk,
>     review_sentence,
>     sentiment,
>     sentiment_word
>   )
>   FROM product_reviews pr,
>   (
>     --store returns in week ending given date
>     SELECT sr_item_sk, SUM(sr_return_quantity) sr_item_qty
>     FROM store_returns sr,
>     (
>       -- within the week ending a given date
>       SELECT d1.d_date_sk
>       FROM date_dim d1, date_dim d2
>       WHERE d1.d_week_seq = d2.d_week_seq
>       AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
>     ) sr_dateFilter
>     WHERE sr.sr_returned_date_sk = d_date_sk
>     GROUP BY sr_item_sk --across all store and web channels
>     HAVING sr_item_qty > 0
>   ) fsr,
>   (
>     --web returns in week ending given date
>     SELECT wr_item_sk, SUM(wr_return_quantity) wr_item_qty
>     FROM web_returns wr,
>     (
>       -- within the week ending a given date
>       SELECT d1.d_date_sk
>       FROM date_dim d1, date_dim d2
>       WHERE d1.d_week_seq = d2.d_week_seq
>       AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
>     ) wr_dateFilter
>     WHERE wr.wr_returned_date_sk = d_date_sk
>     GROUP BY wr_item_sk  --across all store and web channels
>     HAVING wr_item_qty > 0
>   ) fwr
>   WHERE fsr.sr_item_sk = fwr.wr_item_sk
>   AND pr.pr_item_sk = fsr.sr_item_sk --extract product_reviews for found items
>   -- equivalent across all store and web channels (within a tolerance of +/- 10%)
>   AND abs( (sr_item_qty-wr_item_qty)/ ((sr_item_qty+wr_item_qty)/2)) <= 0.1
> )extractedSentiments
> WHERE sentiment= 'NEG' --if there are any major negative reviews.
> ORDER BY item_sk,review_sentence,sentiment,sentiment_word
> ;
> {code}
> Physical Plan:
> {code}
> == Physical Plan ==
> InsertIntoHiveTable MetastoreRelation bigbench_3t_sparksql, q19_spark_sql_run_query_0_result, None, Map(), false, false
> +- ConvertToSafe
>    +- Sort [item_sk#537L ASC,review_sentence#538 ASC,sentiment#539 ASC,sentiment_word#540 ASC], true, 0
>       +- ConvertToUnsafe
>          +- Exchange rangepartitioning(item_sk#537L ASC,review_sentence#538 ASC,sentiment#539 ASC,sentiment_word#540 ASC,200), None
>             +- ConvertToSafe
>                +- Project [item_sk#537L,review_sentence#538,sentiment#539,sentiment_word#540]
>                   +- Filter (sentiment#539 = NEG)
>                      +- !Generate HiveGenericUDTF#io.bigdatabenchmark.v1.queries.q10.SentimentUDF(pr_item_sk#363L,pr_review_content#366), false, false, [item_sk#537L,review_sentence#538,sentiment#539,sentiment_word#540]
>                         +- ConvertToSafe
>                            +- Project [pr_item_sk#363L,pr_review_content#366]
>                               +- Filter (abs((cast((sr_item_qty#356L - wr_item_qty#357L) as double) / (cast((sr_item_qty#356L + wr_item_qty#357L) as double) / 2.0))) <= 0.1)
>                                  +- SortMergeJoin [sr_item_sk#369L], [wr_item_sk#445L]
>                                     :- Sort [sr_item_sk#369L ASC], false, 0
>                                     :  +- Project [pr_item_sk#363L,sr_item_qty#356L,pr_review_content#366,sr_item_sk#369L]
>                                     :     +- SortMergeJoin [pr_item_sk#363L], [sr_item_sk#369L]
>                                     :        :- Sort [pr_item_sk#363L ASC], false, 0
>                                     :        :  +- TungstenExchange hashpartitioning(pr_item_sk#363L,200), None
>                                     :        :     +- ConvertToUnsafe
>                                     :        :        +- HiveTableScan [pr_item_sk#363L,pr_review_content#366], MetastoreRelation bigbench_3t_sparksql, product_reviews, Some(pr)
>                                     :        +- Sort [sr_item_sk#369L ASC], false, 0
>                                     :           +- Filter (sr_item_qty#356L > 0)
>                                     :              +- TungstenAggregate(key=[sr_item_sk#369L], functions=[(sum(cast(sr_return_quantity#377 as bigint)),mode=Final,isDistinct=false)], output=[sr_item_sk#369L,sr_item_qty#356L])
>                                     :                 +- TungstenExchange hashpartitioning(sr_item_sk#369L,200), None
>                                     :                    +- TungstenAggregate(key=[sr_item_sk#369L], functions=[(sum(cast(sr_return_quantity#377 as bigint)),mode=Partial,isDistinct=false)], output=[sr_item_sk#369L,sum#551L])
>                                     :                       +- Project [sr_item_sk#369L,sr_return_quantity#377]
>                                     :                          +- SortMergeJoin [sr_returned_date_sk#367L], [d_date_sk#387L]
>                                     :                             :- Sort [sr_returned_date_sk#367L ASC], false, 0
>                                     :                             :  +- TungstenExchange hashpartitioning(sr_returned_date_sk#367L,200), None
>                                     :                             :     +- ConvertToUnsafe
>                                     :                             :        +- HiveTableScan [sr_item_sk#369L,sr_return_quantity#377,sr_returned_date_sk#367L], MetastoreRelation bigbench_3t_sparksql, store_returns, Some(sr)
>                                     :                             +- Sort [d_date_sk#387L ASC], false, 0
>                                     :                                +- TungstenExchange hashpartitioning(d_date_sk#387L,200), None
>                                     :                                   +- Project [d_date_sk#387L]
>                                     :                                      +- BroadcastHashJoin [d_week_seq#391], [d_week_seq#419], BuildRight
>                                     :                                         :- ConvertToUnsafe
>                                     :                                         :  +- HiveTableScan [d_date_sk#387L,d_week_seq#391], MetastoreRelation bigbench_3t_sparksql, date_dim, Some(d1)
>                                     :                                         +- Project [d_week_seq#419]
>                                     :                                            +- Filter d_date#417 IN (2004-03-8,2004-08-02,2004-11-15,2004-12-20)
>                                     :                                               +- HiveTableScan [d_week_seq#419,d_date#417], MetastoreRelation bigbench_3t_sparksql, date_dim, Some(d2)
>                                     +- Sort [wr_item_sk#445L ASC], false, 0
>                                        +- Filter (wr_item_qty#357L > 0)
>                                           +- TungstenAggregate(key=[wr_item_sk#445L], functions=[(sum(cast(wr_return_quantity#457 as bigint)),mode=Final,isDistinct=false)], output=[wr_item_sk#445L,wr_item_qty#357L])
>                                              +- TungstenExchange hashpartitioning(wr_item_sk#445L,200), None
>                                                 +- TungstenAggregate(key=[wr_item_sk#445L], functions=[(sum(cast(wr_return_quantity#457 as bigint)),mode=Partial,isDistinct=false)], output=[wr_item_sk#445L,sum#554L])
>                                                    +- Project [wr_item_sk#445L,wr_return_quantity#457]
>                                                       +- SortMergeJoin [wr_returned_date_sk#443L], [d_date_sk#467L]
>                                                          :- Sort [wr_returned_date_sk#443L ASC], false, 0
>                                                          :  +- TungstenExchange hashpartitioning(wr_returned_date_sk#443L,200), None
>                                                          :     +- ConvertToUnsafe
>                                                          :        +- HiveTableScan [wr_item_sk#445L,wr_return_quantity#457,wr_returned_date_sk#443L], MetastoreRelation bigbench_3t_sparksql, web_returns, Some(wr)
>                                                          +- Sort [d_date_sk#467L ASC], false, 0
>                                                             +- TungstenExchange hashpartitioning(d_date_sk#467L,200), None
>                                                                +- Project [d_date_sk#467L]
>                                                                   +- BroadcastHashJoin [d_week_seq#471], [d_week_seq#499], BuildRight
>                                                                      :- ConvertToUnsafe
>                                                                      :  +- HiveTableScan [d_date_sk#467L,d_week_seq#471], MetastoreRelation bigbench_3t_sparksql, date_dim, Some(d1)
>                                                                      +- Project [d_week_seq#499]
>                                                                         +- Filter d_date#497 IN (2004-03-8,2004-08-02,2004-11-15,2004-12-20)
>                                                                            +- HiveTableScan [d_week_seq#499,d_date#497], MetastoreRelation bigbench_3t_sparksql, date_dim, Some(d2)
> {code}
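
The behaviour described above can be illustrated with a small toy model. This is only a sketch, not Spark's planner code: the names Relation and choose_join and every size below are hypothetical, picked to mirror the report. It assumes that the broadcast decision compares a plan-time size estimate against the threshold (in Spark SQL, the spark.sql.autoBroadcastJoinThreshold setting), so a derived table whose estimate is far larger than its real output is never broadcast, no matter how small it turns out to be at run time.

```python
from dataclasses import dataclass

MB = 1024 ** 2
GB = 1024 ** 3


@dataclass
class Relation:
    name: str
    estimated_bytes: int  # size the planner assumes before execution
    actual_bytes: int     # size actually produced at run time


def choose_join(left: Relation, right: Relation, threshold: int) -> str:
    """Pick a join strategy using plan-time estimates only (toy model)."""
    if min(left.estimated_bytes, right.estimated_bytes) <= threshold:
        return "BroadcastHashJoin"
    return "SortMergeJoin"


# Hypothetical sizes, for illustration only.
# Base tables: the estimate matches the real size.
date_dim_d1 = Relation("date_dim d1", 10 * MB, 10 * MB)
date_dim_d2 = Relation("date_dim d2", 10 * MB, 10 * MB)
product_reviews = Relation("product_reviews", 30 * GB, 30 * GB)
# Derived (run-time temporary) table: the estimate is assumed to be badly
# inflated, while the aggregated result is in fact small.
fsr = Relation("fsr", 500 * GB, 20 * MB)

threshold = 1 * GB  # the larger threshold mentioned in the report

for left, right in [(date_dim_d1, date_dim_d2), (product_reviews, fsr)]:
    print(f"{left.name} JOIN {right.name}: {choose_join(left, right, threshold)}")
```

In this model the date_dim self-join is broadcast while the join against fsr falls back to a sort-merge join, even though the real size of fsr is well under the 1GB threshold. That matches the shape of the physical plan above, where only the two date_dim joins use BroadcastHashJoin.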



