You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Rohit Mishra (Jira)" <ji...@apache.org> on 2020/08/13 18:29:00 UTC

[jira] [Comment Edited] (SPARK-32609) Incorrect exchange reuse with DataSourceV2

    [ https://issues.apache.org/jira/browse/SPARK-32609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177246#comment-17177246 ] 

Rohit Mishra edited comment on SPARK-32609 at 8/13/20, 6:28 PM:
----------------------------------------------------------------

[~mingjial], Please refrain from adding Priority as "Critical". These are reserved for committers. Changing it to "Major".

Also please don't populate Target and Fix version field. These are also set by committers.


was (Author: rohitmishr1484):
[~mingjial], Please refrain from adding Priority as "Critical". These are reserved for committers. Changing it to "Major".

> Incorrect exchange reuse with DataSourceV2
> ------------------------------------------
>
>                 Key: SPARK-32609
>                 URL: https://issues.apache.org/jira/browse/SPARK-32609
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.5
>            Reporter: Mingjia Liu
>            Priority: Major
>              Labels: correctness
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
>  
> {code:java}
> spark.conf.set("spark.sql.exchange.reuse","true")
> spark.read.format("com.google.cloud.spark.bigquery.v2.BigQueryDataSourceV2").option("table",'tpcds_1G.date_dim').load()
> df.createOrReplaceTempView(table)
>     
> df = spark.sql(""" 
> WITH t1 AS (
>     SELECT 
>         d_year, d_month_seq
>     FROM (
>         SELECT t1.d_year , t2.d_month_seq          
>         FROM 
>         date_dim t1
>         cross join
>         date_dim t2
>         where t1.d_day_name = "Monday" and t1.d_fy_year > 2000
>         and t2.d_day_name = "Monday" and t2.d_fy_year > 2000
>         )
>     GROUP BY d_year, d_month_seq)
>    
>  SELECT
>     prev_yr.d_year AS prev_year, curr_yr.d_year AS year, curr_yr.d_month_seq
>  FROM t1 curr_yr cross join t1 prev_yr
>  WHERE curr_yr.d_year=2002 AND prev_yr.d_year=2002-1
>  ORDER BY d_month_seq
>  LIMIT 100
>  
>  """)
> df.explain()
> df.show(){code}
>  
> the repro query :
> A. defines a temp table t1  
> B. cross join t1 (year 2002)  and  t2 (year 2001)
> With reuse exchange enabled, the plan incorrectly "decides" to re-use persisted shuffle writes of A filtering on year 2002 , for year 2001.
> {code:java}
> == Physical Plan ==
> TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#24371L ASC NULLS FIRST], output=[prev_year#24366L,year#24367L,d_month_seq#24371L])
> +- *(9) Project [d_year#24402L AS prev_year#24366L, d_year#23551L AS year#24367L, d_month_seq#24371L]
>    +- CartesianProduct
>       :- *(4) HashAggregate(keys=[d_year#23551L, d_month_seq#24371L], functions=[])
>       :  +- Exchange hashpartitioning(d_year#23551L, d_month_seq#24371L, 200)
>       :     +- *(3) HashAggregate(keys=[d_year#23551L, d_month_seq#24371L], functions=[])
>       :        +- BroadcastNestedLoopJoin BuildRight, Cross
>       :           :- *(1) Project [d_year#23551L]
>       :           :  +- *(1) ScanV2 BigQueryDataSourceV2[d_year#23551L] (Filters: [isnotnull(d_day_name#23559), (d_day_name#23559 = Monday), isnotnull(d_fy_year#23556L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>       :           +- BroadcastExchange IdentityBroadcastMode
>       :              +- *(2) Project [d_month_seq#24371L]
>       :                 +- *(2) ScanV2 BigQueryDataSourceV2[d_month_seq#24371L] (Filters: [isnotnull(d_day_name#24382), (d_day_name#24382 = Monday), isnotnull(d_fy_year#24379L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>       +- *(8) HashAggregate(keys=[d_year#24402L, d_month_seq#24455L], functions=[])
>          +- ReusedExchange [d_year#24402L, d_month_seq#24455L], Exchange hashpartitioning(d_year#23551L, d_month_seq#24371L, 200){code}
>  
>  
> And the result is obviously incorrect because prev_year should be 2001
> {code:java}
> +---------+----+-----------+
> |prev_year|year|d_month_seq|
> +---------+----+-----------+
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> +---------+----+-----------+
> only showing top 20 rows
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org