You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/02/23 06:08:58 UTC
[spark] branch master updated: [SPARK-42484][SQL] UnsafeRowUtils better error message
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new eb24dbd305b [SPARK-42484][SQL] UnsafeRowUtils better error message
eb24dbd305b is described below
commit eb24dbd305b9e46382d49d43202d0c3a89fd324f
Author: Wei Liu <we...@databricks.com>
AuthorDate: Thu Feb 23 14:08:37 2023 +0800
[SPARK-42484][SQL] UnsafeRowUtils better error message
### What changes were proposed in this pull request?
Show the essential information when throwing `InvalidUnsafeRowException`, including where the check failed and the status of the `unsafeRow` and `expectedSchema`.
Example output:
```
[UnsafeRowStatus] expectedSchema: StructType(StructField(key1,IntegerType,false),StructField(key2,IntegerType,false),StructField(sum(key1),IntegerType,false),StructField(sum(key2),IntegerType,false)), expectedSchemaNumFields: 4, numFields: 4, bitSetWidthInBytes: 8, rowSizeInBytes: 40
fieldStatus:
[UnsafeRowFieldStatus] index: 0, expectedFieldType: IntegerType, isNull: false, isFixedLength: true, offset: -1, size: -1
[UnsafeRowFieldStatus] index: 1, expectedFieldType: IntegerType, isNull: false, isFixedLength: true, offset: -1, size: -1
[UnsafeRowFieldStatus] index: 2, expectedFieldType: IntegerType, isNull: false, isFixedLength: true, offset: -1, size: -1
[UnsafeRowFieldStatus] index: 3, expectedFieldType: IntegerType, isNull: false, isFixedLength: true, offset: -1, size: -1
```
### Why are the changes needed?
Right now, if such an error happens, it's hard to track where it occurred and what the misbehaving row and schema look like. With this change, this information is clearer.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests
Closes #40073 from WweiL/SPARK-42484-better-log-unsaferowUtil.
Authored-by: Wei Liu <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalyst/util/UnsafeRowUtils.scala | 83 ++++++++++++++++++----
.../sql/catalyst/util/UnsafeRowUtilsSuite.scala | 26 ++++---
.../sql/execution/streaming/state/StateStore.scala | 18 ++---
.../spark/sql/hive/execution/SQLQuerySuite.scala | 6 +-
4 files changed, 101 insertions(+), 32 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
index 2791f404813..81b06cb466c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
@@ -46,15 +46,21 @@ object UnsafeRowUtils {
* check if the unused bits in the field are all zeros. The UnsafeRowWriter's write() methods
* make this guarantee.
* - Check the total length of the row.
+ * @param row The input UnsafeRow to be validated
+ * @param expectedSchema The expected schema that should match with the UnsafeRow
+ * @return None if all the checks pass. An error message if the row is not matched with the schema
*/
- def validateStructuralIntegrity(row: UnsafeRow, expectedSchema: StructType): Boolean = {
+ private def validateStructuralIntegrityWithReasonImpl(
+ row: UnsafeRow, expectedSchema: StructType): Option[String] = {
if (expectedSchema.fields.length != row.numFields) {
- return false
+ return Some(s"Field length mismatch: " +
+ s"expected: ${expectedSchema.fields.length}, actual: ${row.numFields}")
}
val bitSetWidthInBytes = UnsafeRow.calculateBitSetWidthInBytes(row.numFields)
val rowSizeInBytes = row.getSizeInBytes
if (expectedSchema.fields.length > 0 && bitSetWidthInBytes >= rowSizeInBytes) {
- return false
+ return Some(s"rowSizeInBytes should not exceed bitSetWidthInBytes, " +
+ s"bitSetWidthInBytes: $bitSetWidthInBytes, rowSizeInBytes: $rowSizeInBytes")
}
var varLenFieldsSizeInBytes = 0
expectedSchema.fields.zipWithIndex.foreach {
@@ -62,21 +68,31 @@ object UnsafeRowUtils {
val (offset, size) = getOffsetAndSize(row, index)
if (size < 0 ||
offset < bitSetWidthInBytes + 8 * row.numFields || offset + size > rowSizeInBytes) {
- return false
+ return Some(s"Variable-length field validation error: field: $field, index: $index")
}
varLenFieldsSizeInBytes += size
case (field, index) if UnsafeRow.isFixedLength(field.dataType) && !row.isNullAt(index) =>
field.dataType match {
case BooleanType =>
- if ((row.getLong(index) >> 1) != 0L) return false
+ if ((row.getLong(index) >> 1) != 0L) {
+ return Some(s"Fixed-length field validation error: field: $field, index: $index")
+ }
case ByteType =>
- if ((row.getLong(index) >> 8) != 0L) return false
+ if ((row.getLong(index) >> 8) != 0L) {
+ return Some(s"Fixed-length field validation error: field: $field, index: $index")
+ }
case ShortType =>
- if ((row.getLong(index) >> 16) != 0L) return false
+ if ((row.getLong(index) >> 16) != 0L) {
+ return Some(s"Fixed-length field validation error: field: $field, index: $index")
+ }
case IntegerType =>
- if ((row.getLong(index) >> 32) != 0L) return false
+ if ((row.getLong(index) >> 32) != 0L) {
+ return Some(s"Fixed-length field validation error: field: $field, index: $index")
+ }
case FloatType =>
- if ((row.getLong(index) >> 32) != 0L) return false
+ if ((row.getLong(index) >> 32) != 0L) {
+ return Some(s"Fixed-length field validation error: field: $field, index: $index")
+ }
case _ =>
}
case (field, index) if row.isNullAt(index) =>
@@ -94,17 +110,37 @@ object UnsafeRowUtils {
val (offset, size) = getOffsetAndSize(row, index)
if (size != 0 || offset != 0 &&
(offset < bitSetWidthInBytes + 8 * row.numFields || offset > rowSizeInBytes)) {
- return false
+ return Some(s"Variable-length decimal field special case validation error: " +
+ s"field: $field, index: $index")
}
case _ =>
- if (row.getLong(index) != 0L) return false
+ if (row.getLong(index) != 0L) {
+ return Some(s"Variable-length offset-size validation error: " +
+ s"field: $field, index: $index")
+ }
}
case _ =>
}
if (bitSetWidthInBytes + 8 * row.numFields + varLenFieldsSizeInBytes > rowSizeInBytes) {
- return false
+ return Some(s"Row total length invalid: " +
+ s"calculated: ${bitSetWidthInBytes + 8 * row.numFields + varLenFieldsSizeInBytes} " +
+ s"rowSizeInBytes: $rowSizeInBytes")
+ }
+ None
+ }
+
+ /**
+ * Wrapper of validateStructuralIntegrityWithReasonImpl, add more information for debugging
+ * @param row The input UnsafeRow to be validated
+ * @param expectedSchema The expected schema that should match with the UnsafeRow
+ * @return None if all the checks pass. An error message if the row is not matched with the schema
+ */
+ def validateStructuralIntegrityWithReason(
+ row: UnsafeRow, expectedSchema: StructType): Option[String] = {
+ validateStructuralIntegrityWithReasonImpl(row, expectedSchema).map {
+ errorMessage => s"Error message is: $errorMessage, " +
+ s"UnsafeRow status: ${getStructuralIntegrityStatus(row, expectedSchema)}"
}
- true
}
def getOffsetAndSize(row: UnsafeRow, index: Int): (Int, Int) = {
@@ -139,4 +175,25 @@ object UnsafeRowUtils {
case CalendarIntervalType => true
case _ => false
}
+
+ def getStructuralIntegrityStatus(row: UnsafeRow, expectedSchema: StructType): String = {
+ val fieldStatusArr = expectedSchema.fields.zipWithIndex.map {
+ case (field, index) =>
+ val offsetAndSizeStr = if (!UnsafeRow.isFixedLength(field.dataType)) {
+ val (offset, size) = getOffsetAndSize(row, index)
+ s"offset: $offset, size: $size"
+ } else {
+ "" // offset and size doesn't make sense for fixed length field
+ }
+ s"[UnsafeRowFieldStatus] index: $index, " +
+ s"expectedFieldType: ${field.dataType}, isNull: ${row.isNullAt(index)}, " +
+ s"isFixedLength: ${UnsafeRow.isFixedLength(field.dataType)}. $offsetAndSizeStr"
+ }
+
+ s"[UnsafeRowStatus] expectedSchema: $expectedSchema, " +
+ s"expectedSchemaNumFields: ${expectedSchema.fields.length}, numFields: ${row.numFields}, " +
+ s"bitSetWidthInBytes: ${UnsafeRow.calculateBitSetWidthInBytes(row.numFields)}, " +
+ s"rowSizeInBytes: ${row.getSizeInBytes}\nfieldStatus:\n" +
+ fieldStatusArr.mkString("\n")
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
index 518d68ce1d2..c7a8bc74f4d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
@@ -44,15 +44,15 @@ class UnsafeRowUtilsSuite extends SparkFunSuite {
test("UnsafeRow format invalidation") {
// Pass the checking
- UnsafeRowUtils.validateStructuralIntegrity(testRow, testOutputSchema)
+ assert(UnsafeRowUtils.validateStructuralIntegrityWithReason(testRow, testOutputSchema).isEmpty)
// Fail for fields number not match
- assert(!UnsafeRowUtils.validateStructuralIntegrity(
- testRow, StructType(testKeys.map(createIntegerField))))
+ assert(UnsafeRowUtils.validateStructuralIntegrityWithReason(
+ testRow, StructType(testKeys.map(createIntegerField))).isDefined)
// Fail for invalid schema
val invalidSchema = StructType(testKeys.map(createIntegerField) ++
Seq(StructField("struct", StructType(Seq(StructField("value1", StringType, true))), true),
StructField("value2", IntegerType, false)))
- assert(!UnsafeRowUtils.validateStructuralIntegrity(testRow, invalidSchema))
+ assert(UnsafeRowUtils.validateStructuralIntegrityWithReason(testRow, invalidSchema).isDefined)
}
test("Handle special case for null variable-length Decimal") {
@@ -62,23 +62,33 @@ class UnsafeRowUtilsSuite extends SparkFunSuite {
// row is empty at this point
assert(row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (16, 0))
- assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+ assert(UnsafeRowUtils.validateStructuralIntegrityWithReason(row, schema).isEmpty)
// set Decimal field to precision-overflowed value
val bigDecimalVal = Decimal(new JavaBigDecimal("12345678901234567890")) // precision=20, scale=0
row.setDecimal(0, bigDecimalVal, 19) // should overflow and become null
assert(row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (16, 0))
- assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+ assert(UnsafeRowUtils.validateStructuralIntegrityWithReason(row, schema).isEmpty)
// set Decimal field to valid non-null value
val bigDecimalVal2 = Decimal(new JavaBigDecimal("1234567890123456789")) // precision=19, scale=0
row.setDecimal(0, bigDecimalVal2, 19) // should succeed
assert(!row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (16, 8))
- assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+ assert(UnsafeRowUtils.validateStructuralIntegrityWithReason(row, schema).isEmpty)
// set Decimal field to null explicitly, after which this field no longer supports updating
row.setNullAt(0)
assert(row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (0, 0))
- assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+ assert(UnsafeRowUtils.validateStructuralIntegrityWithReason(row, schema).isEmpty)
+ }
+
+ test("Better schema status message") {
+ assert(UnsafeRowUtils.getStructuralIntegrityStatus(testRow, testOutputSchema)
+ .contains("[UnsafeRowStatus] expectedSchema: StructType(" +
+ "StructField(key1,IntegerType,false),StructField(key2,IntegerType,false)," +
+ "StructField(sum(key1),IntegerType,false),StructField(sum(key2),IntegerType,false)), " +
+ "expectedSchemaNumFields: 4, numFields: 4, bitSetWidthInBytes: 8, rowSizeInBytes: 40\n" +
+ "fieldStatus:\n" +
+ "[UnsafeRowFieldStatus] index: 0, expectedFieldType: IntegerType,"))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index b69e74b9a51..787f4e390e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -225,12 +225,12 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
/**
* An exception thrown when an invalid UnsafeRow is detected in state store.
*/
-class InvalidUnsafeRowException
+class InvalidUnsafeRowException(error: String)
extends RuntimeException("The streaming query failed by state format invalidation. " +
"The following reasons may cause this: 1. An old Spark version wrote the checkpoint that is " +
"incompatible with the current one; 2. Broken checkpoint files; 3. The query is changed " +
"among restart. For the first case, you can try to restart the application without " +
- "checkpoint or use the legacy Spark version to process the streaming state.", null)
+ s"checkpoint or use the legacy Spark version to process the streaming state.\n$error", null)
/**
* Trait representing a provider that provide [[StateStore]] instances representing
@@ -341,13 +341,13 @@ object StateStoreProvider {
valueSchema: StructType,
conf: StateStoreConf): Unit = {
if (conf.formatValidationEnabled) {
- if (!UnsafeRowUtils.validateStructuralIntegrity(keyRow, keySchema)) {
- throw new InvalidUnsafeRowException
- }
- if (conf.formatValidationCheckValue &&
- !UnsafeRowUtils.validateStructuralIntegrity(valueRow, valueSchema)) {
- throw new InvalidUnsafeRowException
- }
+ val validationError = UnsafeRowUtils.validateStructuralIntegrityWithReason(keyRow, keySchema)
+ validationError.foreach { error => throw new InvalidUnsafeRowException(error) }
+ }
+ if (conf.formatValidationCheckValue) {
+ val validationError =
+ UnsafeRowUtils.validateStructuralIntegrityWithReason(valueRow, valueSchema)
+ validationError.foreach { error => throw new InvalidUnsafeRowException(error) }
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 31dfbedbbb4..d0d845b50fb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1407,7 +1407,8 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
},
errorClass = "UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY",
parameters = Map("dataSourceType" -> "hive"),
- context = ExpectedContext(s"hive.`${f.getCanonicalPath}`", 15, 104)
+ context = ExpectedContext(s"hive.`${f.getCanonicalPath}`",
+ 15, 21 + f.getCanonicalPath.length)
)
// data source type is case insensitive
@@ -1417,7 +1418,8 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
},
errorClass = "UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY",
parameters = Map("dataSourceType" -> "HIVE"),
- context = ExpectedContext(s"HIVE.`${f.getCanonicalPath}`", 15, 104)
+ context = ExpectedContext(s"HIVE.`${f.getCanonicalPath}`",
+ 15, 21 + f.getCanonicalPath.length)
)
})
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org