Posted to issues@spark.apache.org by "Jackey Lee (Jira)" <ji...@apache.org> on 2022/02/27 13:56:00 UTC

[jira] [Updated] (SPARK-38041) DataFilter pushed down with PartitionFilter

     [ https://issues.apache.org/jira/browse/SPARK-38041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jackey Lee updated SPARK-38041:
-------------------------------
    Description: 
At present, a Filter is split into DataFilters and PartitionFilters when it is pushed down. However, once the partition predicates are removed from the Filter, every partition is scanned against all of the remaining DataFilter conditions, which may cause a full data scan.

Here is an example.

before
{code:java}
== Physical Plan ==
*(1) Filter (((a#40L < 10) AND (c#42 = 0)) OR (((a#40L >= 10) AND (c#42 >= 1)) AND (c#42 < 3)))
+- *(1) ColumnarToRow
   +- BatchScan[a#40L, b#41L, c#42, d#43] ParquetScan DataFilters: [(((a#40L < 10) AND (c#42 = 0)) OR (((a#40L >= 10) AND (c#42 >= 1)) AND (c#42 < 3)))], Format: parquet, Location: InMemoryFileIndex(1 paths)[path, PartitionFilters: [((c#42 = 0) OR ((c#42 >= 1) AND (c#42 < 3)))], PushedAggregation: [], PushedFilters: [Or(LessThan(a,10),GreaterThanOrEqual(a,10))], PushedGroupBy: [], ReadSchema: struct<a:bigint,b:bigint>, PushedFilters: [Or(LessThan(a,10),GreaterThanOrEqual(a,10))], PushedAggregation: [], PushedGroupBy: [] RuntimeFilters: []
{code}
after
{code:java}
== Physical Plan ==
*(1) Filter (((a#40L < 10) AND (c#42 = 0)) OR (((a#40L >= 10) AND (c#42 >= 1)) AND (c#42 < 3)))
+- *(1) ColumnarToRow
   +- BatchScan[a#40L, b#41L, c#42, d#43] ParquetScan DataFilters: [(((a#40L < 10) AND (c#42 = 0)) OR (((a#40L >= 10) AND (c#42 >= 1)) AND (c#42 < 3)))], Format: parquet, Location: InMemoryFileIndex(1 paths)[path, PartitionFilters: [((c#42 = 0) OR ((c#42 >= 1) AND (c#42 < 3)))], PushedAggregation: [], PushedFilters: [Or(And(LessThan(a,10),EqualTo(c,0)),And(And(GreaterThanOrEqual(a,10),GreaterThanOrEqual(c,1)),Le..., PushedGroupBy: [], ReadSchema: struct<a:bigint,b:bigint>, PushedFilters: [Or(And(LessThan(a,10),EqualTo(c,0)),And(And(GreaterThanOrEqual(a,10),GreaterThanOrEqual(c,1)),LessThan(c,3)))], PushedAggregation: [], PushedGroupBy: [] RuntimeFilters: [] {code}
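
To make the difference between the two PushedFilters lists concrete, here is a small, purely illustrative model in plain Python. It does not use any Spark API; the row layout and the function names are assumptions made up for this sketch. It only shows that the stripped data filter `(a < 10) OR (a >= 10)` is true for every non-null `a`, while the filter that keeps the partition conditions can reject rows at the scan.

```python
# Illustrative model only: plain Python, not Spark code.
# `c` plays the role of the partition column, `a` is a data column.

def full_filter(row):
    """Original predicate: (a < 10 AND c = 0) OR (a >= 10 AND c >= 1 AND c < 3)."""
    a, c = row["a"], row["c"]
    return (a < 10 and c == 0) or (a >= 10 and 1 <= c < 3)

def partition_filter(row):
    """Partition-only part: (c = 0) OR (c >= 1 AND c < 3)."""
    c = row["c"]
    return c == 0 or 1 <= c < 3

def stripped_data_filter(row):
    """Data-only part after partition predicates are removed: (a < 10) OR (a >= 10)."""
    a = row["a"]
    return a < 10 or a >= 10

# Four hypothetical partitions (c = 0..3), two rows each.
rows = [{"a": a, "c": c} for c in range(4) for a in (5, 15)]

# Partition pruning keeps partitions c in {0, 1, 2}.
pruned = [r for r in rows if partition_filter(r)]

# "Before": the stripped filter is always true here, so no row is skipped.
before = [r for r in pruned if stripped_data_filter(r)]

# "After": the combined predicate is available at the scan and drops rows.
after = [r for r in pruned if full_filter(r)]

print(len(rows), len(pruned), len(before), len(after))  # prints: 8 6 6 3
```

In both plans the Filter node above the scan still applies the full predicate, so the query result is the same either way; the sketch only illustrates how much data the scan itself could skip.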

  was:
At present, a Filter is split into DataFilters and PartitionFilters when it is pushed down. However, once the partition predicates are removed from the Filter, every partition is scanned against all of the remaining DataFilter conditions, which may cause a full data scan.

Here is an example.

before
{code:java}
== Physical Plan ==
*(1) Filter (((a#0 < 10) AND (c#2 = 0)) OR (((a#0 >= 10) AND (c#2 >= 1)) AND (c#2 < 3)))
+- *(1) ColumnarToRow
   +- FileScan parquet datasources.test_push_down[a#0,b#1,c#2] Batched: true, DataFilters: [((a#0 < 10) OR (a#0 >= 10))], Format: Parquet, Location: InMemoryFileIndex(0 paths)[], PartitionFilters: [((c#2 = 0) OR ((c#2 >= 1) AND (c#2 < 3)))], PushedFilters: [Or(LessThan(a,10),GreaterThanOrEqual(a,10))], ReadSchema: struct<a:int,b:int> {code}
after
{code:java}
== Physical Plan ==
*(1) Filter (((a#0 < 10) AND (c#2 = 0)) OR (((a#0 >= 10) AND (c#2 >= 1)) AND (c#2 < 3)))
+- *(1) ColumnarToRow
   +- FileScan parquet datasources.test_push_down[a#0,b#1,c#2] Batched: true, DataFilters: [(((a#0 < 10) AND (c#2 = 0)) OR (((a#0 >= 10) AND (c#2 >= 1)) AND (c#2 < 3)))], Format: Parquet, Location: InMemoryFileIndex(0 paths)[], PartitionFilters: [((c#2 = 0) OR ((c#2 >= 1) AND (c#2 < 3)))], PushedFilters: [Or(LessThan(a,10),GreaterThanOrEqual(a,10))], ReadSchema: struct<a:int,b:int>  {code}


> DataFilter pushed down with PartitionFilter
> -------------------------------------------
>
>                 Key: SPARK-38041
>                 URL: https://issues.apache.org/jira/browse/SPARK-38041
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.3.0
>            Reporter: Jackey Lee
>            Priority: Major


