You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Amin Borjian (Jira)" <ji...@apache.org> on 2022/06/06 19:50:00 UTC

[jira] [Created] (SPARK-39393) Parquet data source only supports push-down predicate filters for non-repeated primitive types

Amin Borjian created SPARK-39393:
------------------------------------

             Summary: Parquet data source only supports push-down predicate filters for non-repeated primitive types
                 Key: SPARK-39393
                 URL: https://issues.apache.org/jira/browse/SPARK-39393
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.2.1, 3.2.0, 3.1.2, 3.1.1, 3.1.0
            Reporter: Amin Borjian


I use an example to illustrate the problem. The reason for the problem and the problem-solving approach are stated below. 

Assume follow Protocol buffer schema:

 
{code:java}
message Model {
     string name = 1;
     repeated string keywords = 2;
}
{code}
Suppose a parquet file is created from a set of records in the above format with the help of the {{parquet-protobuf}} library.

Using Spark version 3.0.2 or older, we could run the following query using {{{}spark-shell{}}}:

 
{code:java}
val data = spark.read.parquet("/path/to/parquet")
data.registerTempTable("models")
spark.sql("select * from models where array_contains(keywords, 'X')").show(false)
{code}
 

But after updating Spark, we get the following error:

 
{code:java}
Caused by: java.lang.IllegalArgumentException: FilterPredicates do not currently support repeated columns. Column keywords is repeated.
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:176)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:149)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:89)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
  at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:192)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:61)
  at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
  at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
  at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
  at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
  at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:870)
  at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:789)
  at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
  at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
  at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:373)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
...
{code}
 

At first it seems the problem is the parquet library. But in fact, our problem is because of this line that has been around since 2014 (based on Git history):

[Parquet Schema Compatibility Validator|[https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java#L194]]

After some check, I notice that the cause of the problem is due to a change in the data filtering conditions:

 
{code:java}
spark.sql("select * from log where array_contains(keywords, 'X')").explain(true);

// Spark 3.0.2 and older
== Physical Plan ==
... 
+- FileScan parquet [link#0,keywords#1]
  DataFilters: [array_contains(keywords#1, Google)]
  PushedFilters: []
  ...

// Spark 3.1.0 and newer
== Physical Plan == ... 
+- FileScan parquet [link#0,keywords#1]
  DataFilters: [isnotnull(keywords#1),  array_contains(keywords#1, Google)]
  PushedFilters: [IsNotNull(keywords)]
  ...{code}
It's good that the filtering section has become smarter. Unfortunately, due to unfamiliarity with code base, I could not find the exact location of the change and related pull request. In general, this change is suitable for non-repeated parquet fields, but in the repeated field, it causes an error from the parquet library. (Like the example given)

The only temporary solution in my opinion to solve the problem is to disable the following setting, which in general greatly reduces performance:
{code:java}
SET spark.sql.parquet.filterPushdown=false {code}
I created a patch for this bug and a pull request will be sent soon.

 

 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org