Posted to commits@spark.apache.org by ge...@apache.org on 2022/08/13 17:47:59 UTC
[spark] branch master updated: [SPARK-39926][SQL] Fix bug in column DEFAULT support for non-vectorized Parquet scans
This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ce1fa04867c [SPARK-39926][SQL] Fix bug in column DEFAULT support for non-vectorized Parquet scans
ce1fa04867c is described below
commit ce1fa04867cb82168672807aa3ee6c2e2c9bea51
Author: Daniel Tenedorio <da...@databricks.com>
AuthorDate: Sat Aug 13 10:47:42 2022 -0700
[SPARK-39926][SQL] Fix bug in column DEFAULT support for non-vectorized Parquet scans
### What changes were proposed in this pull request?
Fix a bug in column DEFAULT support for non-vectorized Parquet scans: inserting an explicit NULL into a column with a DEFAULT and then selecting the column back would sometimes erroneously return the default value instead of NULL.
To exercise the behavior:
```
set spark.sql.parquet.enableVectorizedReader=false;
create table t(a int) using parquet;
insert into t values (42);
alter table t add column b int default 42;
insert into t values (43, null);
select * from t;
```
This should return two rows:
`(42, 42) and (43, NULL)`
But the scan missed the inserted NULL value and returned the existence DEFAULT value of 42 instead:
`(42, 42) and (43, 42)`.
(With the non-vectorized reader, Parquet does not invoke the converter for a NULL field, so the previous per-row write tracking could not tell an explicit NULL apart from a column that is absent from the file.)
After this bug fix, Spark returns the first, correct result.
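The gist of the fix (see the diff below) is to compute the existence-defaults bitmask once from the file schema, rather than tracking per-row writes: only columns that are entirely absent from the Parquet file can need the existence default. A minimal standalone sketch of that decision follows; the function and parameter names here are illustrative, not the exact code:
```
// Minimal sketch: mark which Catalyst columns should receive the existence
// DEFAULT. Only columns beyond the Parquet file's field count can be missing;
// columns present in the file may legitimately hold explicit NULLs.
def initExistenceDefaultsBitmask(
    parquetFieldCount: Int,
    existenceDefaultValues: Array[Any],
    bitmask: Array[Boolean]): Unit = {
  for (i <- existenceDefaultValues.indices) {
    bitmask(i) =
      if (i < parquetFieldCount) {
        // The field exists in the Parquet file, so any NULL there is an
        // explicit NULL; never substitute the default.
        false
      } else {
        // The field was added after the file was written; substitute the
        // default only if one is actually defined.
        existenceDefaultValues(i) != null
      }
  }
}
```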
### Why are the changes needed?
This fixes a correctness bug in Spark SQL query results.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
The PR includes unit test coverage.
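For reference, a regression test along these lines would pin down the behavior. This is a hypothetical sketch assuming the InsertSuite harness, i.e. the checkAnswer, withSQLConf, and withTable helpers from QueryTest and SQLTestUtils; it is not the exact test added by this PR:
```
test("SPARK-39926: explicit NULL into column with DEFAULT, non-vectorized") {
  withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
    withTable("t") {
      sql("create table t(a int) using parquet")
      sql("insert into t values (42)")
      sql("alter table t add column b int default 42")
      sql("insert into t values (43, null)")
      // The explicit NULL must survive the scan; only the pre-existing row
      // gets the existence DEFAULT for the newly added column.
      checkAnswer(spark.table("t"), Seq(Row(42, 42), Row(43, null)))
    }
  }
}
```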
Closes #37501 from dtenedor/fix-parquet-bug.
Authored-by: Daniel Tenedorio <da...@databricks.com>
Signed-off-by: Gengliang Wang <ge...@apache.org>
---
.../datasources/parquet/ParquetRowConverter.scala | 73 ++++++----------------
.../org/apache/spark/sql/sources/InsertSuite.scala | 3 +-
2 files changed, 19 insertions(+), 57 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 6d64349cc3c..e38416bfc4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -184,49 +184,6 @@ private[parquet] class ParquetRowConverter(
override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
}
- /**
- * Subclass of RowUpdater that also updates a boolean array bitmask. In this way, after all
- * assignments are complete, it is possible to inspect the bitmask to determine which columns have
- * been written at least once.
- */
- private final class RowUpdaterWithBitmask(
- row: InternalRow,
- ordinal: Int,
- bitmask: Array[Boolean]) extends RowUpdater(row, ordinal) {
- override def set(value: Any): Unit = {
- bitmask(ordinal) = false
- super.set(value)
- }
- override def setBoolean(value: Boolean): Unit = {
- bitmask(ordinal) = false
- super.setBoolean(value)
- }
- override def setByte(value: Byte): Unit = {
- bitmask(ordinal) = false
- super.setByte(value)
- }
- override def setShort(value: Short): Unit = {
- bitmask(ordinal) = false
- super.setShort(value)
- }
- override def setInt(value: Int): Unit = {
- bitmask(ordinal) = false
- super.setInt(value)
- }
- override def setLong(value: Long): Unit = {
- bitmask(ordinal) = false
- super.setLong(value)
- }
- override def setDouble(value: Double): Unit = {
- bitmask(ordinal) = false
- super.setDouble(value)
- }
- override def setFloat(value: Float): Unit = {
- bitmask(ordinal) = false
- super.setFloat(value)
- }
- }
-
private[this] val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))
/**
@@ -269,7 +226,22 @@ private[parquet] class ParquetRowConverter(
} else {
Map.empty[Int, Int]
}
-
+ // If any fields in the Catalyst result schema have associated existence default values,
+ // maintain a boolean array to track which fields have been explicitly assigned for each row.
+ if (catalystType.hasExistenceDefaultValues) {
+ for (i <- 0 until catalystType.existenceDefaultValues.size) {
+ catalystType.existenceDefaultsBitmask(i) =
+ // Assume the schema for a Parquet file-based table contains N fields. Then if we later
+ // run a command "ALTER TABLE t ADD COLUMN c DEFAULT <value>" on the Parquet table, this
+ // adds one field to the Catalyst schema. Then if we query the old files with the new
+ // Catalyst schema, we should only apply the existence default value to all columns >= N.
+ if (i < parquetType.getFieldCount) {
+ false
+ } else {
+ catalystType.existenceDefaultValues(i) != null
+ }
+ }
+ }
parquetType.getFields.asScala.map { parquetField =>
val catalystFieldIndex = Option(parquetField.getId).flatMap { fieldId =>
// field has id, try to match by id first before falling back to match by name
@@ -279,17 +251,8 @@ private[parquet] class ParquetRowConverter(
catalystFieldIdxByName(parquetField.getName)
}
val catalystField = catalystType(catalystFieldIndex)
- // Create a RowUpdater instance for converting Parquet objects to Catalyst rows. If any fields
- // in the Catalyst result schema have associated existence default values, maintain a boolean
- // array to track which fields have been explicitly assigned for each row.
- val rowUpdater: RowUpdater =
- if (catalystType.hasExistenceDefaultValues) {
- resetExistenceDefaultsBitmask(catalystType)
- new RowUpdaterWithBitmask(
- currentRow, catalystFieldIndex, catalystType.existenceDefaultsBitmask)
- } else {
- new RowUpdater(currentRow, catalystFieldIndex)
- }
+ // Create a RowUpdater instance for converting Parquet objects to Catalyst rows.
+ val rowUpdater: RowUpdater = new RowUpdater(currentRow, catalystFieldIndex)
// Converted field value should be set to the `fieldIndex`-th cell of `currentRow`
newConverter(parquetField, catalystField.dataType, rowUpdater)
}.toArray
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index ce35705c630..9ba01badd19 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1671,8 +1671,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
Config(
None),
Config(
- Some(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false"),
- insertNullsToStorage = false)))
+ Some(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false"))))
).foreach { testCase: TestCase =>
testCase.configs.foreach { config: Config =>
// Run the test twice, once using SQL for the INSERT operations and again using DataFrames.