Posted to issues@spark.apache.org by "Mingjia Liu (Jira)" <ji...@apache.org> on 2020/08/27 00:11:00 UTC

[jira] [Issue Comment Deleted] (SPARK-32708) Query optimization fails to reuse exchange with DataSourceV2

     [ https://issues.apache.org/jira/browse/SPARK-32708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mingjia Liu updated SPARK-32708:
--------------------------------
    Comment: was deleted

(was: I can't reproduce the issue on Spark 3.0.

*Fix:* 2.4 needs to partially follow what 3.0 does: change pushedFilters to be of the Filter class instead of the Expression class.

I am currently working on a fix.)
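
One plausible reading of why that change would help, sketched in plain Python rather than taken from Spark's code: in the DataSourceV2 plan quoted below, each scan's filters carry attribute references with per-instance expression IDs (d_day_name#21704 in stage 1, d_day_name#22364 in stage 5), so two otherwise identical scans can compare as different, while a name-based filter carries no such ID. The classes AttrRef, ExprFilter and SourceFilter are hypothetical stand-ins for illustration only, not Spark classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AttrRef:
    """Hypothetical stand-in for an attribute: column name plus a unique expression ID."""
    name: str
    expr_id: int


@dataclass(frozen=True)
class ExprFilter:
    """Hypothetical Expression-style filter: compares an ID-carrying attribute to a value."""
    attr: AttrRef
    op: str
    value: object


@dataclass(frozen=True)
class SourceFilter:
    """Hypothetical Filter-style filter: refers to the column by name only."""
    column: str
    op: str
    value: object


def to_source_filter(f: ExprFilter) -> SourceFilter:
    # Drop the expression ID and keep only the column name.
    return SourceFilter(f.attr.name, f.op, f.value)


# The same predicate as it appears in stage 1 and stage 5 of the DataSourceV2 plan.
scan1 = [ExprFilter(AttrRef("d_day_name", 21704), "=", "Monday")]
scan5 = [ExprFilter(AttrRef("d_day_name", 22364), "=", "Monday")]

# Expression-style filters differ only by expression ID, yet compare unequal.
expr_equal = scan1 == scan5
# Name-based filters compare equal, so the two scans look identical.
name_equal = [to_source_filter(f) for f in scan1] == [to_source_filter(f) for f in scan5]

print(expr_equal, name_equal)
```

This only models the equality check; whether Spark 2.4 compares scan nodes this way is an assumption, not something the issue states.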

> Query optimization fails to reuse exchange with DataSourceV2
> ------------------------------------------------------------
>
>                 Key: SPARK-32708
>                 URL: https://issues.apache.org/jira/browse/SPARK-32708
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0, 2.4.1, 2.4.2, 2.4.3, 2.4.4, 2.4.5, 2.4.6, 2.4.7
>            Reporter: Mingjia Liu
>            Priority: Major
>
> Repro query:
> {code:python}
> spark.conf.set("spark.sql.exchange.reuse", "true")
> df = spark.read.format('parquet').load('gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim')
> # DataSourceV2 variant: use the line below instead of the Parquet read above
> #df = spark.read.format("com.google.cloud.spark.bigquery.v2.BigQueryDataSourceV2").option("table",'tpcds_1G.date_dim').load()
> df.createOrReplaceTempView("date_dim")
>  
> df = spark.sql(""" 
> WITH t1 AS (
>  SELECT 
>  d_year, d_month_seq
>  FROM (
>  SELECT t1.d_year , t2.d_month_seq 
>  FROM 
>  date_dim t1
>  cross join
>  date_dim t2
>  where t1.d_day_name = "Monday" and t1.d_fy_year > 2000
>  and t2.d_day_name = "Monday" and t2.d_fy_year > 2000
>  )
>  GROUP BY d_year, d_month_seq)
>  
>  SELECT
>  prev_yr.d_year AS prev_year, curr_yr.d_year AS year, curr_yr.d_month_seq
>  FROM t1 curr_yr cross join t1 prev_yr
>  WHERE curr_yr.d_year=2002 AND prev_yr.d_year=2002 
>  ORDER BY d_month_seq
>  LIMIT 100
>  
>  """)
> df.explain()
> #df.show()
> {code}
>  
> *The above query produces different physical plans with Parquet and DataSourceV2. Both plans are correct, but the DataSourceV2 plan is less optimized:*
> *Sub-plan [5-7] is exactly the same as sub-plan [1-3] (an aggregate over a broadcast-joined dataset of two tables that are filtered and projected the same way).*
> *Therefore, in the Parquet plan below, the exchange that follows [1-3] is reused in place of [5-7].*
>  *However, the DataSourceV2 plan fails to do so.*
>  
> Parquet:
> {code:java}
> == Physical Plan ==
> TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#21456L ASC NULLS FIRST], output=[prev_year#21451L,year#21452L,d_month_seq#21456L])
> +- *(9) Project [d_year#21487L AS prev_year#21451L, d_year#20481L AS year#21452L, d_month_seq#21456L]
>    +- CartesianProduct
>       :- *(4) HashAggregate(keys=[d_year#20481L, d_month_seq#21456L], functions=[])
>       :  +- Exchange hashpartitioning(d_year#20481L, d_month_seq#21456L, 200)
>       :     +- *(3) HashAggregate(keys=[d_year#20481L, d_month_seq#21456L], functions=[])
>       :        +- BroadcastNestedLoopJoin BuildRight, Cross
>       :           :- *(1) Project [d_year#20481L]
>       :           :  +- *(1) Filter (((((isnotnull(d_year#20481L) && isnotnull(d_day_name#20489)) && isnotnull(d_fy_year#20486L)) && (d_day_name#20489 = Monday)) && (d_fy_year#20486L > 2000)) && (d_year#20481L = 2002))
>       :           :     +- *(1) FileScan parquet [d_year#20481L,d_fy_year#20486L,d_day_name#20489] Batched: true, Format: Parquet, Location: InMemoryFileIndex[gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_year), IsNotNull(d_day_name), IsNotNull(d_fy_year), EqualTo(d_day_name,Monday), Grea..., ReadSchema: struct<d_year:bigint,d_fy_year:bigint,d_day_name:string>
>       :           +- BroadcastExchange IdentityBroadcastMode
>       :              +- *(2) Project [d_month_seq#21456L]
>       :                 +- *(2) Filter (((isnotnull(d_day_name#21467) && isnotnull(d_fy_year#21464L)) && (d_day_name#21467 = Monday)) && (d_fy_year#21464L > 2000))
>       :                    +- *(2) FileScan parquet [d_month_seq#21456L,d_fy_year#21464L,d_day_name#21467] Batched: true, Format: Parquet, Location: InMemoryFileIndex[gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_day_name), IsNotNull(d_fy_year), EqualTo(d_day_name,Monday), GreaterThan(d_fy_year,2..., ReadSchema: struct<d_month_seq:bigint,d_fy_year:bigint,d_day_name:string>
>       +- *(8) HashAggregate(keys=[d_year#21487L, d_month_seq#21540L], functions=[])
>          +- ReusedExchange [d_year#21487L, d_month_seq#21540L], Exchange hashpartitioning(d_year#20481L, d_month_seq#21456L, 200){code}
>  
> DataSourceV2:
> {code:java}
> == Physical Plan ==
> TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#22325L ASC NULLS FIRST], output=[prev_year#22320L,year#22321L,d_month_seq#22325L])
> +- *(9) Project [d_year#22356L AS prev_year#22320L, d_year#21696L AS year#22321L, d_month_seq#22325L]
>    +- CartesianProduct
>       :- *(4) HashAggregate(keys=[d_year#21696L, d_month_seq#22325L], functions=[])
>       :  +- Exchange hashpartitioning(d_year#21696L, d_month_seq#22325L, 200)
>       :     +- *(3) HashAggregate(keys=[d_year#21696L, d_month_seq#22325L], functions=[])
>       :        +- BroadcastNestedLoopJoin BuildRight, Cross
>       :           :- *(1) Project [d_year#21696L]
>       :           :  +- *(1) ScanV2 BigQueryDataSourceV2[d_year#21696L] (Filters: [isnotnull(d_day_name#21704), (d_day_name#21704 = Monday), isnotnull(d_fy_year#21701L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>       :           +- BroadcastExchange IdentityBroadcastMode
>       :              +- *(2) Project [d_month_seq#22325L]
>       :                 +- *(2) ScanV2 BigQueryDataSourceV2[d_month_seq#22325L] (Filters: [isnotnull(d_day_name#22336), (d_day_name#22336 = Monday), isnotnull(d_fy_year#22333L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>       +- *(8) HashAggregate(keys=[d_year#22356L, d_month_seq#22409L], functions=[])
>          +- Exchange hashpartitioning(d_year#22356L, d_month_seq#22409L, 200)
>             +- *(7) HashAggregate(keys=[d_year#22356L, d_month_seq#22409L], functions=[])
>                +- BroadcastNestedLoopJoin BuildRight, Cross
>                   :- *(5) Project [d_year#22356L]
>                   :  +- *(5) ScanV2 BigQueryDataSourceV2[d_year#22356L] (Filters: [isnotnull(d_day_name#22364), (d_day_name#22364 = Monday), isnotnull(d_fy_year#22361L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>                   +- BroadcastExchange IdentityBroadcastMode
>                      +- *(6) Project [d_month_seq#22409L]
>                         +- *(6) ScanV2 BigQueryDataSourceV2[d_month_seq#22409L] (Filters: [isnotnull(d_day_name#22420), (d_day_name#22420 = Monday), isnotnull(d_fy_year#22417L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]]){code}
>  
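
A quick way to tell the two plans apart without reading them line by line is to count the shuffle Exchange and ReusedExchange nodes in the explain() text. The helper below is a minimal sketch: exchange_summary is a hypothetical name, and it assumes the plan text has already been captured as a string (for example, pasted from the output above).

```python
import re


def exchange_summary(plan_text: str) -> dict:
    """Count shuffle Exchange and ReusedExchange nodes in a physical plan string."""
    counts = {"Exchange": 0, "ReusedExchange": 0}
    for line in plan_text.splitlines():
        # Skip quote markers and tree-drawing characters, then an optional
        # "*(N)" codegen stage marker; the next word is the node name.
        m = re.match(r"^[\s:>|+\-]*(?:\*\(\d+\)\s+)?(\w+)", line)
        if m and m.group(1) in counts:
            counts[m.group(1)] += 1
    return counts


# Lines copied from the Parquet plan above (one shuffle, one reuse).
parquet_lines = """
      :  +- Exchange hashpartitioning(d_year#20481L, d_month_seq#21456L, 200)
      :           +- BroadcastExchange IdentityBroadcastMode
         +- ReusedExchange [d_year#21487L, d_month_seq#21540L], Exchange hashpartitioning(d_year#20481L, d_month_seq#21456L, 200)
"""

# Lines copied from the DataSourceV2 plan above (two shuffles, no reuse).
dsv2_lines = """
 : +- Exchange hashpartitioning(d_year#21696L, d_month_seq#22325L, 200)
 : +- BroadcastExchange IdentityBroadcastMode
 +- Exchange hashpartitioning(d_year#22356L, d_month_seq#22409L, 200)
 +- BroadcastExchange IdentityBroadcastMode
"""

parquet_counts = exchange_summary(parquet_lines)
dsv2_counts = exchange_summary(dsv2_lines)
print(parquet_counts, dsv2_counts)
```

BroadcastExchange nodes are deliberately not counted, since only the shuffle exchange is the one expected to be reused here.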



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
