You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2022/08/13 17:47:59 UTC

[spark] branch master updated: [SPARK-39926][SQL] Fix bug in column DEFAULT support for non-vectorized Parquet scans

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ce1fa04867c [SPARK-39926][SQL] Fix bug in column DEFAULT support for non-vectorized Parquet scans
ce1fa04867c is described below

commit ce1fa04867cb82168672807aa3ee6c2e2c9bea51
Author: Daniel Tenedorio <da...@databricks.com>
AuthorDate: Sat Aug 13 10:47:42 2022 -0700

    [SPARK-39926][SQL] Fix bug in column DEFAULT support for non-vectorized Parquet scans
    
    ### What changes were proposed in this pull request?
    
    Fix a bug in column DEFAULT support for non-vectorized Parquet scans, where inserting explicit NULL values into a column with a DEFAULT and then selecting the column back would sometimes erroneously return the default value instead of NULL.
    
    To exercise the behavior:
    
    ```
    set spark.sql.parquet.enableVectorizedReader=false;
    create table t(a int) using parquet;
    insert into t values (42);
    alter table t add column b int default 42;
    insert into t values (43, null);
    select * from t;
    ```
    
    This should return two rows:
    
    `(42, 42)` and `(43, NULL)`
    
    Instead, the scan missed the inserted NULL value and returned the existence DEFAULT value of 42:
    
    `(42, 42)` and `(43, 42)`.
    
    After this bug fix, Spark returns the former, correct result.
    
    ### Why are the changes needed?
    
    This fixes the correctness of SQL queries using Spark.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    The PR includes unit test coverage.
    
    Closes #37501 from dtenedor/fix-parquet-bug.
    
    Authored-by: Daniel Tenedorio <da...@databricks.com>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 .../datasources/parquet/ParquetRowConverter.scala  | 73 ++++++----------------
 .../org/apache/spark/sql/sources/InsertSuite.scala |  3 +-
 2 files changed, 19 insertions(+), 57 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 6d64349cc3c..e38416bfc4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -184,49 +184,6 @@ private[parquet] class ParquetRowConverter(
     override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
   }
 
-  /**
-   * Subclass of RowUpdater that also updates a boolean array bitmask. In this way, after all
-   * assignments are complete, it is possible to inspect the bitmask to determine which columns have
-   * been written at least once.
-   */
-  private final class RowUpdaterWithBitmask(
-      row: InternalRow,
-      ordinal: Int,
-      bitmask: Array[Boolean]) extends RowUpdater(row, ordinal) {
-    override def set(value: Any): Unit = {
-      bitmask(ordinal) = false
-      super.set(value)
-    }
-    override def setBoolean(value: Boolean): Unit = {
-      bitmask(ordinal) = false
-      super.setBoolean(value)
-    }
-    override def setByte(value: Byte): Unit = {
-      bitmask(ordinal) = false
-      super.setByte(value)
-    }
-    override def setShort(value: Short): Unit = {
-      bitmask(ordinal) = false
-      super.setShort(value)
-    }
-    override def setInt(value: Int): Unit = {
-      bitmask(ordinal) = false
-      super.setInt(value)
-    }
-    override def setLong(value: Long): Unit = {
-      bitmask(ordinal) = false
-      super.setLong(value)
-    }
-    override def setDouble(value: Double): Unit = {
-      bitmask(ordinal) = false
-      super.setDouble(value)
-    }
-    override def setFloat(value: Float): Unit = {
-      bitmask(ordinal) = false
-      super.setFloat(value)
-    }
-  }
-
   private[this] val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))
 
   /**
@@ -269,7 +226,22 @@ private[parquet] class ParquetRowConverter(
       } else {
         Map.empty[Int, Int]
       }
-
+    // If any fields in the Catalyst result schema have associated existence default values,
+    // maintain a boolean array to track which fields have been explicitly assigned for each row.
+    if (catalystType.hasExistenceDefaultValues) {
+      for (i <- 0 until catalystType.existenceDefaultValues.size) {
+        catalystType.existenceDefaultsBitmask(i) =
+          // Assume the schema for a Parquet file-based table contains N fields. Then if we later
+          // run a command "ALTER TABLE t ADD COLUMN c DEFAULT <value>" on the Parquet table, this
+          // adds one field to the Catalyst schema. Then if we query the old files with the new
+          // Catalyst schema, we should only apply the existence default value to all columns >= N.
+          if (i < parquetType.getFieldCount) {
+            false
+          } else {
+            catalystType.existenceDefaultValues(i) != null
+          }
+      }
+    }
     parquetType.getFields.asScala.map { parquetField =>
       val catalystFieldIndex = Option(parquetField.getId).flatMap { fieldId =>
         // field has id, try to match by id first before falling back to match by name
@@ -279,17 +251,8 @@ private[parquet] class ParquetRowConverter(
         catalystFieldIdxByName(parquetField.getName)
       }
       val catalystField = catalystType(catalystFieldIndex)
-      // Create a RowUpdater instance for converting Parquet objects to Catalyst rows. If any fields
-      // in the Catalyst result schema have associated existence default values, maintain a boolean
-      // array to track which fields have been explicitly assigned for each row.
-      val rowUpdater: RowUpdater =
-        if (catalystType.hasExistenceDefaultValues) {
-          resetExistenceDefaultsBitmask(catalystType)
-          new RowUpdaterWithBitmask(
-            currentRow, catalystFieldIndex, catalystType.existenceDefaultsBitmask)
-        } else {
-          new RowUpdater(currentRow, catalystFieldIndex)
-        }
+      // Create a RowUpdater instance for converting Parquet objects to Catalyst rows.
+      val rowUpdater: RowUpdater = new RowUpdater(currentRow, catalystFieldIndex)
       // Converted field value should be set to the `fieldIndex`-th cell of `currentRow`
       newConverter(parquetField, catalystField.dataType, rowUpdater)
     }.toArray
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index ce35705c630..9ba01badd19 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1671,8 +1671,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
           Config(
             None),
           Config(
-            Some(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false"),
-            insertNullsToStorage = false)))
+            Some(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false"))))
     ).foreach { testCase: TestCase =>
       testCase.configs.foreach { config: Config =>
         // Run the test twice, once using SQL for the INSERT operations and again using DataFrames.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org