Posted to commits@spark.apache.org by ka...@apache.org on 2022/07/28 00:18:50 UTC
[spark] branch branch-3.2 updated: [SPARK-39839][SQL] Handle special case of null variable-length Decimal with non-zero offsetAndSize in UnsafeRow structural integrity check
This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 57f1bb74230 [SPARK-39839][SQL] Handle special case of null variable-length Decimal with non-zero offsetAndSize in UnsafeRow structural integrity check
57f1bb74230 is described below
commit 57f1bb74230f08422147e10ac981c968d0f7a5cf
Author: Kris Mok <kr...@databricks.com>
AuthorDate: Thu Jul 28 08:49:33 2022 +0900
[SPARK-39839][SQL] Handle special case of null variable-length Decimal with non-zero offsetAndSize in UnsafeRow structural integrity check
### What changes were proposed in this pull request?
Update the `UnsafeRow` structural integrity check in `UnsafeRowUtils.validateStructuralIntegrity` to handle a special case: a null variable-length `DecimalType` value.
### Why are the changes needed?
The check should follow the format that `UnsafeRowWriter` produces. In general, `UnsafeRowWriter` zeroes out a field when it is set to null; cf. `UnsafeRowWriter.setNullAt(ordinal)` and `UnsafeRow.setNullAt(ordinal)`.
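As a minimal sketch of that convention, assuming `UnsafeRowWriter` is used directly rather than from generated code (the real call sites are in codegen output):

```scala
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter

val writer = new UnsafeRowWriter(1) // one fixed-length long field
writer.reset()
writer.write(0, 42L) // non-null: the 8-byte slot holds the value
writer.setNullAt(0)  // null: the null bit is set and the slot zeroed out
val row = writer.getRow()
assert(row.isNullAt(0) && row.getLong(0) == 0L)
```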
But there's a special case for `DecimalType` values: it is the only type that both:
- can be fixed-length or variable-length, depending on the precision, and
- is mutable in `UnsafeRow`.
To keep a variable-length `DecimalType` mutable in `UnsafeRow`, the `UnsafeRowWriter` always reserves a 16-byte slot in the variable-length section of the `UnsafeRow` (the tail end of the row), regardless of whether the `Decimal` value being written is null or not. The fixed-length part of the field holds an "OffsetAndSize": the `offset` part always points to the start of the field's variable-length slot, while the `size` part is either `0` for a null value or the actual byte size of the serialized decimal for a non-null value.
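As a sketch of that packing (mirroring the `getOffsetAndSize` helper added in the diff below; `unpackOffsetAndSize` is a hypothetical standalone name):

```scala
// The 8-byte fixed-length slot of a variable-length field packs two ints:
// high 32 bits = offset of the variable-length part, low 32 bits = byte size.
def unpackOffsetAndSize(offsetAndSize: Long): (Int, Int) =
  ((offsetAndSize >> 32).toInt, offsetAndSize.toInt)

// For a null variable-length Decimal written by write(int, Decimal, int, int):
// offset still points at the reserved 16-byte slot, size is 0.
assert(unpackOffsetAndSize((16L << 32) | 0L) == (16, 0))
```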
However, when `setNullAt(ordinal)` is called instead of passing a null value to `write(int, Decimal, int, int)`, the `offset` part gets zeroed out and the field stops being mutable. A comment on `UnsafeRow.setDecimal` mentions that `setNullAt(ordinal)` must not be called if the field is to remain updatable, but nothing in the code enforces that.
So the structural integrity check needs to recognize this and allow a null variable-length `DecimalType` field to carry a non-zero offsetAndSize.
Note that for non-null values the existing check already conforms to the format `UnsafeRowWriter` produces. Only a null value of a variable-length `DecimalType` triggers the bug, which can affect Structured Streaming's checkpoint file reads, where this check is applied.
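Expressed as a tiny standalone sketch (names hypothetical; `minOffset` corresponds to `bitSetWidthInBytes + 8 * row.numFields` in the diff below), the relaxed rule for a null variable-length Decimal is:

```scala
// Valid iff size == 0 and offset is either 0 (field frozen by setNullAt)
// or range-checked exactly like a non-null variable-length field.
def nullVarLenDecimalIsValid(offset: Int, size: Int, minOffset: Int, rowSize: Int): Boolean =
  size == 0 && (offset == 0 || (offset >= minOffset && offset <= rowSize))
```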
### Does this PR introduce _any_ user-facing change?
Yes. Previously, the `UnsafeRow` structural integrity validation would return a false positive for correct data whenever a variable-length `DecimalType` field held a null value. With the fix it no longer does.
Because the Structured Streaming checkpoint file validation uses this check, a good checkpoint file could previously be rejected, and the only workaround was to disable the check; with the fix, such a checkpoint file loads correctly.
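On unpatched versions, the workaround mentioned above would look roughly like this (hedged: this assumes the checkpoint validation is gated by the `spark.sql.streaming.stateStore.formatValidation.enabled` flag; upgrading to a build with this fix is the real remedy):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
// Disable state store format validation (assumed flag; default is true).
spark.conf.set("spark.sql.streaming.stateStore.formatValidation.enabled", "false")
```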
### How was this patch tested?
Added a new test case in `UnsafeRowUtilsSuite`.
Closes #37252 from rednaxelafx/fix-unsaferow-validation.
Authored-by: Kris Mok <kr...@databricks.com>
Signed-off-by: Jungtaek Lim <ka...@gmail.com>
(cherry picked from commit c608ae2fc6a3a50f2e67f2a3dad8d4e4be1aaf9f)
Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
.../spark/sql/catalyst/util/UnsafeRowUtils.scala | 44 ++++++++++++++++++----
.../sql/catalyst/util/UnsafeRowUtilsSuite.scala | 31 ++++++++++++++-
2 files changed, 67 insertions(+), 8 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
index 37a34fac663..48db0c7d971 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
@@ -27,8 +27,15 @@ object UnsafeRowUtils {
* - schema.fields.length == row.numFields should always be true
* - UnsafeRow.calculateBitSetWidthInBytes(row.numFields) < row.getSizeInBytes should always be
* true if the expectedSchema contains at least one field.
- * - For variable-length fields: if null bit says it's null then don't do anything, else extract
- * offset and size:
+ * - For variable-length fields:
+ * - if null bit says it's null, then
+ * - in general the offset-and-size should be zero
+ * - special case: variable-length DecimalType is considered mutable in UnsafeRow, and to
+ * support that, the offset is set to point to the variable-length part like a non-null
+ * value, while the size is set to zero to signal that it's a null value. The offset
+ * may also be set to zero, in which case this variable-length Decimal no longer supports
+ * being mutable in the UnsafeRow.
+ * - otherwise the field is not null, then extract offset and size:
* 1) 0 <= size < row.getSizeInBytes should always be true. We can be even more precise than
* this, where the upper bound of size can only be as big as the variable length part of
* the row.
@@ -52,9 +59,7 @@ object UnsafeRowUtils {
var varLenFieldsSizeInBytes = 0
expectedSchema.fields.zipWithIndex.foreach {
case (field, index) if !UnsafeRow.isFixedLength(field.dataType) && !row.isNullAt(index) =>
- val offsetAndSize = row.getLong(index)
- val offset = (offsetAndSize >> 32).toInt
- val size = offsetAndSize.toInt
+ val (offset, size) = getOffsetAndSize(row, index)
if (size < 0 ||
offset < bitSetWidthInBytes + 8 * row.numFields || offset + size > rowSizeInBytes) {
return false
@@ -74,8 +79,26 @@ object UnsafeRowUtils {
if ((row.getLong(index) >> 32) != 0L) return false
case _ =>
}
- case (_, index) if row.isNullAt(index) =>
- if (row.getLong(index) != 0L) return false
+ case (field, index) if row.isNullAt(index) =>
+ field.dataType match {
+ case dt: DecimalType if !UnsafeRow.isFixedLength(dt) =>
+ // See special case in UnsafeRowWriter.write(int, Decimal, int, int) and
+ // UnsafeRow.setDecimal(int, Decimal, int).
+ // A variable-length Decimal may be marked as null while having non-zero offset and
+ // zero length. This allows the field to be updated (i.e. mutable variable-length data)
+
+ // Check the integrity of null value of variable-length DecimalType in UnsafeRow:
+ // 1. size must be zero
+ // 2. offset may be zero, in which case this variable-length field is no longer mutable
+ // 3. otherwise offset is non-zero, range check it the same way as a non-null value
+ val (offset, size) = getOffsetAndSize(row, index)
+ if (size != 0 || offset != 0 &&
+ (offset < bitSetWidthInBytes + 8 * row.numFields || offset > rowSizeInBytes)) {
+ return false
+ }
+ case _ =>
+ if (row.getLong(index) != 0L) return false
+ }
case _ =>
}
if (bitSetWidthInBytes + 8 * row.numFields + varLenFieldsSizeInBytes > rowSizeInBytes) {
@@ -83,4 +106,11 @@ object UnsafeRowUtils {
}
true
}
+
+ def getOffsetAndSize(row: UnsafeRow, index: Int): (Int, Int) = {
+ val offsetAndSize = row.getLong(index)
+ val offset = (offsetAndSize >> 32).toInt
+ val size = offsetAndSize.toInt
+ (offset, size)
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
index 4b6a3cfafd8..518d68ce1d2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
@@ -17,9 +17,11 @@
package org.apache.spark.sql.catalyst.util
+import java.math.{BigDecimal => JavaBigDecimal}
+
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{Decimal, DecimalType, IntegerType, StringType, StructField, StructType}
class UnsafeRowUtilsSuite extends SparkFunSuite {
@@ -52,4 +54,31 @@ class UnsafeRowUtilsSuite extends SparkFunSuite {
StructField("value2", IntegerType, false)))
assert(!UnsafeRowUtils.validateStructuralIntegrity(testRow, invalidSchema))
}
+
+ test("Handle special case for null variable-length Decimal") {
+ val schema = StructType(StructField("d", DecimalType(19, 0), nullable = true) :: Nil)
+ val unsafeRowProjection = UnsafeProjection.create(schema)
+ val row = unsafeRowProjection(new SpecificInternalRow(schema))
+
+ // row is empty at this point
+ assert(row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (16, 0))
+ assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+
+ // set Decimal field to precision-overflowed value
+ val bigDecimalVal = Decimal(new JavaBigDecimal("12345678901234567890")) // precision=20, scale=0
+ row.setDecimal(0, bigDecimalVal, 19) // should overflow and become null
+ assert(row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (16, 0))
+ assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+
+ // set Decimal field to valid non-null value
+ val bigDecimalVal2 = Decimal(new JavaBigDecimal("1234567890123456789")) // precision=19, scale=0
+ row.setDecimal(0, bigDecimalVal2, 19) // should succeed
+ assert(!row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (16, 8))
+ assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+
+ // set Decimal field to null explicitly, after which this field no longer supports updating
+ row.setNullAt(0)
+ assert(row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (0, 0))
+ assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+ }
}