Posted to commits@spark.apache.org by hu...@apache.org on 2022/06/08 20:31:35 UTC

[spark] branch branch-3.3 updated: [SPARK-39393][SQL] Parquet data source only supports push-down predicate filters for non-repeated primitive types

This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 5847014fc3f [SPARK-39393][SQL] Parquet data source only supports push-down predicate filters for non-repeated primitive types
5847014fc3f is described below

commit 5847014fc3fe08b8a59c107a99c1540fbb2c2208
Author: Amin Borjian <bo...@outlook.com>
AuthorDate: Wed Jun 8 13:30:44 2022 -0700

    [SPARK-39393][SQL] Parquet data source only supports push-down predicate filters for non-repeated primitive types
    
    ### What changes were proposed in this pull request?
    
    In Spark version 3.1.0 and newer, Spark creates extra filter predicate conditions for repeated Parquet columns.
    Such fields cannot carry a filter predicate, per the [PARQUET-34](https://issues.apache.org/jira/browse/PARQUET-34) issue in the Parquet library.
    
    This PR works around the problem until the appropriate functionality is provided by the Parquet library.
    
    Before this PR:
    
    Assume the following Protocol Buffers schema:
    
    ```
    message Model {
        string name = 1;
        repeated string keywords = 2;
    }
    ```
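    
    Note that parquet-protobuf writes a repeated scalar field as a top-level repeated primitive column rather than the standard 3-level LIST structure, so the resulting Parquet schema looks roughly like this (a sketch; the exact repetition and annotations depend on the library version):
    
    ```
    message Model {
      optional binary name (UTF8);
      repeated binary keywords (UTF8);
    }
    ```
    
    This repeated primitive column is exactly the shape that Parquet's SchemaCompatibilityValidator rejects.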
    
    Suppose a Parquet file is created from a set of records in the above format with the help of the parquet-protobuf library.
    With Spark 3.1.0 or newer, we get the following exception when running this query in spark-shell:
    
    ```
    val data = spark.read.parquet("/path/to/parquet")
    data.createOrReplaceTempView("models")
    spark.sql("select * from models where array_contains(keywords, 'X')").show(false)
    ```
    
    ```
    Caused by: java.lang.IllegalArgumentException: FilterPredicates do not currently support repeated columns. Column keywords is repeated.
      at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:176)
      at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:149)
      at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:89)
      at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
      at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:192)
      at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:61)
      at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
      at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
      at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
      at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
      at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:870)
      at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:789)
      at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
      at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
      at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
      at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:373)
      at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
    ...
    ```
    
    The cause of the problem is a change in the data filter conditions that Spark generates:
    
    ```
    spark.sql("select * from models where array_contains(keywords, 'Google')").explain(true)
    
    // Spark 3.0.2 and older
    == Physical Plan ==
    ...
    +- FileScan parquet [link#0,keywords#1]
      DataFilters: [array_contains(keywords#1, Google)]
      PushedFilters: []
      ...
    
    // Spark 3.1.0 and newer
    == Physical Plan ==
    ...
    +- FileScan parquet [link#0,keywords#1]
      DataFilters: [isnotnull(keywords#1), array_contains(keywords#1, Google)]
      PushedFilters: [IsNotNull(keywords)]
      ...
    ```
    
    Pushing filters down for repeated Parquet columns is pointless for now because the Parquet library does not support them, so we can exclude such columns from the pushed predicate filters and avoid the issue.
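    
    Until running a version with this fix, a coarse workaround (not part of this patch) is to disable Parquet filter pushdown entirely via the existing `spark.sql.parquet.filterPushdown` flag, at the cost of losing pushdown for all columns:
    
    ```
    // Coarse workaround: turn off all Parquet predicate pushdown for the session
    spark.conf.set("spark.sql.parquet.filterPushdown", "false")
    ```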
    
    ### Why are the changes needed?
    
    Predicate filters that are pushed down to Parquet should not be created for repeated-type fields.
    
    ### Does this PR introduce any user-facing change?
    
    No. It only fixes a bug; before this change, the limitations of the Parquet library meant such queries failed outright.
    
    ### How was this patch tested?
    
    Added a test to ensure the problem is solved.
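    
    The new test lives in `ParquetFilterSuite`. It can be run on its own with an sbt/ScalaTest invocation along these lines (assuming a standard Spark source checkout):
    
    ```
    build/sbt "sql/testOnly *ParquetFilterSuite -- -z SPARK-39393"
    ```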
    
    Closes #36781 from Borjianamin98/master.
    
    Authored-by: Amin Borjian <bo...@outlook.com>
    Signed-off-by: huaxingao <hu...@apple.com>
    (cherry picked from commit ac2881a8c3cfb196722a5680a62ebd6bb9fba728)
    Signed-off-by: huaxingao <hu...@apple.com>
---
 .../datasources/parquet/ParquetFilters.scala       |  6 ++++-
 .../datasources/parquet/ParquetFilterSuite.scala   | 29 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 75060cfca24..9502ec0316c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -33,6 +33,7 @@ import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, MessageType,
 import org.apache.parquet.schema.LogicalTypeAnnotation.{DecimalLogicalTypeAnnotation, TimeUnit}
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
+import org.apache.parquet.schema.Type.Repetition
 
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, IntervalUtils}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseGregorianToJulianMicros, RebaseSpec}
@@ -64,7 +65,10 @@ class ParquetFilters(
         fields: Seq[Type],
         parentFieldNames: Array[String] = Array.empty): Seq[ParquetPrimitiveField] = {
       fields.flatMap {
-        case p: PrimitiveType =>
+        // Parquet only supports predicate push-down for non-repeated primitive types.
+        // TODO(SPARK-39393): Remove this extra condition once Parquet adds filter predicate
+        //                    support for repeated columns (https://issues.apache.org/jira/browse/PARQUET-34)
+        case p: PrimitiveType if p.getRepetition != Repetition.REPEATED =>
           Some(ParquetPrimitiveField(fieldNames = parentFieldNames :+ p.getName,
             fieldType = ParquetSchemaType(p.getLogicalTypeAnnotation,
               p.getPrimitiveTypeName, p.getTypeLength)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 7a09011f27c..92798d4f4ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.io.File
 import java.math.{BigDecimal => JBigDecimal}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
@@ -1297,6 +1298,34 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     }
   }
 
+  test("SPARK-39393: Do not push down predicate filters for repeated primitive fields") {
+    import ParquetCompatibilityTest._
+    withTempDir { dir =>
+      val protobufParquetFilePath = new File(dir, "protobuf-parquet").getCanonicalPath
+
+      val protobufSchema =
+        """message protobuf_style {
+          |  repeated int32 f;
+          |}
+        """.stripMargin
+
+      writeDirect(protobufParquetFilePath, protobufSchema, { rc =>
+        rc.message {
+          rc.field("f", 0) {
+              rc.addInteger(1)
+              rc.addInteger(2)
+          }
+        }
+      })
+
+      // If the "isnotnull(f)" filter gets pushed down, this query will throw an exception
+      // since column "f" is a repeated primitive column in the Parquet file.
+      checkAnswer(
+        spark.read.parquet(dir.getCanonicalPath).filter("isnotnull(f)"),
+        Seq(Row(Seq(1, 2))))
+    }
+  }
+
   test("Filters should be pushed down for vectorized Parquet reader at row group level") {
     import testImplicits._
 

