You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/21 10:00:28 UTC

[spark] branch branch-3.2 updated: [SPARK-39833][SQL] Disable Parquet column index in DSv1 to fix a correctness issue in the case of overlapping partition and data columns

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new e8a578ac757 [SPARK-39833][SQL] Disable Parquet column index in DSv1 to fix a correctness issue in the case of overlapping partition and data columns
e8a578ac757 is described below

commit e8a578ac757b4e53072af1bec908f6a1ff8ba611
Author: Ivan Sadikov <iv...@databricks.com>
AuthorDate: Sun Aug 21 18:59:48 2022 +0900

    [SPARK-39833][SQL] Disable Parquet column index in DSv1 to fix a correctness issue in the case of overlapping partition and data columns
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a correctness issue in Parquet DSv1 FileFormat when projection does not contain columns referenced in pushed filters. This typically happens when partition columns and data columns overlap.
    
    This could result in empty result when in fact there were records matching predicate as can be seen in the provided fields.
    
    The problem is especially visible with `count()` and `show()` reporting different results, for example, show() would return 1+ records where the count() would return 0.
    
    In Parquet, when the predicate is provided and column index is enabled, we would try to filter row ranges to figure out what the count should be. Unfortunately, there is an issue that if the projection is empty or is not in the set of filter columns, any checks on columns would fail and 0 rows are returned (`RowRanges.EMPTY`) even though there is data matching the filter.
    
    Note that this is rather a mitigation, a quick fix. The actual fix needs to go into Parquet-MR: https://issues.apache.org/jira/browse/PARQUET-2170.
    
    The fix is not required in DSv2 where the overlapping columns are removed in `FileScanBuilder::readDataSchema()`.
    
    ### Why are the changes needed?
    
    Fixes a correctness issue when projection columns are not referenced by columns in pushed down filters or the schema is empty in Parquet DSv1.
    
    Downsides: Parquet column filter would be disabled if it had not been explicitly enabled which could affect performance.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    I added a unit test that reproduces this behaviour. The test fails without the fix and passes with the fix.
    
    Closes #37419 from sadikovi/SPARK-39833.
    
    Authored-by: Ivan Sadikov <iv...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit cde71aaf173aadd14dd6393b09e9851b5caad903)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../datasources/parquet/ParquetFileFormat.scala    |  5 +++++
 .../datasources/parquet/ParquetQuerySuite.scala    | 22 ++++++++++++++++++++++
 2 files changed, 27 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index e5d33b84bf0..6b3922d11a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -228,6 +228,11 @@ class ParquetFileFormat
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
 
+    // See PARQUET-2170.
+    // Disable column index optimisation when required schema does not have columns that appear in
+    // pushed filters to avoid getting incorrect results.
+    hadoopConf.setBooleanIfUnset(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED, false)
+
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 9ef43995467..47096948d21 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -933,6 +933,28 @@ class ParquetV1QuerySuite extends ParquetQuerySuite {
       }
     }
   }
+
+  test("SPARK-39833: pushed filters with count()") {
+    withTempPath { path =>
+      val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
+      Seq(0).toDF("COL").coalesce(1).write.save(p)
+      val df = spark.read.parquet(path.getCanonicalPath)
+      checkAnswer(df.filter("col = 0"), Seq(Row(0)))
+      assert(df.filter("col = 0").count() == 1, "col")
+      assert(df.filter("COL = 0").count() == 1, "COL")
+    }
+  }
+
+  test("SPARK-39833: pushed filters with project without filter columns") {
+    withTempPath { path =>
+      val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
+      Seq((0, 1)).toDF("COL", "a").coalesce(1).write.save(p)
+      val df = spark.read.parquet(path.getCanonicalPath)
+      checkAnswer(df.filter("col = 0"), Seq(Row(0, 1)))
+      assert(df.filter("col = 0").select("a").collect().toSeq == Row(1) :: Nil)
+      assert(df.filter("col = 0 and a = 1").select("a").collect().toSeq == Row(1) :: Nil)
+    }
+  }
 }
 
 class ParquetV2QuerySuite extends ParquetQuerySuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org