You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Marco Gaido (JIRA)" <ji...@apache.org> on 2019/02/15 16:21:00 UTC

[jira] [Commented] (SPARK-26893) Allow pushdown of partition pruning subquery filters to file source

    [ https://issues.apache.org/jira/browse/SPARK-26893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769474#comment-16769474 ] 

Marco Gaido commented on SPARK-26893:
-------------------------------------

Actually this was done intentionally in order to avoid subquery re-execution (please notice that subqueries can be hard to compute, so the resource wastage can be significant). Please see https://github.com/apache/spark/pull/22518 for more details. In case you do want to do so, we'd need to fix that problem in a similar way to what was initially proposed in the before-mentioned PR. cc [~cloud_fan]

> Allow pushdown of partition pruning subquery filters to file source
> -------------------------------------------------------------------
>
>                 Key: SPARK-26893
>                 URL: https://issues.apache.org/jira/browse/SPARK-26893
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Peter Toth
>            Priority: Minor
>
> File source doesn't use subquery filters for partition pruning. But it could use those filters with a minor improvement.
> This query is an example:
> {noformat}
> CREATE TABLE a (id INT, p INT) USING PARQUET PARTITIONED BY (p)
> CREATE TABLE b (id INT) USING PARQUET
> SELECT * FROM a WHERE p <= (SELECT MIN(id) FROM b){noformat}
> Where the executed plan of the SELECT currently is:
> {noformat}
> *(1) Filter (p#252L <= Subquery subquery250)
> : +- Subquery subquery250
> : +- *(2) HashAggregate(keys=[], functions=[min(id#253L)], output=[min(id)#255L])
> : +- Exchange SinglePartition
> : +- *(1) HashAggregate(keys=[], functions=[partial_min(id#253L)], output=[min#259L])
> : +- *(1) FileScan parquet default.b[id#253L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/ptoth/git2/spark2/common/kvstore/spark-warehouse/b], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
> +- *(1) FileScan parquet default.a[id#251L,p#252L] Batched: true, DataFilters: [], Format: Parquet, Location: PrunedInMemoryFileIndex[file:/Users/ptoth/git2/spark2/common/kvstore/spark-warehouse/a/p=0, file:..., PartitionCount: 2, PartitionFilters: [isnotnull(p#252L)], PushedFilters: [], ReadSchema: struct<id:bigint>
> {noformat}
> But it could be: 
> {noformat}
> *(1) FileScan parquet default.a[id#251L,p#252L] Batched: true, DataFilters: [], Format: Parquet, Location: PrunedInMemoryFileIndex[file:/Users/ptoth/git2/spark2/common/kvstore/spark-warehouse/a/p=0, file:..., PartitionFilters: [isnotnull(p#252L), (p#252L <= Subquery subquery250)], PushedFilters: [], ReadSchema: struct<id:bigint>
> +- Subquery subquery250
> +- *(2) HashAggregate(keys=[], functions=[min(id#253L)], output=[min(id)#255L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_min(id#253L)], output=[min#259L])
> +- *(1) FileScan parquet default.b[id#253L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/ptoth/git2/spark2/common/kvstore/spark-warehouse/b], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
> {noformat}
> and so partition pruning could work in {{FileSourceScanExec}}.
>  Please note that {{PartitionCount}} metadata can't be computed before execution so in this case it is no longer part of the plan.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org