You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Mingjia Liu (Jira)" <ji...@apache.org> on 2020/08/26 23:45:00 UTC

[jira] [Created] (SPARK-32708) Query optimization fail to reuse exchange with DataSourceV2

Mingjia Liu created SPARK-32708:
-----------------------------------

             Summary: Query optimization fail to reuse exchange with DataSourceV2
                 Key: SPARK-32708
                 URL: https://issues.apache.org/jira/browse/SPARK-32708
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.4.6, 2.4.5, 2.4.4, 2.4.3, 2.4.2, 2.4.1, 2.4.0, 2.4.7
            Reporter: Mingjia Liu


Repro query:
{code:java}
spark.conf.set("spark.sql.exchange.reuse","true")
spark.read.format('parquet').load('gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim')
#spark.read.format("com.google.cloud.spark.bigquery.v2.BigQueryDataSourceV2").option("table",'tpcds_1G.date_dim').load() 
df.createOrReplaceTempView(table)
 
df = spark.sql(""" 
WITH t1 AS (
 SELECT 
 d_year, d_month_seq
 FROM (
 SELECT t1.d_year , t2.d_month_seq 
 FROM 
 date_dim t1
 cross join
 date_dim t2
 where t1.d_day_name = "Monday" and t1.d_fy_year > 2000
 and t2.d_day_name = "Monday" and t2.d_fy_year > 2000
 )
 GROUP BY d_year, d_month_seq)
 
 SELECT
 prev_yr.d_year AS prev_year, curr_yr.d_year AS year, curr_yr.d_month_seq
 FROM t1 curr_yr cross join t1 prev_yr
 WHERE curr_yr.d_year=2002 AND prev_yr.d_year=2002 
 ORDER BY d_month_seq
 LIMIT 100
 
 """)
df.explain()
#df.show()
{code}
 

The above query has different plans (marked in {color:#FF0000}red{color}) with Parquet and DataSourceV2. Both plans are correct tho. However, the DataSourceV2 plan is less optimized :

Sub-plan [5-7] is exactly the same as sub-plan [1-3]. Therefore, the below parquet plan reused shuffle writes after [1-3]. However, the datasource V2 plan failed to do so.

Parquet:
{code:java}
== Physical Plan ==
TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#21456L ASC NULLS FIRST], output=[prev_year#21451L,year#21452L,d_month_seq#21456L])
+- *(9) Project [d_year#21487L AS prev_year#21451L, d_year#20481L AS year#21452L, d_month_seq#21456L]
   +- CartesianProduct
      :- *(4) HashAggregate(keys=[d_year#20481L, d_month_seq#21456L], functions=[])
      :  +- Exchange hashpartitioning(d_year#20481L, d_month_seq#21456L, 200)
      :     +- *(3) HashAggregate(keys=[d_year#20481L, d_month_seq#21456L], functions=[])
      :        +- BroadcastNestedLoopJoin BuildRight, Cross
      :           :- *(1) Project [d_year#20481L]
      :           :  +- *(1) Filter (((((isnotnull(d_year#20481L) && isnotnull(d_day_name#20489)) && isnotnull(d_fy_year#20486L)) && (d_day_name#20489 = Monday)) && (d_fy_year#20486L > 2000)) && (d_year#20481L = 2002))
      :           :     +- *(1) FileScan parquet [d_year#20481L,d_fy_year#20486L,d_day_name#20489] Batched: true, Format: Parquet, Location: InMemoryFileIndex[gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_year), IsNotNull(d_day_name), IsNotNull(d_fy_year), EqualTo(d_day_name,Monday), Grea..., ReadSchema: struct<d_year:bigint,d_fy_year:bigint,d_day_name:string>
      :           +- BroadcastExchange IdentityBroadcastMode
      :              +- *(2) Project [d_month_seq#21456L]
      :                 +- *(2) Filter (((isnotnull(d_day_name#21467) && isnotnull(d_fy_year#21464L)) && (d_day_name#21467 = Monday)) && (d_fy_year#21464L > 2000))
      :                    +- *(2) FileScan parquet [d_month_seq#21456L,d_fy_year#21464L,d_day_name#21467] Batched: true, Format: Parquet, Location: InMemoryFileIndex[gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_day_name), IsNotNull(d_fy_year), EqualTo(d_day_name,Monday), GreaterThan(d_fy_year,2..., ReadSchema: struct<d_month_seq:bigint,d_fy_year:bigint,d_day_name:string>
      +- *(8) HashAggregate(keys=[d_year#21487L, d_month_seq#21540L], functions=[])
         +- ReusedExchange [d_year#21487L, d_month_seq#21540L], Exchange hashpartitioning(d_year#20481L, d_month_seq#21456L, 200){code}
 

DataSourceV2:
{code:java}
== Physical Plan ==
 TakeOrderedAndProject(limit=100, orderBy=d_month_seq#22325L ASC NULLS FIRST, output=prev_year#22320L,year#22321L,d_month_seq#22325L)
 +- *(9) Project d_year#22356L AS prev_year#22320L, d_year#21696L AS year#22321L, d_month_seq#22325L
 +- CartesianProduct
 :- *(4) HashAggregate(keys=d_year#21696L, d_month_seq#22325L, functions=[])
 : +- Exchange hashpartitioning(d_year#21696L, d_month_seq#22325L, 200)
 : +- *(3) HashAggregate(keys=d_year#21696L, d_month_seq#22325L, functions=[])
 : +- BroadcastNestedLoopJoin BuildRight, Cross
 : :- *(1) Project d_year#21696L
 : : +- *(1) ScanV2 BigQueryDataSourceV2d_year#21696L (Filters: [isnotnull(d_day_name#21704), (d_day_name#21704 = Monday), isnotnull(d_fy_year#21701L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
 : +- BroadcastExchange IdentityBroadcastMode
 : +- *(2) Project d_month_seq#22325L
 : +- *(2) ScanV2 BigQueryDataSourceV2d_month_seq#22325L (Filters: [isnotnull(d_day_name#22336), (d_day_name#22336 = Monday), isnotnull(d_fy_year#22333L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
 +- *(8) HashAggregate(keys=d_year#22356L, d_month_seq#22409L, functions=[])
 +- Exchange hashpartitioning(d_year#22356L, d_month_seq#22409L, 200)
 +- *(7) HashAggregate(keys=d_year#22356L, d_month_seq#22409L, functions=[])
 +- BroadcastNestedLoopJoin BuildRight, Cross
 :- *(5) Project d_year#22356L
 : +- *(5) ScanV2 BigQueryDataSourceV2d_year#22356L (Filters: [isnotnull(d_day_name#22364), (d_day_name#22364 = Monday), isnotnull(d_fy_year#22361L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
 +- BroadcastExchange IdentityBroadcastMode
 +- *(6) Project d_month_seq#22409L
 +- *(6) ScanV2 BigQueryDataSourceV2d_month_seq#22409L (Filters: [isnotnull(d_day_name#22420), (d_day_name#22420 = Monday), isnotnull(d_fy_year#22417L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]]){code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org