You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/02/23 06:08:58 UTC

[spark] branch master updated: [SPARK-42484][SQL] UnsafeRowUtils better error message

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new eb24dbd305b [SPARK-42484][SQL] UnsafeRowUtils better error message
eb24dbd305b is described below

commit eb24dbd305b9e46382d49d43202d0c3a89fd324f
Author: Wei Liu <we...@databricks.com>
AuthorDate: Thu Feb 23 14:08:37 2023 +0800

    [SPARK-42484][SQL] UnsafeRowUtils better error message
    
    ### What changes were proposed in this pull request?
    
    Show the essential information when throwing `InvalidUnsafeRowException`, including where the check failed and the status of the `unsafeRow` and `expectedSchema`.
    
    Example output:
    ```
    [UnsafeRowStatus] expectedSchema: StructType(StructField(key1,IntegerType,false),StructField(key2,IntegerType,false),StructField(sum(key1),IntegerType,false),StructField(sum(key2),IntegerType,false)), expectedSchemaNumFields: 4, numFields: 4, bitSetWidthInBytes: 8, rowSizeInBytes: 40
    fieldStatus:
    [UnsafeRowFieldStatus] index: 0, expectedFieldType: IntegerType, isNull: false, isFixedLength: true, offset: -1, size: -1
    [UnsafeRowFieldStatus] index: 1, expectedFieldType: IntegerType, isNull: false, isFixedLength: true, offset: -1, size: -1
    [UnsafeRowFieldStatus] index: 2, expectedFieldType: IntegerType, isNull: false, isFixedLength: true, offset: -1, size: -1
    [UnsafeRowFieldStatus] index: 3, expectedFieldType: IntegerType, isNull: false, isFixedLength: true, offset: -1, size: -1
    ```
    
    ### Why are the changes needed?
    
    Right now, if such an error happens, it is hard to track where it occurred and what the misbehaving row and schema look like. With this change, this information is clearer.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests
    
    Closes #40073 from WweiL/SPARK-42484-better-log-unsaferowUtil.
    
    Authored-by: Wei Liu <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/util/UnsafeRowUtils.scala   | 83 ++++++++++++++++++----
 .../sql/catalyst/util/UnsafeRowUtilsSuite.scala    | 26 ++++---
 .../sql/execution/streaming/state/StateStore.scala | 18 ++---
 .../spark/sql/hive/execution/SQLQuerySuite.scala   |  6 +-
 4 files changed, 101 insertions(+), 32 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
