Posted to issues@spark.apache.org by "Cheng Su (Jira)" <ji...@apache.org> on 2020/09/11 18:36:00 UTC

[jira] [Updated] (SPARK-32859) Introduce SQL physical plan rule to decide enable/disable bucketing

     [ https://issues.apache.org/jira/browse/SPARK-32859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng Su updated SPARK-32859:
-----------------------------
    Description: 
Discussed offline with [~cloud_fan]: it would be better to decide automatically, based on the query plan, whether to enable or disable SQL bucketing. Currently bucketing is enabled by default (`spark.sql.sources.bucketing.enabled`=true), so every bucketed table in the query plan is read with a bucketed table scan (all input files of a bucket are read by the same task). The drawback is that when the bucketed table scan brings no benefit (no join/group-by/etc. in the query), it is unnecessary, and it restricts the number of tasks to the number of buckets, which might hurt parallelism.
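
To make the parallelism concern concrete, here is a rough sketch in Python (a toy model, not Spark code). It assumes a bucketed scan runs one task per bucket, as described above, and approximates a non-bucketed scan as one task per `max_partition_bytes` of input. The function names and the simplified split formula are assumptions for illustration only; Spark's real file-splitting logic takes more factors into account.

```python
import math

def bucketed_scan_tasks(num_buckets: int) -> int:
    # Bucketed scan: all files of one bucket go to the same task.
    return num_buckets

def non_bucketed_scan_tasks(total_bytes: int,
                            max_partition_bytes: int = 128 * 1024 * 1024) -> int:
    # Simplified assumption: input is cut into chunks of at most
    # max_partition_bytes, one task per chunk (at least one task).
    return max(1, math.ceil(total_bytes / max_partition_bytes))

table_bytes = 100 * 1024 ** 3  # hypothetical 100 GiB table with 8 buckets
print("bucketed scan tasks:    ", bucketed_scan_tasks(8))
print("non-bucketed scan tasks:", non_bucketed_scan_tasks(table_bytes))
```

Under these assumptions the bucketed scan is capped at 8 tasks regardless of table size, which is the restriction this issue wants to avoid when bucketing brings no benefit.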

 

The proposed change is to introduce a physical plan rule (right before `EnsureRequirements`):

(1). transformUp() the physical plan, matching SparkPlan operators that are FileSourceScanExec; if optionalBucketSet is set, enable bucket scan (bucket filter in this case).

(2). transformUp() the physical plan, matching SparkPlan operators that are SparkPlanWithInterestingPartitioning.

SparkPlanWithInterestingPartitioning: a plan in {SortMergeJoinExec, ShuffledHashJoinExec, HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec, etc.}, i.e. one that has HashClusteredDistribution/ClusteredDistribution in its requiredChildDistribution, and whose required HashClusteredDistribution/ClusteredDistribution is on the bucketed columns of its underlying FileSourceScanExec.

(3). For any child of SparkPlanWithInterestingPartitioning that does not satisfy the plan's requiredChildDistribution: go through the child's sub query plan tree.
 If (3.1). every node's outputPartitioning is the same as its child's, and every node's requiredChildDistribution is UnspecifiedDistribution,
 and (3.2). the leaf node is a FileSourceScanExec on a bucketed table,
 and (3.3). with bucket scan enabled for this FileSourceScanExec, its outputPartitioning satisfies the requiredChildDistribution of SparkPlanWithInterestingPartitioning.
 If (3.1), (3.2), (3.3) are all true, enable bucket scan for this FileSourceScanExec, and double check that the new child of SparkPlanWithInterestingPartitioning satisfies requiredChildDistribution.
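
The checks in step (3) can be sketched with a small toy model in Python. This is not Spark code and not the actual implementation: the `Node` class, its fields, and the helper names are hypothetical stand-ins for SparkPlan, requiredChildDistribution and outputPartitioning. It also assumes ClusteredDistribution-like semantics (the bucket columns must be a subset of the required clustering columns) and omits step (1) and the final double check.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

Cols = Tuple[str, ...]

@dataclass
class Node:
    name: str
    children: List["Node"] = field(default_factory=list)
    # One entry per child; None stands for UnspecifiedDistribution.
    required: List[Optional[Cols]] = field(default_factory=list)
    # False for operators that change their child's partitioning.
    preserves_partitioning: bool = True
    # Set only on leaf scans over a bucketed table.
    bucket_cols: Optional[Cols] = None
    bucket_scan: bool = False

def satisfies(bucket_cols: Cols, required_cols: Cols) -> bool:
    # Assumed rule: bucketing on a non-empty subset of the required columns.
    return len(bucket_cols) > 0 and set(bucket_cols) <= set(required_cols)

def find_bucketed_scan(child: Node) -> Optional[Node]:
    node = child
    while node.children:
        # (3.1) only pass-through operators with no distribution requirement.
        if (len(node.children) != 1
                or not node.preserves_partitioning
                or any(r is not None for r in node.required)):
            return None
        node = node.children[0]
    # (3.2) the leaf must be a scan over a bucketed table.
    return node if node.bucket_cols else None

def enable_bucket_scans(plan: Node) -> None:
    for c in plan.children:  # children first, similar in spirit to transformUp
        enable_bucket_scans(c)
    for child, req in zip(plan.children, plan.required):
        if req is None:
            continue  # this operator has no interesting requirement here
        scan = find_bucketed_scan(child)
        # (3.3) enabling the bucket scan must satisfy the requirement.
        if scan is not None and satisfies(scan.bucket_cols, req):
            scan.bucket_scan = True

t1 = Node("scan t1", bucket_cols=("id",))
t2 = Node("scan t2", bucket_cols=("id",))
join = Node("SortMergeJoin", [Node("Filter", [t1], [None]), t2], [("id",), ("id",)])
t3 = Node("scan t3", bucket_cols=("id",))
filter_only = Node("Filter", [t3], [None])
t4 = Node("scan t4", bucket_cols=("id",))
agg = Node("HashAggregate", [t4], [("name",)])

for plan in (join, filter_only, agg):
    enable_bucket_scans(plan)
for t in (t1, t2, t3, t4):
    print(t.name, "bucket scan enabled:", t.bucket_scan)
```

In this sketch only the two scans under the join on the bucketed column are switched to bucket scan; the filter-only query and the aggregation on a non-bucketed column keep the regular scan and its higher parallelism.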

 

The idea of SparkPlanWithInterestingPartitioning is inspired by "interesting order" in "Access Path Selection in a Relational Database Management System" ([http://www.inf.ed.ac.uk/teaching/courses/adbs/AccessPath.pdf]).


> Introduce SQL physical plan rule to decide enable/disable bucketing 
> --------------------------------------------------------------------
>
>                 Key: SPARK-32859
>                 URL: https://issues.apache.org/jira/browse/SPARK-32859
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Cheng Su
>            Priority: Minor


