Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2020/06/21 05:47:00 UTC

[jira] [Assigned] (SPARK-32041) Exchange reuse won't work in cases when DPP, subqueries are involved

     [ https://issues.apache.org/jira/browse/SPARK-32041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-32041:
------------------------------------

    Assignee:     (was: Apache Spark)

> Exchange reuse won't work in cases when DPP, subqueries are involved
> --------------------------------------------------------------------
>
>                 Key: SPARK-32041
>                 URL: https://issues.apache.org/jira/browse/SPARK-32041
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.6, 3.0.0
>            Reporter: Prakhar Jain
>            Priority: Major
>
> When an Exchange node appears at multiple places in the physical plan and that Exchange contains a DPP (dynamic partition pruning) subquery filter, ReuseExchange does not work for it, and separate stages are launched to compute the same thing.
> Example:
> {noformat}
> // generate data (run in spark-shell; elsewhere, first `import spark.implicits._` for toDF)
> val factData = (1 to 100).map(i => (i%5, i%20, i))
> factData.toDF("store_id", "product_id", "units_sold")
>   .write
>   .partitionBy("store_id")
>   .format("parquet")
>   .saveAsTable("fact_stats")
> val dimData = Seq[(Int, String, String)](
>   (1, "AU", "US"),
>   (2, "CA", "US"),
>   (3, "KA", "IN"),
>   (4, "DL", "IN"),
>   (5, "GA", "PA"))
> dimData.toDF("store_id", "state_province", "country")
>   .write
>   .format("parquet")
>   .saveAsTable("dim_stats")
> sql("ANALYZE TABLE fact_stats COMPUTE STATISTICS FOR COLUMNS store_id")
> sql("ANALYZE TABLE dim_stats COMPUTE STATISTICS FOR COLUMNS store_id")
> // Set Configs
> spark.sql("set spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly=true")
> spark.sql("set spark.sql.autoBroadcastJoinThreshold=1000")
> val query = """
>     With view1 as (
>       SELECT product_id, f.store_id
>       FROM fact_stats f JOIN dim_stats
>       ON f.store_id = dim_stats.store_id WHERE dim_stats.country = 'IN')
>     SELECT * FROM view1 v1 join view1 v2 WHERE v1.product_id = v2.product_id
> """
> val df = spark.sql(query)
> println(df.queryExecution.executedPlan)
> {noformat}
> {noformat}
> Plan:
>  *(7) SortMergeJoin [product_id#1968], [product_id#2060], Inner
>  :- *(3) Sort [product_id#1968 ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(product_id#1968, 5), true, [id=#1140]
>  :     +- *(2) Project [product_id#1968, store_id#1970]
>  :        +- *(2) BroadcastHashJoin [store_id#1970], [store_id#1971], Inner, BuildRight
>  :           :- *(2) Project [product_id#1968, store_id#1970]
>  :           :  +- *(2) Filter isnotnull(product_id#1968)
>  :           :     +- *(2) ColumnarToRow
>  :           :        +- FileScan parquet default.fact_stats[product_id#1968,store_id#1970] Batched: true, DataFilters: [isnotnull(product_id#1968)], Format: Parquet, Location: InMemoryFileIndex[file:/home/prakhar/src/os/1_spark/sql/core/spark-warehouse/org.apache.spark.sql..., PartitionFilters: [isnotnull(store_id#1970), dynamicpruningexpression(store_id#1970 IN dynamicpruning#2067)], PushedFilters: [IsNotNull(product_id)], ReadSchema: struct<product_id:int>
>  :           :              +- SubqueryBroadcast dynamicpruning#2067, 0, [store_id#1971], [id=#1131]
>  :           :                 +- ReusedExchange [store_id#1971], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#1021]
>  :           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#1021]
>  :              +- *(1) Project [store_id#1971]
>  :                 +- *(1) Filter ((isnotnull(country#1973) AND (country#1973 = IN)) AND isnotnull(store_id#1971))
>  :                    +- *(1) ColumnarToRow
>  :                       +- FileScan parquet default.dim_stats[store_id#1971,country#1973] Batched: true, DataFilters: [isnotnull(country#1973), (country#1973 = IN), isnotnull(store_id#1971)], Format: Parquet, Location: InMemoryFileIndex[file:/home/prakhar/src/os/1_spark/sql/core/spark-warehouse/org.apache.spark.sql..., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,IN), IsNotNull(store_id)], ReadSchema: struct<store_id:int,country:string>
>  +- *(6) Sort [product_id#2060 ASC NULLS FIRST], false, 0
>     +- ReusedExchange [product_id#2060, store_id#2062], Exchange hashpartitioning(product_id#1968, 5), true, [id=#1026]
> {noformat}
> Issue:
>  Note the last line of the plan. It is a ReusedExchange pointing to id=1026, but there is no Exchange node with id 1026 in the plan. The ReusedExchange node points to the wrong child (1026 instead of 1140), so exchange reuse does not actually happen in this query.
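One way to spot this symptom mechanically is to compare the ids referenced by ReusedExchange lines with the ids of the Exchange nodes that are actually present in the printed plan. The following is a minimal sketch that works on the plan text only, assuming the `[id=#N]` suffix format shown in the plan above. `PlanReuseCheck` and `danglingReuseIds` are hypothetical names used for illustration and are not part of Spark; the sample plan in `main` is an abbreviated version of the plan above.

```scala
object PlanReuseCheck {
  // Matches the "[id=#1140]" suffix printed for exchange nodes.
  private val IdTag = """\[id=#(\d+)\]""".r

  /** Ids referenced by ReusedExchange lines that no Exchange line in the same plan text defines. */
  def danglingReuseIds(planText: String): Set[Int] = {
    // Pair every line that carries an id tag with its numeric id.
    val tagged: Seq[(String, Int)] = planText.split("\n").toSeq.flatMap { line =>
      IdTag.findFirstMatchIn(line).map(m => (line, m.group(1).toInt))
    }
    val (reusedLines, otherLines) =
      tagged.partition { case (line, _) => line.contains("ReusedExchange") }
    // Only real Exchange / BroadcastExchange lines count as definitions.
    val definedIds = otherLines.collect {
      case (line, id) if line.contains("Exchange") => id
    }.toSet
    reusedLines.map(_._2).toSet -- definedIds
  }

  def main(args: Array[String]): Unit = {
    val plan =
      """Exchange hashpartitioning(product_id#1968, 5), true, [id=#1140]
        |ReusedExchange [store_id#1971], BroadcastExchange HashedRelationBroadcastMode(...), [id=#1021]
        |BroadcastExchange HashedRelationBroadcastMode(...), [id=#1021]
        |ReusedExchange [product_id#2060, store_id#2062], Exchange hashpartitioning(product_id#1968, 5), true, [id=#1026]""".stripMargin
    println(danglingReuseIds(plan)) // prints Set(1026)
  }
}
```

For a plan produced by Spark itself, the text can be obtained with `df.queryExecution.executedPlan.toString`. An empty result means every ReusedExchange in the text refers to an Exchange that is present.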
> Another query where the issue is caused by ReuseSubquery:
> {noformat}
> spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1")
> val query1 = """
>                   | With view1 as (
>                   |   SELECT product_id, units_sold
>                   |   FROM fact_stats
>                   |   WHERE store_id = (SELECT max(store_id) FROM dim_stats)
>                   |         and units_sold = 2
>                   | ), view2 as (
>                   |   SELECT product_id, units_sold
>                   |   FROM fact_stats
>                   |   WHERE store_id = (SELECT max(store_id) FROM dim_stats)
>                   |         and units_sold = 1
>                   | )
>                   |
>                   | SELECT *
>                   | FROM view1 v1 join view2 v2 join view2 v3
>                   | WHERE v1.product_id = v2.product_id and v2.product_id = v3.product_id
> """.stripMargin
> // v2 is joined with itself here, so the plan should contain a ReusedExchange.
> val df = spark.sql(query1)
> println(df.queryExecution.executedPlan)
> {noformat}
> Here v2 is joined with itself, so the plan should use ReuseExchange. However, the final plan computes v2 twice.
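To check whether reuse kicked in without reading the whole plan, the exchange nodes can be counted directly on the physical plan. This is a minimal sketch, assuming the Spark 2.4/3.0 class names `ShuffleExchangeExec` and `ReusedExchangeExec` from `org.apache.spark.sql.execution.exchange`, and assuming adaptive query execution is disabled. `exchangeSummary` is a hypothetical helper name, not a Spark API.

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec}

// Hypothetical helper: returns (number of shuffle exchanges that are computed,
// number of ReusedExchange nodes) found in the main plan tree.
// Plans nested inside subquery expressions are not visited by collect.
def exchangeSummary(plan: SparkPlan): (Int, Int) = {
  val shuffles = plan.collect { case e: ShuffleExchangeExec => e }.size
  val reused   = plan.collect { case r: ReusedExchangeExec => r }.size
  (shuffles, reused)
}

// Usage in spark-shell, with `df` built from query1:
//   val (shuffles, reused) = exchangeSummary(df.queryExecution.executedPlan)
//   println(s"shuffle exchanges: $shuffles, reused exchanges: $reused")
```

If the shuffle for v2 were reused for v3, at least one ReusedExchange would be expected in the result. A reused count of zero for query1 would be consistent with v2 being computed twice, as reported here.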
>  



