Posted to commits@spark.apache.org by su...@apache.org on 2022/09/16 17:51:06 UTC

[spark] branch branch-3.2 updated: [SPARK-40169][SQL] Don't pushdown Parquet filters with no reference to data schema

This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new e90a57e1aa5 [SPARK-40169][SQL] Don't pushdown Parquet filters with no reference to data schema
e90a57e1aa5 is described below

commit e90a57e1aa5afdb5e9f04f2ddcb34916c2939b2e
Author: Chao Sun <su...@apple.com>
AuthorDate: Fri Sep 16 10:46:36 2022 -0700

    [SPARK-40169][SQL] Don't pushdown Parquet filters with no reference to data schema
    
    ### What changes were proposed in this pull request?
    
    Currently, in the Parquet V1 read path, Spark pushes down data filters even if they reference no column in the Parquet read schema. This can cause correctness issues, as described in [SPARK-39833](https://issues.apache.org/jira/browse/SPARK-39833).
    
    The root cause appears to be that the V1 path performs two inconsistent checks: it first uses `AttributeReference` equality to filter the partition columns out of the data columns, and then uses `AttributeSet` equality to determine which filters reference only data columns.
    When the case-sensitivity check is disabled, the two steps disagree.
    
    Take the following scenario as an example:
    - data column: `[COL, a]`
    - partition column: `[col]`
    - filter: `col > 10`
    
    With `AttributeReference` equality, `COL` is not considered equal to `col` (because their names differ), so the data column set after filtering is still `[COL, a]`. However, when computing the filters that reference only data columns, `COL` **is considered equal** to `col`. Consequently, the filter `col > 10`, when checked against `[COL, a]`, is treated as referencing a data column and is therefore pushed down to Parquet as a data filter.
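    
    As a rough illustration of the two comparison semantics (a minimal sketch against Catalyst internals, not part of this patch, assuming that after case-insensitive resolution the data column `COL` and the partition column `col` end up sharing an expression ID):
    
    ```scala
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet}
    import org.apache.spark.sql.types.IntegerType
    
    // Assumption for the example: `COL` (data) and `col` (partition) refer to the same
    // underlying attribute, i.e. they share an exprId but differ in name casing.
    val dataCol = AttributeReference("COL", IntegerType)()
    val partitionCol = AttributeReference("col", IntegerType)(exprId = dataCol.exprId)
    
    // Step 1: Seq.contains relies on AttributeReference equality, which compares names
    // case-sensitively, so `COL` is NOT removed from the data columns.
    Seq(dataCol).contains(partitionCol)                 // false
    
    // Step 2: AttributeSet membership is keyed on exprId, so the reference `col` IS
    // considered a data column reference, and `col > 10` is pushed down as a data filter.
    AttributeSet(Seq(dataCol)).contains(partitionCol)   // true
    ```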
    
    On the Parquet side, since `col` doesn't exist in the file schema (which only has `COL`), Parquet can return the wrong number of rows when column index filtering is enabled. See [PARQUET-2170](https://issues.apache.org/jira/browse/PARQUET-2170) for more detail.
    
    In general, when data columns overlap with partition columns and case sensitivity is disabled, partition filters are not filtered out before we compute the filters that reference only data columns, which is incorrect.
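    
    For reference, the shape of the end-to-end repro (a spark-shell sketch mirroring the SPARK-39833 tests updated below; the output path is purely illustrative):
    
    ```scala
    import java.io.File
    import spark.implicits._
    
    spark.conf.set("spark.sql.caseSensitive", false)
    
    // Data column `COL` is written under a partition directory named `col=0`.
    val base = "/tmp/spark-40169-repro"                       // illustrative path
    val p = s"$base${File.separator}col=0${File.separator}"
    Seq(0).toDF("COL").coalesce(1).write.save(p)
    
    val df = spark.read.parquet(base)
    // Before the fix, `col = 0` could be pushed to Parquet as a data filter even though
    // the file schema only contains `COL`; with column indexes enabled this could yield
    // a wrong row count (see PARQUET-2170). Expected result: 1.
    df.filter("col = 0").count()
    ```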
    
    ### Why are the changes needed?
    
    This fixes the correctness bug described in [SPARK-39833](https://issues.apache.org/jira/browse/SPARK-39833).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing test cases from [SPARK-39833](https://issues.apache.org/jira/browse/SPARK-39833) already cover this issue. This PR also modifies them to exercise both the case-sensitive and case-insensitive scenarios.
    
    Closes #37881 from sunchao/SPARK-40169.
    
    Authored-by: Chao Sun <su...@apple.com>
    Signed-off-by: Chao Sun <su...@apple.com>
---
 .../execution/datasources/FileSourceStrategy.scala |  2 +-
 .../datasources/parquet/ParquetFileFormat.scala    |  5 ---
 .../datasources/parquet/ParquetQuerySuite.scala    | 38 ++++++++++++++--------
 3 files changed, 25 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 1bfde7515dc..50a6519604f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -183,7 +183,7 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
 
       // Partition keys are not available in the statistics of the files.
       // `dataColumns` might have partition columns, we need to filter them out.
-      val dataColumnsWithoutPartitionCols = dataColumns.filterNot(partitionColumns.contains)
+      val dataColumnsWithoutPartitionCols = dataColumns.filterNot(partitionSet.contains)
       val dataFilters = normalizedFiltersWithoutSubqueries.flatMap { f =>
         if (f.references.intersect(partitionSet).nonEmpty) {
           extractPredicatesWithinOutputSet(f, AttributeSet(dataColumnsWithoutPartitionCols))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 6b3922d11a4..e5d33b84bf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -228,11 +228,6 @@ class ParquetFileFormat
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
 
-    // See PARQUET-2170.
-    // Disable column index optimisation when required schema does not have columns that appear in
-    // pushed filters to avoid getting incorrect results.
-    hadoopConf.setBooleanIfUnset(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED, false)
-
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 47096948d21..1490e14a55a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -935,24 +935,34 @@ class ParquetV1QuerySuite extends ParquetQuerySuite {
   }
 
   test("SPARK-39833: pushed filters with count()") {
-    withTempPath { path =>
-      val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
-      Seq(0).toDF("COL").coalesce(1).write.save(p)
-      val df = spark.read.parquet(path.getCanonicalPath)
-      checkAnswer(df.filter("col = 0"), Seq(Row(0)))
-      assert(df.filter("col = 0").count() == 1, "col")
-      assert(df.filter("COL = 0").count() == 1, "COL")
+    Seq(true, false).foreach { caseSensitive =>
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+        withTempPath { path =>
+          val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
+          Seq(0).toDF("COL").coalesce(1).write.save(p)
+          val df = spark.read.parquet(path.getCanonicalPath)
+          val expected = if (caseSensitive) Seq(Row(0, 0)) else Seq(Row(0))
+          checkAnswer(df.filter("col = 0"), expected)
+          assert(df.filter("col = 0").count() == 1, "col")
+          assert(df.filter("COL = 0").count() == 1, "COL")
+        }
+      }
     }
   }
 
   test("SPARK-39833: pushed filters with project without filter columns") {
-    withTempPath { path =>
-      val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
-      Seq((0, 1)).toDF("COL", "a").coalesce(1).write.save(p)
-      val df = spark.read.parquet(path.getCanonicalPath)
-      checkAnswer(df.filter("col = 0"), Seq(Row(0, 1)))
-      assert(df.filter("col = 0").select("a").collect().toSeq == Row(1) :: Nil)
-      assert(df.filter("col = 0 and a = 1").select("a").collect().toSeq == Row(1) :: Nil)
+    Seq(true, false).foreach { caseSensitive =>
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+        withTempPath { path =>
+          val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
+          Seq((0, 1)).toDF("COL", "a").coalesce(1).write.save(p)
+          val df = spark.read.parquet(path.getCanonicalPath)
+          val expected = if (caseSensitive) Seq(Row(0, 1, 0)) else Seq(Row(0, 1))
+          checkAnswer(df.filter("col = 0"), expected)
+          assert(df.filter("col = 0").select("a").collect().toSeq == Row(1) :: Nil)
+          assert(df.filter("col = 0 and a = 1").select("a").collect().toSeq == Row(1) :: Nil)
+        }
+      }
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org