Posted to commits@spark.apache.org by ka...@apache.org on 2022/07/28 00:18:50 UTC

[spark] branch branch-3.2 updated: [SPARK-39839][SQL] Handle special case of null variable-length Decimal with non-zero offsetAndSize in UnsafeRow structural integrity check

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 57f1bb74230 [SPARK-39839][SQL] Handle special case of null variable-length Decimal with non-zero offsetAndSize in UnsafeRow structural integrity check
57f1bb74230 is described below

commit 57f1bb74230f08422147e10ac981c968d0f7a5cf
Author: Kris Mok <kr...@databricks.com>
AuthorDate: Thu Jul 28 08:49:33 2022 +0900

    [SPARK-39839][SQL] Handle special case of null variable-length Decimal with non-zero offsetAndSize in UnsafeRow structural integrity check
    
    ### What changes were proposed in this pull request?
    
    Update the `UnsafeRow` structural integrity check in `UnsafeRowUtils.validateStructuralIntegrity` to handle the special case of a null variable-length `DecimalType` value.
    
    ### Why are the changes needed?
    
    The check should follow the format that `UnsafeRowWriter` produces. In general, `UnsafeRowWriter` zeroes out a field when the field is set to null, cf. `UnsafeRowWriter.setNullAt(ordinal)` and `UnsafeRow.setNullAt(ordinal)`.
    
    But there's a special case for `DecimalType` values, because this is the only type that:
    - can be fixed-length or variable-length, depending on the precision, and
    - is mutable in `UnsafeRow`.
    
    To support a variable-length `DecimalType` to be mutable in `UnsafeRow`, the `UnsafeRowWriter` always leaves a 16-byte space in the variable-length section of the `UnsafeRow` (tail end of the row), regardless of whether the `Decimal` value being written is null or not. In the fixed-length part of the field, it would be an "OffsetAndSize", and the `offset` part always points to the start offset of the variable-length part of the field, while the `size` part will either be `0` for the n [...]
    When `setNullAt(ordinal)` is called instead of passing a null value to `write(int, Decimal, int, int)`, however, the `offset` part is zeroed out and the field stops being mutable. A comment on `UnsafeRow.setDecimal` notes that `setNullAt(ordinal)` must not be called if the field is to keep supporting updates, but nothing in the code enforces that.
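
    The "OffsetAndSize" word described above can be illustrated with a minimal, Spark-free sketch. The object name `OffsetAndSizeSketch` and the `pack` helper are hypothetical and exist only for illustration; `unpack` follows the same bit arithmetic as the `getOffsetAndSize` helper added by this patch.

    ```scala
    object OffsetAndSizeSketch {
      // Hypothetical helper: put the 32-bit offset in the upper half and the
      // 32-bit size in the lower half of a single 64-bit word.
      def pack(offset: Int, size: Int): Long = {
        (offset.toLong << 32) | (size.toLong & 0xFFFFFFFFL)
      }

      // Same arithmetic as getOffsetAndSize in this patch, applied to a raw Long.
      def unpack(offsetAndSize: Long): (Int, Int) = {
        ((offsetAndSize >> 32).toInt, offsetAndSize.toInt)
      }
    }
    ```

    With this layout, a null variable-length Decimal written through `UnsafeRowWriter` corresponds to something like `pack(16, 0)`, which is a non-zero word even though the size is zero, whereas a field cleared by `setNullAt(ordinal)` corresponds to the all-zero word `0L`.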
    
    So the structural integrity check needs to recognize this and allow a null variable-length `DecimalType` field to hold a non-zero offset-and-size word.
    
    Note that for non-null values, the existing check does conform to the format from `UnsafeRowWriter`. Only a null value of a variable-length `DecimalType` triggers the bug, which can affect Structured Streaming's checkpoint file reads, where this check is applied.
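
    The rule for the null case can be sketched as a standalone predicate. This is only an illustration under stated assumptions: the object and parameter names are hypothetical, `fixedPartEnd` stands for `bitSetWidthInBytes + 8 * row.numFields`, and the real check in this patch operates on an `UnsafeRow` rather than on plain integers.

    ```scala
    object NullDecimalCheckSketch {
      // Hypothetical standalone version of the null-case rule:
      // 1. size must be zero;
      // 2. offset may be zero (the field is then no longer mutable);
      // 3. otherwise offset must point into the variable-length part of the row.
      def isValidNullVarLenDecimal(
          offset: Int,
          size: Int,
          fixedPartEnd: Int,
          rowSizeInBytes: Int): Boolean = {
        if (size != 0) {
          false
        } else if (offset == 0) {
          true
        } else {
          offset >= fixedPartEnd && offset <= rowSizeInBytes
        }
      }
    }
    ```

    For a single-field row of 32 bytes whose fixed-length part ends at byte 16, `(offset, size) = (16, 0)` and `(0, 0)` both pass, while `(16, 8)` and `(8, 0)` are rejected.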
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Previously, the `UnsafeRow` structural integrity validation would wrongly report correct data as corrupt (a false positive) when there was a null value in a variable-length `DecimalType` field. With the fix, it no longer does.
    Because the Structured Streaming checkpoint file validation uses this check, a good checkpoint file could previously be rejected, and the only workaround was to disable the check; with the fix, the correct checkpoint file is allowed to load.
    
    ### How was this patch tested?
    
    Added a new test case in `UnsafeRowUtilsSuite`.
    
    Closes #37252 from rednaxelafx/fix-unsaferow-validation.
    
    Authored-by: Kris Mok <kr...@databricks.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
    (cherry picked from commit c608ae2fc6a3a50f2e67f2a3dad8d4e4be1aaf9f)
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../spark/sql/catalyst/util/UnsafeRowUtils.scala   | 44 ++++++++++++++++++----
 .../sql/catalyst/util/UnsafeRowUtilsSuite.scala    | 31 ++++++++++++++-
 2 files changed, 67 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
index 37a34fac663..48db0c7d971 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
@@ -27,8 +27,15 @@ object UnsafeRowUtils {
    * - schema.fields.length == row.numFields should always be true
    * - UnsafeRow.calculateBitSetWidthInBytes(row.numFields) < row.getSizeInBytes should always be
    *   true if the expectedSchema contains at least one field.
-   * - For variable-length fields: if null bit says it's null then don't do anything, else extract
-   *   offset and size:
+   * - For variable-length fields:
+   *   - if null bit says it's null, then
+   *     - in general the offset-and-size should be zero
+   *     - special case: variable-length DecimalType is considered mutable in UnsafeRow, and to
+   *       support that, the offset is set to point to the variable-length part like a non-null
+   *       value, while the size is set to zero to signal that it's a null value. The offset
+   *       may also be set to zero, in which case this variable-length Decimal no longer supports
+   *       being mutable in the UnsafeRow.
+   *   - otherwise the field is not null, then extract offset and size:
    *   1) 0 <= size < row.getSizeInBytes should always be true. We can be even more precise than
    *      this, where the upper bound of size can only be as big as the variable length part of
    *      the row.
@@ -52,9 +59,7 @@ object UnsafeRowUtils {
     var varLenFieldsSizeInBytes = 0
     expectedSchema.fields.zipWithIndex.foreach {
       case (field, index) if !UnsafeRow.isFixedLength(field.dataType) && !row.isNullAt(index) =>
-        val offsetAndSize = row.getLong(index)
-        val offset = (offsetAndSize >> 32).toInt
-        val size = offsetAndSize.toInt
+        val (offset, size) = getOffsetAndSize(row, index)
         if (size < 0 ||
             offset < bitSetWidthInBytes + 8 * row.numFields || offset + size > rowSizeInBytes) {
           return false
@@ -74,8 +79,26 @@ object UnsafeRowUtils {
             if ((row.getLong(index) >> 32) != 0L) return false
           case _ =>
         }
-      case (_, index) if row.isNullAt(index) =>
-        if (row.getLong(index) != 0L) return false
+      case (field, index) if row.isNullAt(index) =>
+        field.dataType match {
+          case dt: DecimalType if !UnsafeRow.isFixedLength(dt) =>
+            // See special case in UnsafeRowWriter.write(int, Decimal, int, int) and
+            // UnsafeRow.setDecimal(int, Decimal, int).
+            // A variable-length Decimal may be marked as null while having non-zero offset and
+            // zero length. This allows the field to be updated (i.e. mutable variable-length data)
+
+            // Check the integrity of null value of variable-length DecimalType in UnsafeRow:
+            // 1. size must be zero
+            // 2. offset may be zero, in which case this variable-length field is no longer mutable
+            // 3. otherwise offset is non-zero, range check it the same way as a non-null value
+            val (offset, size) = getOffsetAndSize(row, index)
+            if (size != 0 || offset != 0 &&
+                (offset < bitSetWidthInBytes + 8 * row.numFields || offset > rowSizeInBytes)) {
+              return false
+            }
+          case _ =>
+            if (row.getLong(index) != 0L) return false
+        }
       case _ =>
     }
     if (bitSetWidthInBytes + 8 * row.numFields + varLenFieldsSizeInBytes > rowSizeInBytes) {
@@ -83,4 +106,11 @@ object UnsafeRowUtils {
     }
     true
   }
+
+  def getOffsetAndSize(row: UnsafeRow, index: Int): (Int, Int) = {
+    val offsetAndSize = row.getLong(index)
+    val offset = (offsetAndSize >> 32).toInt
+    val size = offsetAndSize.toInt
+    (offset, size)
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
index 4b6a3cfafd8..518d68ce1d2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.catalyst.util
 
+import java.math.{BigDecimal => JavaBigDecimal}
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{Decimal, DecimalType, IntegerType, StringType, StructField, StructType}
 
 class UnsafeRowUtilsSuite extends SparkFunSuite {
 
@@ -52,4 +54,31 @@ class UnsafeRowUtilsSuite extends SparkFunSuite {
         StructField("value2", IntegerType, false)))
     assert(!UnsafeRowUtils.validateStructuralIntegrity(testRow, invalidSchema))
   }
+
+  test("Handle special case for null variable-length Decimal") {
+    val schema = StructType(StructField("d", DecimalType(19, 0), nullable = true) :: Nil)
+    val unsafeRowProjection = UnsafeProjection.create(schema)
+    val row = unsafeRowProjection(new SpecificInternalRow(schema))
+
+    // row is empty at this point
+    assert(row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (16, 0))
+    assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+
+    // set Decimal field to precision-overflowed value
+    val bigDecimalVal = Decimal(new JavaBigDecimal("12345678901234567890")) // precision=20, scale=0
+    row.setDecimal(0, bigDecimalVal, 19) // should overflow and become null
+    assert(row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (16, 0))
+    assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+
+    // set Decimal field to valid non-null value
+    val bigDecimalVal2 = Decimal(new JavaBigDecimal("1234567890123456789")) // precision=19, scale=0
+    row.setDecimal(0, bigDecimalVal2, 19) // should succeed
+    assert(!row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (16, 8))
+    assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+
+    // set Decimal field to null explicitly, after which this field no longer supports updating
+    row.setNullAt(0)
+    assert(row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (0, 0))
+    assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+  }
 }
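
The `(16, 0)` and `(16, 8)` values asserted in the new test can be reproduced with plain arithmetic. The sketch below does not depend on Spark; the object and method names are hypothetical, and it assumes that the null bitset is padded to 8-byte words, that each field takes one 8-byte slot in the fixed-length part, that a Decimal is variable-length once its precision exceeds 18 digits, and that a variable-length Decimal is stored as the byte array of its unscaled value.

```scala
import java.math.{BigDecimal => JavaBigDecimal}

object LayoutArithmeticSketch {
  // Assumption: mirrors Spark's Decimal.MAX_LONG_DIGITS.
  val MaxLongDigits: Int = 18

  // Null bitset width, rounded up to whole 8-byte words.
  def bitSetWidthInBytes(numFields: Int): Int = ((numFields + 63) / 64) * 8

  // End of the fixed-length part: bitset plus one 8-byte slot per field.
  def fixedPartEnd(numFields: Int): Int = bitSetWidthInBytes(numFields) + 8 * numFields

  // A Decimal no longer fits in a Long once its precision exceeds MaxLongDigits.
  def isVariableLengthDecimal(precision: Int): Boolean = precision > MaxLongDigits

  // Number of bytes in the two's-complement encoding of the unscaled value.
  def unscaledByteLength(v: JavaBigDecimal): Int = v.unscaledValue.toByteArray.length
}
```

Under these assumptions, a one-field row has `fixedPartEnd(1) == 16`, which matches the offset the test expects, and the unscaled value of `1234567890123456789` occupies 8 bytes, which matches the expected size for the non-null case. `DecimalType(19, 0)` is variable-length because 19 exceeds 18.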