index 2791f404813..81b06cb466c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
@@ -46,15 +46,21 @@ object UnsafeRowUtils {
    *     check if the unused bits in the field are all zeros. The UnsafeRowWriter's write() methods
    *     make this guarantee.
    * - Check the total length of the row.
+   * @param row The input UnsafeRow to be validated
+   * @param expectedSchema The expected schema that should match with the UnsafeRow
+   * @return None if all the checks pass. An error message if the row is not matched with the schema
    */
-  def validateStructuralIntegrity(row: UnsafeRow, expectedSchema: StructType): Boolean = {
+  private def validateStructuralIntegrityWithReasonImpl(
+      row: UnsafeRow, expectedSchema: StructType): Option[String] = {
     if (expectedSchema.fields.length != row.numFields) {
-      return false
+      return Some(s"Field length mismatch: " +
+        s"expected: ${expectedSchema.fields.length}, actual: ${row.numFields}")
     }
     val bitSetWidthInBytes = UnsafeRow.calculateBitSetWidthInBytes(row.numFields)
     val rowSizeInBytes = row.getSizeInBytes
     if (expectedSchema.fields.length > 0 && bitSetWidthInBytes >= rowSizeInBytes) {
-      return false
+      return Some(s"rowSizeInBytes should not exceed bitSetWidthInBytes, " +
+        s"bitSetWidthInBytes: $bitSetWidthInBytes, rowSizeInBytes: $rowSizeInBytes")
     }
     var varLenFieldsSizeInBytes = 0
     expectedSchema.fields.zipWithIndex.foreach {
@@ -62,21 +68,31 @@ object UnsafeRowUtils {
         val (offset, size) = getOffsetAndSize(row, index)
         if (size < 0 ||
             offset < bitSetWidthInBytes + 8 * row.numFields || offset + size > rowSizeInBytes) {
-          return false
+          return Some(s"Variable-length field validation error: field: $field, index: $index")
         }
         varLenFieldsSizeInBytes += size
       case (field, index) if UnsafeRow.isFixedLength(field.dataType) && !row.isNullAt(index) =>
         field.dataType match {
           case BooleanType =>
-            if ((row.getLong(index) >> 1) != 0L) return false
+            if ((row.getLong(index) >> 1) != 0L) {
+              return Some(s"Fixed-length field validation error: field: $field, index: $index")
+            }
           case ByteType =>
-            if ((row.getLong(index) >> 8) != 0L) return false
+            if ((row.getLong(index) >> 8) != 0L) {
+              return Some(s"Fixed-length field validation error: field: $field, index: $index")
+            }
           case ShortType =>
-            if ((row.getLong(index) >> 16) != 0L) return false
+            if ((row.getLong(index) >> 16) != 0L) {
+              return Some(s"Fixed-length field validation error: field: $field, index: $index")
+            }
           case IntegerType =>
-            if ((row.getLong(index) >> 32) != 0L) return false
+            if ((row.getLong(index) >> 32) != 0L) {
+              return Some(s"Fixed-length field validation error: field: $field, index: $index")
+            }
           case FloatType =>
-            if ((row.getLong(index) >> 32) != 0L) return false
+            if ((row.getLong(index) >> 32) != 0L) {
+              return Some(s"Fixed-length field validation error: field: $field, index: $index")
+            }
           case _ =>
         }
       case (field, index) if row.isNullAt(index) =>
@@ -94,17 +110,37 @@ object UnsafeRowUtils {
             val (offset, size) = getOffsetAndSize(row, index)
             if (size != 0 || offset != 0 &&
                 (offset < bitSetWidthInBytes + 8 * row.numFields || offset > rowSizeInBytes)) {
-              return false
+              return Some(s"Variable-length decimal field special case validation error: " +
+                s"field: $field, index: $index")
             }
           case _ =>
-            if (row.getLong(index) != 0L) return false
+            if (row.getLong(index) != 0L) {
+              return Some(s"Variable-length offset-size validation error: " +
+                s"field: $field, index: $index")
+            }
         }
       case _ =>
     }
     if (bitSetWidthInBytes + 8 * row.numFields + varLenFieldsSizeInBytes > rowSizeInBytes) {
-      return false
+      return Some(s"Row total length invalid: " +
+        s"calculated: ${bitSetWidthInBytes + 8 * row.numFields + varLenFieldsSizeInBytes} " +
+        s"rowSizeInBytes: $rowSizeInBytes")
+    }
+    None
+  }
+
+  /**
+   * Wrapper of validateStructuralIntegrityWithReasonImpl, add more information for debugging
+   * @param row The input UnsafeRow to be validated
+   * @param expectedSchema The expected schema that should match with the UnsafeRow
+   * @return None if all the checks pass. An error message if the row is not matched with the schema
+   */
+  def validateStructuralIntegrityWithReason(
+    row: UnsafeRow, expectedSchema: StructType): Option[String] = {
+    validateStructuralIntegrityWithReasonImpl(row, expectedSchema).map {
+      errorMessage => s"Error message is: $errorMessage, " +
+          s"UnsafeRow status: ${getStructuralIntegrityStatus(row, expectedSchema)}"
     }
-    true
   }
 
   def getOffsetAndSize(row: UnsafeRow, index: Int): (Int, Int) = {
@@ -139,4 +175,25 @@ object UnsafeRowUtils {
     case CalendarIntervalType => true
     case _ => false
   }
+
+  def getStructuralIntegrityStatus(row: UnsafeRow, expectedSchema: StructType): String = {
+    val fieldStatusArr = expectedSchema.fields.zipWithIndex.map {
+      case (field, index) =>
+        val offsetAndSizeStr = if (!UnsafeRow.isFixedLength(field.dataType)) {
+          val (offset, size) = getOffsetAndSize(row, index)
+          s"offset: $offset, size: $size"
+        } else {
+          "" // offset and size doesn't make sense for fixed length field
+        }
+        s"[UnsafeRowFieldStatus] index: $index, " +
+          s"expectedFieldType: ${field.dataType}, isNull: ${row.isNullAt(index)}, " +
+          s"isFixedLength: ${UnsafeRow.isFixedLength(field.dataType)}. $offsetAndSizeStr"
+    }
+
+    s"[UnsafeRowStatus] expectedSchema: $expectedSchema, " +
+      s"expectedSchemaNumFields: ${expectedSchema.fields.length}, numFields: ${row.numFields}, " +
+      s"bitSetWidthInBytes: ${UnsafeRow.calculateBitSetWidthInBytes(row.numFields)}, " +
+      s"rowSizeInBytes: ${row.getSizeInBytes}\nfieldStatus:\n" +
+      fieldStatusArr.mkString("\n")
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
index 518d68ce1d2..c7a8bc74f4d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
@@ -44,15 +44,15 @@ class UnsafeRowUtilsSuite extends SparkFunSuite {
 
   test("UnsafeRow format invalidation") {
     // Pass the checking
-    UnsafeRowUtils.validateStructuralIntegrity(testRow, testOutputSchema)
+    assert(UnsafeRowUtils.validateStructuralIntegrityWithReason(testRow, testOutputSchema).isEmpty)
     // Fail for fields number not match
-    assert(!UnsafeRowUtils.validateStructuralIntegrity(
-      testRow, StructType(testKeys.map(createIntegerField))))
+    assert(UnsafeRowUtils.validateStructuralIntegrityWithReason(
+      testRow, StructType(testKeys.map(createIntegerField))).isDefined)
     // Fail for invalid schema
     val invalidSchema = StructType(testKeys.map(createIntegerField) ++
       Seq(StructField("struct", StructType(Seq(StructField("value1", StringType, true))), true),
         StructField("value2", IntegerType, false)))
-    assert(!UnsafeRowUtils.validateStructuralIntegrity(testRow, invalidSchema))
+    assert(UnsafeRowUtils.validateStructuralIntegrityWithReason(testRow, invalidSchema).isDefined)
   }
 
   test("Handle special case for null variable-length Decimal") {
@@ -62,23 +62,33 @@ class UnsafeRowUtilsSuite extends SparkFunSuite {
 
     // row is empty at this point
     assert(row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (16, 0))
-    assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+    assert(UnsafeRowUtils.validateStructuralIntegrityWithReason(row, schema).isEmpty)
 
     // set Decimal field to precision-overflowed value
     val bigDecimalVal = Decimal(new JavaBigDecimal("12345678901234567890")) // precision=20, scale=0
     row.setDecimal(0, bigDecimalVal, 19) // should overflow and become null
     assert(row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (16, 0))
-    assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+    assert(UnsafeRowUtils.validateStructuralIntegrityWithReason(row, schema).isEmpty)
 
     // set Decimal field to valid non-null value
     val bigDecimalVal2 = Decimal(new JavaBigDecimal("1234567890123456789")) // precision=19, scale=0
     row.setDecimal(0, bigDecimalVal2, 19) // should succeed
     assert(!row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (16, 8))
-    assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+    assert(UnsafeRowUtils.validateStructuralIntegrityWithReason(row, schema).isEmpty)
 
     // set Decimal field to null explicitly, after which this field no longer supports updating
     row.setNullAt(0)
     assert(row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (0, 0))
-    assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+    assert(UnsafeRowUtils.validateStructuralIntegrityWithReason(row, schema).isEmpty)
+  }
+
+  test("Better schema status message") {
+    assert(UnsafeRowUtils.getStructuralIntegrityStatus(testRow, testOutputSchema)
+      .contains("[UnsafeRowStatus] expectedSchema: StructType(" +
+        "StructField(key1,IntegerType,false),StructField(key2,IntegerType,false)," +
+        "StructField(sum(key1),IntegerType,false),StructField(sum(key2),IntegerType,false)), " +
+        "expectedSchemaNumFields: 4, numFields: 4, bitSetWidthInBytes: 8, rowSizeInBytes: 40\n" +
+        "fieldStatus:\n" +
+        "[UnsafeRowFieldStatus] index: 0, expectedFieldType: IntegerType,"))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index b69e74b9a51..787f4e390e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -225,12 +225,12 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
 /**
  * An exception thrown when an invalid UnsafeRow is detected in state store.
  */
-class InvalidUnsafeRowException
+class InvalidUnsafeRowException(error: String)
   extends RuntimeException("The streaming query failed by state format invalidation. " +
     "The following reasons may cause this: 1. An old Spark version wrote the checkpoint that is " +
     "incompatible with the current one; 2. Broken checkpoint files; 3. The query is changed " +
     "among restart. For the first case, you can try to restart the application without " +
-    "checkpoint or use the legacy Spark version to process the streaming state.", null)
+    s"checkpoint or use the legacy Spark version to process the streaming state.\n$error", null)
 
 /**
  * Trait representing a provider that provide [[StateStore]] instances representing
@@ -341,13 +341,13 @@ object StateStoreProvider {
       valueSchema: StructType,
       conf: StateStoreConf): Unit = {
     if (conf.formatValidationEnabled) {
-      if (!UnsafeRowUtils.validateStructuralIntegrity(keyRow, keySchema)) {
-        throw new InvalidUnsafeRowException
-      }
-      if (conf.formatValidationCheckValue &&
-          !UnsafeRowUtils.validateStructuralIntegrity(valueRow, valueSchema)) {
-        throw new InvalidUnsafeRowException
-      }
+      val validationError = UnsafeRowUtils.validateStructuralIntegrityWithReason(keyRow, keySchema)
+      validationError.foreach { error => throw new InvalidUnsafeRowException(error) }
+    }
+    if (conf.formatValidationCheckValue) {
+      val validationError =
+        UnsafeRowUtils.validateStructuralIntegrityWithReason(valueRow, valueSchema)
+      validationError.foreach { error => throw new InvalidUnsafeRowException(error) }
     }
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 31dfbedbbb4..d0d845b50fb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1407,7 +1407,8 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
         },
         errorClass = "UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY",
         parameters = Map("dataSourceType" -> "hive"),
-        context = ExpectedContext(s"hive.`${f.getCanonicalPath}`", 15, 104)
+        context = ExpectedContext(s"hive.`${f.getCanonicalPath}`",
+          15, 21 + f.getCanonicalPath.length)
       )
 
       // data source type is case insensitive
@@ -1417,7 +1418,8 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
         },
         errorClass = "UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY",
         parameters = Map("dataSourceType" -> "HIVE"),
-        context = ExpectedContext(s"HIVE.`${f.getCanonicalPath}`", 15, 104)
+        context = ExpectedContext(s"HIVE.`${f.getCanonicalPath}`",
+          15, 21 + f.getCanonicalPath.length)
       )
     })
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org