Posted to issues@spark.apache.org by "Mayur Bhosale (Jira)" <ji...@apache.org> on 2020/01/22 06:12:00 UTC

[jira] [Comment Edited] (SPARK-30528) DPP issues

    [ https://issues.apache.org/jira/browse/SPARK-30528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020803#comment-17020803 ] 

Mayur Bhosale edited comment on SPARK-30528 at 1/22/20 6:11 AM:
----------------------------------------------------------------

Thanks for the explanation [~maryannxue]
 # Should DPP be turned off by default until the heuristics are improved, or should it stay on by default but be skipped when column-level stats are not available? In some cases the current behavior can be really disastrous.
 # Can we use a Bloom filter to store the pruning values (for non-Broadcast Hash Join)? This would have multiple advantages:
 ## The size of the result returned to the driver would be much smaller
 ## Faster lookups compared to a HashSet
 ## Reuse of the exchange would happen (because we won't be adding an Aggregate on top)
 ## Duplicate subqueries caused by multiple join conditions on partitioned columns would be removed (cases like example 3 in the description above)

          With a Bloom filter, the DPP subquery should look something like this:

            | Bloom Filter |<------------| Other operations |<----------| Exchange |<------| Scan |

         This will require more thought, though. Let me know if this sounds feasible and useful; I can then come back with more details and pick it up as well.

       3. Yes, one of the subqueries selects `col1` and the other selects `col2`.
{code:java}
== Physical Plan ==                                                             
*(5) SortMergeJoin [partcol1#2L, partcol2#3], [col1#5L, col2#6], Inner
:- *(2) Sort [partcol1#2L ASC NULLS FIRST, partcol2#3 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(partcol1#2L, partcol2#3, 200), true, [id=#103]
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.partitionedtable[id#0L,name#1,partCol1#2L,partCol2#3] Batched: true, DataFilters: [], Format: Parquet, Location: PrunedInMemoryFileIndex[file:/Users/saurabhc/src/spark_version_merge/spark300preview/spark/bin/sp..., PartitionFilters: [isnotnull(partCol2#3), isnotnull(partCol1#2L), dynamicpruningexpression(partCol1#2L IN subquery#..., PushedFilters: [], ReadSchema: struct<id:bigint,name:string>
:              :- Subquery subquery#19, [id=#49]
:              :  +- *(2) HashAggregate(keys=[col1#5L], functions=[])
:              :     +- Exchange hashpartitioning(col1#5L, 200), true, [id=#45]
:              :        +- *(1) HashAggregate(keys=[col1#5L], functions=[])
:              :           +- *(1) Project [col1#5L]
:              :              +- *(1) Filter (((isnotnull(id#4L) AND (id#4L > 0)) AND isnotnull(col2#6)) AND isnotnull(col1#5L))
:              :                 +- *(1) ColumnarToRow
:              :                    +- FileScan parquet default.nonpartitionedtable[id#4L,col1#5L,col2#6] Batched: true, DataFilters: [isnotnull(id#4L), (id#4L > 0), isnotnull(col2#6), isnotnull(col1#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/saurabhc/src/spark_version_merge/spark300preview/spark/bin/spark-wa..., PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,0), IsNotNull(col2), IsNotNull(col1)], ReadSchema: struct<id:bigint,col1:bigint,col2:string>
:              +- Subquery subquery#21, [id=#82]
:                 +- *(2) HashAggregate(keys=[col2#6], functions=[])
:                    +- Exchange hashpartitioning(col2#6, 200), true, [id=#78]
:                       +- *(1) HashAggregate(keys=[col2#6], functions=[])
:                          +- *(1) Project [col2#6]
:                             +- *(1) Filter (((isnotnull(id#4L) AND (id#4L > 0)) AND isnotnull(col2#6)) AND isnotnull(col1#5L))
:                                +- *(1) ColumnarToRow
:                                   +- FileScan parquet default.nonpartitionedtable[id#4L,col1#5L,col2#6] Batched: true, DataFilters: [isnotnull(id#4L), (id#4L > 0), isnotnull(col2#6), isnotnull(col1#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/saurabhc/src/spark_version_merge/spark300preview/spark/bin/spark-wa..., PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,0), IsNotNull(col2), IsNotNull(col1)], ReadSchema: struct<id:bigint,col1:bigint,col2:string>
+- *(4) Sort [col1#5L ASC NULLS FIRST, col2#6 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(col1#5L, col2#6, 200), true, [id=#113]
      +- *(3) Project [id#4L, col1#5L, col2#6, name#7]
         +- *(3) Filter (((isnotnull(id#4L) AND (id#4L > 0)) AND isnotnull(col2#6)) AND isnotnull(col1#5L))
            +- *(3) ColumnarToRow
               +- FileScan parquet default.nonpartitionedtable[id#4L,col1#5L,col2#6,name#7] Batched: true, DataFilters: [isnotnull(id#4L), (id#4L > 0), isnotnull(col2#6), isnotnull(col1#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/saurabhc/src/spark_version_merge/spark300preview/spark/bin/spark-wa..., PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,0), IsNotNull(col2), IsNotNull(col1)], ReadSchema: struct<id:bigint,col1:bigint,col2:string,name:string> 
{code}
 

           If we decide not to remove the Aggregate (i.e. not to use a Bloom filter), should we combine such DPP subqueries into a single subquery? We can avoid duplicate computation this way.
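
To illustrate what combining the two subqueries would save, here is a rough sketch in plain Python (not Spark code). The `CountingTable` class, the sample rows, and both function names are hypothetical stand-ins for the scan of nonpartitionedtable and the two HashAggregate subqueries in the plan above. It only counts scans of the build side; it does not model exchanges or aggregation cost.

```python
# Hypothetical in-memory stand-in for nonpartitionedtable (not Spark code).
ROWS = [
    {"id": 1, "col1": 10, "col2": "a"},
    {"id": 2, "col1": 20, "col2": "b"},
    {"id": 0, "col1": 30, "col2": "c"},   # dropped by id > 0
    {"id": 3, "col1": 10, "col2": None},  # dropped by isnotnull(col2)
]


class CountingTable:
    """Wraps the rows and counts how many times the table is scanned."""

    def __init__(self, rows):
        self.rows = rows
        self.scans = 0

    def scan_filtered(self):
        # Mirrors the Filter in the plan: id > 0 and both join columns non-null.
        self.scans += 1
        return [
            r for r in self.rows
            if r["id"] is not None and r["id"] > 0
            and r["col1"] is not None and r["col2"] is not None
        ]


def separate_subqueries(table):
    """One subquery per join condition: the table is scanned twice."""
    col1_values = {r["col1"] for r in table.scan_filtered()}
    col2_values = {r["col2"] for r in table.scan_filtered()}
    return col1_values, col2_values


def combined_subquery(table):
    """A single subquery that collects both pruning sets in one scan."""
    col1_values, col2_values = set(), set()
    for r in table.scan_filtered():
        col1_values.add(r["col1"])
        col2_values.add(r["col2"])
    return col1_values, col2_values
```

Both functions return the same pruning values for `col1` and `col2`; the difference is that the combined version touches the table once instead of twice.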

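For the Bloom filter idea in point 2, a minimal sketch of the data structure may help the discussion. This is plain Python, not Spark's implementation; the `BloomFilter` class, its default sizes, `prune_partitions`, and the sample keys are all assumptions made for illustration. The property that matters for pruning is that a Bloom filter can return false positives but never false negatives, so a partition that has a matching join key is never pruned, while the filter itself stays at a fixed size.

```python
import hashlib


class BloomFilter:
    """Fixed-size Bloom filter: false positives possible, false negatives not."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, value):
        # Derive num_hashes bit positions by salting SHA-256 with a seed byte.
        data = repr(value).encode("utf-8")
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(bytes([seed]) + data).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, value):
        for pos in self._positions(value):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, value):
        return all(
            self.bits[pos // 8] & (1 << (pos % 8))
            for pos in self._positions(value)
        )


def prune_partitions(partition_values, bloom):
    """Keep only partitions whose value might match a build-side join key."""
    return [p for p in partition_values if bloom.might_contain(p)]


# Build-side join keys (hypothetical values of col1 after filtering).
build_keys = [10, 20, 30]
bloom = BloomFilter()
for key in build_keys:
    bloom.add(key)

# Probe-side partition values for partcol1 (hypothetical).
kept = prune_partitions(range(100), bloom)
```

The list `kept` always includes 10, 20 and 30, possibly plus a few false positives, and the filter occupies 128 bytes no matter how many keys are added. The trade-off is that a filter that is too small for the number of keys prunes less.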


> DPP issues
> ----------
>
>                 Key: SPARK-30528
>                 URL: https://issues.apache.org/jira/browse/SPARK-30528
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 3.0.0
>            Reporter: Mayur Bhosale
>            Priority: Major
>              Labels: performance
>         Attachments: cases.png, dup_subquery.png, plan.png
>
>
> In DPP, the heuristic that decides whether pruning will be beneficial relies on the sizes of the tables in the right subtree of the join. This might not be a correct estimate, especially when detailed column-level stats are not available.
> {code:java}
>     // the pruning overhead is the total size in bytes of all scan relations
>     val overhead = otherPlan.collectLeaves().map(_.stats.sizeInBytes).sum.toFloat
>     filterRatio * partPlan.stats.sizeInBytes.toFloat > overhead.toFloat
> {code}
> Also, DPP executes the entire right side of the join as a subquery, so the tables in the right subtree of the join are scanned multiple times. This can cause issues when the join is not a Broadcast Hash Join (BHJ) and the subquery result is not reused. Also, I couldn’t figure out why the results from the subquery are reused only for BHJ.
>  
> Consider a query,
> {code:java}
> SELECT * 
> FROM   store_sales_partitioned 
>        JOIN (SELECT * 
>              FROM   store_returns_partitioned, 
>                     date_dim 
>              WHERE  sr_returned_date_sk = d_date_sk) ret_date 
>          ON ss_sold_date_sk = d_date_sk 
> WHERE  d_fy_quarter_seq > 0 
> {code}
> DPP will kick in for both joins. (Please check the attached image plan.png for the plan.)
> Some observations:
>  * Based on the heuristic, DPP goes ahead with pruning if the cost of scanning the tables in the right subtree of the join is less than the benefit from pruning. The reason is that multiple scans are needed for an SMJ. But the heuristic only checks whether the benefit offsets the cost of the extra scans; it does not take into account other operations in the right subtree, such as joins, which can be quite expensive. This issue is particularly prominent when detailed column-level stats are not available. In the example above, the pruningHasBenefit decision was made on the basis of the sizes of the tables store_returns_partitioned and date_dim, but it did not take into account the join between them, which happens before the join with the store_sales_partitioned table.
>  * Multiple scans are needed when the join is an SMJ because the exchanges are not reused. This is because an Aggregate is added on top of the right subtree, which is executed as a subquery, in order to keep only the required columns. Here, scanning all the columns, as the right subtree of the join would, and reusing the same exchange might be more helpful, as it avoids duplicate scans.
> This was just a representative example, but in general, for cases such as those in the attached image cases.png, DPP can cause performance issues.
>  
> Also, when there are multiple DPP-compatible join conditions in the same join, the entire right subtree of the join is executed as a subquery once per condition. Consider an example:
> {code:java}
> SELECT * 
> FROM   partitionedtable 
>        JOIN nonpartitionedtable 
>          ON partcol1 = col1 
>             AND partcol2 = col2 
> WHERE  nonpartitionedtable.id > 0 
> {code}
> Here the right subtree of the join (the scan of table nonpartitionedtable) would be executed twice as a subquery, once for each join condition. These two subqueries should be combined and executed only once, as they are almost the same apart from the columns that they select. Check the attached image dup_subquery.png for details.
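
The heuristic quoted in the description can be restated as a small worked example. This is a plain Python transcription of the three-line Scala snippet, not Spark code; the function name, the table sizes, and the filter ratios are hypothetical. It shows that the decision depends only on the leaf sizes of the other side, so the cost of joining store_returns_partitioned with date_dim never enters the comparison.

```python
GB = 1024 ** 3
MB = 1024 ** 2


def pruning_has_benefit(filter_ratio, part_plan_size, other_leaf_sizes):
    """Mirror of the quoted check: benefit > total size of the other side's leaves."""
    # The overhead is only the sum of the scan sizes; joins, aggregates and
    # other operators in the right subtree contribute nothing to it.
    overhead = float(sum(other_leaf_sizes))
    return filter_ratio * float(part_plan_size) > overhead


# Hypothetical sizes: a 100 GB partitioned fact table on the pruned side,
# and two leaves (10 GB and 1 MB) in the right subtree.
leaves = [10 * GB, 1 * MB]

optimistic = pruning_has_benefit(0.5, 100 * GB, leaves)    # 50 GB > ~10 GB
pessimistic = pruning_has_benefit(0.05, 100 * GB, leaves)  # 5 GB < ~10 GB
```

With a filter ratio of 0.5 the check passes and DPP would be applied; with 0.05 it fails. In both cases the result would be identical whether the two leaves are joined cheaply or expensively, which is the gap the first observation points at.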


