Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2022/06/06 21:13:00 UTC

[jira] [Assigned] (SPARK-39393) Parquet data source only supports push-down predicate filters for non-repeated primitive types

     [ https://issues.apache.org/jira/browse/SPARK-39393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-39393:
------------------------------------

    Assignee: Apache Spark

> Parquet data source only supports push-down predicate filters for non-repeated primitive types
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-39393
>                 URL: https://issues.apache.org/jira/browse/SPARK-39393
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.0, 3.1.1, 3.1.2, 3.2.0, 3.2.1
>            Reporter: Amin Borjian
>            Assignee: Apache Spark
>            Priority: Major
>              Labels: parquet
>
> I use an example to illustrate the problem. The cause of the problem and the approach to solving it are described below.
> Assume the following Protocol Buffers schema:
> {code:java}
> message Model {
>      string name = 1;
>      repeated string keywords = 2;
> }
> {code}
> Suppose a parquet file is created from a set of records in the above format with the help of the {{parquet-protobuf}} library.
> Using Spark version 3.0.2 or older, we could run the following query using {{spark-shell}}:
> {code:java}
> val data = spark.read.parquet("/path/to/parquet")
> data.createOrReplaceTempView("models") // registerTempTable is deprecated since Spark 2.0
> spark.sql("select * from models where array_contains(keywords, 'X')").show(false)
> {code}
> But after upgrading to Spark 3.1.0 or newer, we get the following error:
> {code:java}
> Caused by: java.lang.IllegalArgumentException: FilterPredicates do not currently support repeated columns. Column keywords is repeated.
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:176)
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:149)
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:89)
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
>   at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:192)
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:61)
>   at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
>   at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
>   at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
>   at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
>   at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:870)
>   at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:789)
>   at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
>   at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
>   at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
>   at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:373)
>   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
> ...
> {code}
> At first glance, the problem seems to lie in the Parquet library. In fact, the error is raised by the following line, which has been in place since 2014 (based on Git history):
> [Parquet Schema Compatibility Validator|https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java#L194]
> After some investigation, I noticed that the problem is caused by a change in the data filtering conditions:
> {code:java}
> spark.sql("select * from log where array_contains(keywords, 'X')").explain(true);
> // Spark 3.0.2 and older
> == Physical Plan ==
> ... 
> +- FileScan parquet [link#0,keywords#1]
>   DataFilters: [array_contains(keywords#1, Google)]
>   PushedFilters: []
>   ...
> // Spark 3.1.0 and newer
> == Physical Plan ==
> ...
> +- FileScan parquet [link#0,keywords#1]
>   DataFilters: [isnotnull(keywords#1),  array_contains(keywords#1, Google)]
>   PushedFilters: [IsNotNull(keywords)]
>   ...
> {code}
> It is good that filtering has become smarter. Unfortunately, because I am unfamiliar with the code base, I could not find the exact location of the change or the related pull request. In general, this change works for non-repeated Parquet fields, but for repeated fields it causes an error in the Parquet library, as in the example above.
> The only temporary workaround I can see is to disable the following setting, which in general greatly reduces performance:
> {code:java}
> SET spark.sql.parquet.filterPushdown=false
> {code}
> I created a patch for this bug and a pull request will be sent soon.
>  
>  
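The report above says that pushing the IsNotNull filter down is fine for non-repeated Parquet fields but fails for repeated ones. One possible guard can be sketched as follows. This is a minimal, Spark-free model and not the actual Spark patch: the names PushdownSketch, Column, Filter and pushable are hypothetical, and the sketch assumes that parquet-protobuf wrote "keywords" as a repeated primitive column, as the error message suggests.

```scala
// Hypothetical, Spark-free model of a pushdown guard. None of these names
// come from Spark or Parquet; they exist only for this illustration.
object PushdownSketch {
  // Simplified stand-in for a Parquet column: a name plus two flags.
  final case class Column(name: String, isPrimitive: Boolean, isRepeated: Boolean)

  // Simplified stand-ins for data source filters.
  sealed trait Filter { def column: String }
  final case class IsNotNull(column: String) extends Filter
  final case class EqualTo(column: String, value: String) extends Filter

  // Keep only filters whose column is a known, non-repeated primitive.
  // Everything else stays out of the pushed-down set.
  def pushable(schema: Seq[Column], filters: Seq[Filter]): Seq[Filter] = {
    val eligible = schema
      .filter(c => c.isPrimitive && !c.isRepeated)
      .map(_.name)
      .toSet
    filters.filter(f => eligible.contains(f.column))
  }

  def main(args: Array[String]): Unit = {
    // Assumed shape of the schema from the report: "keywords" is repeated.
    val schema = Seq(
      Column("name", isPrimitive = true, isRepeated = false),
      Column("keywords", isPrimitive = true, isRepeated = true)
    )
    val filters = Seq(IsNotNull("keywords"), EqualTo("name", "X"))
    println(pushable(schema, filters))
  }
}
```

In this model the IsNotNull filter on "keywords" is dropped from the pushed-down set, while the filter on "name" is kept. A dropped filter would still have to be evaluated by the engine after the scan, which is how the Spark 3.0.2 plan above behaves with its empty PushedFilters list.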


