Posted to issues@spark.apache.org by "Mingjia Liu (Jira)" <ji...@apache.org> on 2020/08/13 17:19:00 UTC

[jira] [Created] (SPARK-32609) Incorrect exchange reuse with DataSourceV2

Mingjia Liu created SPARK-32609:
-----------------------------------

             Summary: Incorrect exchange reuse with DataSourceV2
                 Key: SPARK-32609
                 URL: https://issues.apache.org/jira/browse/SPARK-32609
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.4.5
            Reporter: Mingjia Liu


 
{code:java}
spark.conf.set("spark.sql.exchange.reuse","true")
df = spark.read.format("com.google.cloud.spark.bigquery.v2.BigQueryDataSourceV2") \
    .option("table", "tpcds_1G.date_dim").load()
df.createOrReplaceTempView("date_dim")
    
df = spark.sql(""" 
WITH t1 AS (
    SELECT 
        d_year, d_month_seq
    FROM (
        SELECT t1.d_year , t2.d_month_seq          
        FROM 
        date_dim t1
        cross join
        date_dim t2
        where t1.d_day_name = "Monday" and t1.d_fy_year > 2000
        and t2.d_day_name = "Monday" and t2.d_fy_year > 2000
        )
    GROUP BY d_year, d_month_seq)
   
 SELECT
    prev_yr.d_year AS prev_year, curr_yr.d_year AS year, curr_yr.d_month_seq
 FROM t1 curr_yr cross join t1 prev_yr
 WHERE curr_yr.d_year=2002 AND prev_yr.d_year=2002-1
 ORDER BY d_month_seq
 LIMIT 100
 
 """)

df.explain()
df.show(){code}
 

The repro query:
A. defines a CTE t1 (the distinct (d_year, d_month_seq) pairs built from a self cross join of date_dim, restricted to Mondays with d_fy_year > 2000)
B. cross joins t1 filtered to year 2002 (curr_yr) with t1 filtered to year 2001 (prev_yr)

With exchange reuse enabled, the plan incorrectly decides to reuse the persisted shuffle output of the year-2002 branch for the year-2001 branch, presumably because the two branches differ only in filters that were pushed down into the DataSourceV2 scan, and those pushed filters do not take part in the exchange's plan-equality check.
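
In the plan below this shows up as a ReusedExchange node feeding the prev_yr branch. A plan-level check that would catch the regression in a test, as a sketch (it relies on PySpark's internal _jdf bridge, which is not a public API):
{code:java}
# The two branches filter on different years, so any ReusedExchange node in
# this plan means one branch silently reads the other's shuffle output.
plan_str = df._jdf.queryExecution().executedPlan().toString()
assert "ReusedExchange" not in plan_str, "exchange reused across differently-filtered scans"
{code}

The plan produced by the repro: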
{code:java}
== Physical Plan ==
TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#24371L ASC NULLS FIRST], output=[prev_year#24366L,year#24367L,d_month_seq#24371L])
+- *(9) Project [d_year#24402L AS prev_year#24366L, d_year#23551L AS year#24367L, d_month_seq#24371L]
   +- CartesianProduct
      :- *(4) HashAggregate(keys=[d_year#23551L, d_month_seq#24371L], functions=[])
      :  +- Exchange hashpartitioning(d_year#23551L, d_month_seq#24371L, 200)
      :     +- *(3) HashAggregate(keys=[d_year#23551L, d_month_seq#24371L], functions=[])
      :        +- BroadcastNestedLoopJoin BuildRight, Cross
      :           :- *(1) Project [d_year#23551L]
      :           :  +- *(1) ScanV2 BigQueryDataSourceV2[d_year#23551L] (Filters: [isnotnull(d_day_name#23559), (d_day_name#23559 = Monday), isnotnull(d_fy_year#23556L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
      :           +- BroadcastExchange IdentityBroadcastMode
      :              +- *(2) Project [d_month_seq#24371L]
      :                 +- *(2) ScanV2 BigQueryDataSourceV2[d_month_seq#24371L] (Filters: [isnotnull(d_day_name#24382), (d_day_name#24382 = Monday), isnotnull(d_fy_year#24379L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
      +- *(8) HashAggregate(keys=[d_year#24402L, d_month_seq#24455L], functions=[])
         +- ReusedExchange [d_year#24402L, d_month_seq#24455L], Exchange hashpartitioning(d_year#23551L, d_month_seq#24371L, 200){code}
 

 

And the result is clearly incorrect, because prev_year should be 2001:
{code:java}
+---------+----+-----------+
|prev_year|year|d_month_seq|
+---------+----+-----------+
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
+---------+----+-----------+
only showing top 20 rows
{code}
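
Given the predicate prev_yr.d_year = 2002-1, every returned row must have prev_year = 2001, so the corruption is easy to assert on (a sketch against the df from the repro):
{code:java}
# Count rows violating the query's own predicate: 0 with correct planning,
# equal to the full row count under the buggy exchange reuse.
wrong = df.filter("prev_year <> 2001").count()
assert wrong == 0, "{} rows have prev_year != 2001".format(wrong)
{code}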
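
Until the pushed-down filters participate in the exchange's plan-equality check, a workaround is to disable exchange reuse for the affected query, at the cost of a second scan and shuffle (a sketch; assumes the repro SQL string from above is bound to a variable named query):
{code:java}
# Workaround sketch: with exchange reuse off, each aggregate branch gets its
# own ScanV2 + Exchange, so the year-2001 filter is actually applied.
spark.conf.set("spark.sql.exchange.reuse", "false")
df = spark.sql(query)  # same repro query as above
df.explain()           # the plan no longer contains a ReusedExchange node
df.show()              # prev_year now comes back as 2001
{code}
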
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
