Posted to issues@spark.apache.org by "Wenchen Fan (JIRA)" <ji...@apache.org> on 2019/03/04 05:43:00 UTC

[jira] [Resolved] (SPARK-26893) Allow partition pruning with subquery filters on file source

     [ https://issues.apache.org/jira/browse/SPARK-26893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan resolved SPARK-26893.
---------------------------------
       Resolution: Fixed
    Fix Version/s: 3.0.0

Issue resolved by pull request 23802
[https://github.com/apache/spark/pull/23802]

> Allow partition pruning with subquery filters on file source
> ------------------------------------------------------------
>
>                 Key: SPARK-26893
>                 URL: https://issues.apache.org/jira/browse/SPARK-26893
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Peter Toth
>            Assignee: Peter Toth
>            Priority: Minor
>             Fix For: 3.0.0
>
>
> The file source doesn't use subquery filters for partition pruning, but with a minor improvement it could.
> This query is an example:
> {noformat}
> CREATE TABLE a (id INT, p INT) USING PARQUET PARTITIONED BY (p)
> CREATE TABLE b (id INT) USING PARQUET
> SELECT * FROM a WHERE p <= (SELECT MIN(id) FROM b)
> {noformat}
> The executed plan of the SELECT is currently:
> {noformat}
> *(1) Filter (p#252L <= Subquery subquery250)
> :  +- Subquery subquery250
> :     +- *(2) HashAggregate(keys=[], functions=[min(id#253L)], output=[min(id)#255L])
> :        +- Exchange SinglePartition
> :           +- *(1) HashAggregate(keys=[], functions=[partial_min(id#253L)], output=[min#259L])
> :              +- *(1) FileScan parquet default.b[id#253L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/ptoth/git2/spark2/common/kvstore/spark-warehouse/b], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
> +- *(1) FileScan parquet default.a[id#251L,p#252L] Batched: true, DataFilters: [], Format: Parquet, Location: PrunedInMemoryFileIndex[file:/Users/ptoth/git2/spark2/common/kvstore/spark-warehouse/a/p=0, file:..., PartitionCount: 2, PartitionFilters: [isnotnull(p#252L)], PushedFilters: [], ReadSchema: struct<id:bigint>
> {noformat}
> But it could be: 
> {noformat}
> *(1) FileScan parquet default.a[id#251L,p#252L] Batched: true, DataFilters: [], Format: Parquet, Location: PrunedInMemoryFileIndex[file:/Users/ptoth/git2/spark2/common/kvstore/spark-warehouse/a/p=0, file:..., PartitionFilters: [isnotnull(p#252L), (p#252L <= Subquery subquery250)], PushedFilters: [], ReadSchema: struct<id:bigint>
>    +- Subquery subquery250
>       +- *(2) HashAggregate(keys=[], functions=[min(id#253L)], output=[min(id)#255L])
>          +- Exchange SinglePartition
>             +- *(1) HashAggregate(keys=[], functions=[partial_min(id#253L)], output=[min#259L])
>                +- *(1) FileScan parquet default.b[id#253L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/ptoth/git2/spark2/common/kvstore/spark-warehouse/b], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
> {noformat}
> so that partition pruning can work in {{FileSourceScanExec}}.
> Please note that the {{PartitionCount}} metadata can't be computed before execution, so in this case it is no longer part of the plan.
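
The difference between the two plan shapes above can be illustrated with a small, self-contained Python model. This is only a conceptual sketch, not Spark code and not the implementation from the pull request: the dictionaries standing in for tables a and b, their contents, and all function names are hypothetical. It assumes the scalar subquery is evaluated first and its value is then used either as a row filter above the scan or as a partition filter inside the scan.

```python
from typing import Dict, List, Tuple

# Hypothetical stand-in for partitioned table `a`: partition value p -> id values.
TABLE_A: Dict[int, List[int]] = {0: [1, 2], 1: [3], 2: [4, 5]}
# Hypothetical stand-in for unpartitioned table `b`: id values.
TABLE_B: List[int] = [1, 7, 9]

Row = Tuple[int, int]  # (id, p)


def scalar_subquery() -> int:
    """Models SELECT MIN(id) FROM b."""
    return min(TABLE_B)


def scan_then_filter(threshold: int) -> Tuple[List[Row], List[int]]:
    """Models the current plan: every partition is read, then a Filter drops rows."""
    partitions_read = sorted(TABLE_A)
    rows = [(i, p) for p in partitions_read for i in TABLE_A[p] if p <= threshold]
    return rows, partitions_read


def pruned_scan(threshold: int) -> Tuple[List[Row], List[int]]:
    """Models the proposed plan: the subquery result selects partitions before reading."""
    partitions_read = [p for p in sorted(TABLE_A) if p <= threshold]
    rows = [(i, p) for p in partitions_read for i in TABLE_A[p]]
    return rows, partitions_read


threshold = scalar_subquery()
rows_filtered, read_all = scan_then_filter(threshold)
rows_pruned, read_pruned = pruned_scan(threshold)
print("scan then filter read partitions:", read_all)
print("pruned scan read partitions:", read_pruned)
```

Both functions return the same rows, but the pruned variant touches fewer partitions. In this model the list of partitions to read is known only after scalar_subquery() has run, which mirrors why PartitionCount cannot be reported in the plan before execution.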


