Posted to commits@spark.apache.org by ge...@apache.org on 2023/06/02 22:10:28 UTC
[spark] branch master updated: [SPARK-43380][SQL] Fix Avro data type conversion issues to avoid producing incorrect results
This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 14b00cfc6c2 [SPARK-43380][SQL] Fix Avro data type conversion issues to avoid producing incorrect results
14b00cfc6c2 is described below
commit 14b00cfc6c2e477067fc9e7937e34b2aa53df1eb
Author: zeruibao <ze...@databricks.com>
AuthorDate: Fri Jun 2 15:10:06 2023 -0700
[SPARK-43380][SQL] Fix Avro data type conversion issues to avoid producing incorrect results
### What changes were proposed in this pull request?
We introduce the SQLConf `spark.sql.legacy.avro.allowIncompatibleSchema` (default `false`), which by default prevents reading Avro interval types as date or timestamp types (which yields corrupt dates) and prevents reading decimal types with lower precision than they were written with (which yields `null`s).
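For illustration, a minimal sketch of toggling the flag (assuming a running `spark` session; the conf name and default match the `SQLConf` entry added in this patch):
```scala
// Default: incompatible Avro-to-SQL reads are rejected with an AnalysisException.
spark.conf.set("spark.sql.legacy.avro.allowIncompatibleSchema", "false")

// Legacy behavior: allow the read, at the risk of corrupt dates or null decimals.
spark.conf.set("spark.sql.legacy.avro.allowIncompatibleSchema", "true")
```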
### Why are the changes needed?
We found the following issues with the open-source Avro data source:
- Interval types can be read as date or timestamp types, which produces wildly different results. For example, `Duration.ofDays(1).plusSeconds(1)` is read back as the date `1972-09-27`: the interval is stored as a long count of microseconds (86,401,000,000), and the legacy long-to-date path divides it by the number of milliseconds per day (86,400,000), yielding epoch day 1000, i.e. `1972-09-27`. A minimal repro sketch follows this list.
- Decimal types can be read with lower precision than they were written with, which causes values to be read as `null` instead of raising an error that suggests a wider decimal type.
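For concreteness, a minimal repro sketch distilled from the new unit tests in this PR; it assumes a running `spark` session with the spark-avro package on the classpath, and the output path is hypothetical:
```scala
import java.time.Duration

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Write a single day-time interval value to Avro (stored as a long of microseconds).
val path = "/tmp/avro-interval-repro" // hypothetical path
val schema = StructType(Array(StructField("a", DayTimeIntervalType(), false)))
val data = Seq(Row(Duration.ofDays(1).plusSeconds(1)))
spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
  .write.format("avro").mode("overwrite").save(path)

// Reading the interval back as a date used to silently return 1972-09-27.
// With this fix (and spark.sql.legacy.avro.allowIncompatibleSchema=false,
// the default), the read fails with AVRO_INCORRECT_TYPE instead.
spark.read.schema("a DATE").format("avro").load(path).collect()
```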
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New unit tests
Closes #41052 from zeruibao/SPARK-43380-fix-avro-data-type-conversion.
Lead-authored-by: zeruibao <ze...@databricks.com>
Co-authored-by: zeruibao <12...@users.noreply.github.com>
Signed-off-by: Gengliang Wang <ge...@apache.org>
---
.../apache/spark/sql/avro/AvroDeserializer.scala | 423 ++++++++++++---------
.../org/apache/spark/sql/avro/AvroSuite.scala | 132 +++++++
core/src/main/resources/error/error-classes.json | 10 +
docs/sql-migration-guide.md | 1 +
.../spark/sql/errors/QueryCompilationErrors.scala | 30 ++
.../org/apache/spark/sql/internal/SQLConf.scala | 12 +
6 files changed, 435 insertions(+), 173 deletions(-)
diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index aac979cddb2..78b1f01e2ef 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -35,7 +35,9 @@ import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArr
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.DataSourceUtils
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -117,178 +119,260 @@ private[sql] class AvroDeserializer(
val incompatibleMsg = errorPrefix +
s"schema is incompatible (avroType = $avroType, sqlType = ${catalystType.sql})"
- (avroType.getType, catalystType) match {
- case (NULL, NullType) => (updater, ordinal, _) =>
- updater.setNullAt(ordinal)
+ val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA
+ val preventReadingIncorrectType = !SQLConf.get.getConf(confKey)
+ val logicalDataType = SchemaConverters.toSqlType(avroType).dataType
+ avroType.getType match {
+ case NULL =>
+ (logicalDataType, catalystType) match {
+ case (_, NullType) => (updater, ordinal, _) =>
+ updater.setNullAt(ordinal)
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+ }
// TODO: we can avoid boxing if future version of avro provide primitive accessors.
- case (BOOLEAN, BooleanType) => (updater, ordinal, value) =>
- updater.setBoolean(ordinal, value.asInstanceOf[Boolean])
-
- case (INT, IntegerType) => (updater, ordinal, value) =>
- updater.setInt(ordinal, value.asInstanceOf[Int])
-
- case (INT, DateType) => (updater, ordinal, value) =>
- updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
-
- case (LONG, LongType) => (updater, ordinal, value) =>
- updater.setLong(ordinal, value.asInstanceOf[Long])
-
- case (LONG, TimestampType) => avroType.getLogicalType match {
- // For backward compatibility, if the Avro type is Long and it is not logical type
- // (the `null` case), the value is processed as timestamp type with millisecond precision.
- case null | _: TimestampMillis => (updater, ordinal, value) =>
- val millis = value.asInstanceOf[Long]
- val micros = DateTimeUtils.millisToMicros(millis)
- updater.setLong(ordinal, timestampRebaseFunc(micros))
- case _: TimestampMicros => (updater, ordinal, value) =>
- val micros = value.asInstanceOf[Long]
- updater.setLong(ordinal, timestampRebaseFunc(micros))
- case other => throw new IncompatibleSchemaException(errorPrefix +
- s"Avro logical type $other cannot be converted to SQL type ${TimestampType.sql}.")
- }
-
- case (LONG, TimestampNTZType) => avroType.getLogicalType match {
- // To keep consistent with TimestampType, if the Avro type is Long and it is not
- // logical type (the `null` case), the value is processed as TimestampNTZ
- // with millisecond precision.
- case null | _: LocalTimestampMillis => (updater, ordinal, value) =>
- val millis = value.asInstanceOf[Long]
- val micros = DateTimeUtils.millisToMicros(millis)
- updater.setLong(ordinal, micros)
- case _: LocalTimestampMicros => (updater, ordinal, value) =>
- val micros = value.asInstanceOf[Long]
- updater.setLong(ordinal, micros)
- case other => throw new IncompatibleSchemaException(errorPrefix +
- s"Avro logical type $other cannot be converted to SQL type ${TimestampNTZType.sql}.")
- }
-
- // Before we upgrade Avro to 1.8 for logical type support, spark-avro converts Long to Date.
- // For backward compatibility, we still keep this conversion.
- case (LONG, DateType) => (updater, ordinal, value) =>
- updater.setInt(ordinal, (value.asInstanceOf[Long] / MILLIS_PER_DAY).toInt)
-
- case (FLOAT, FloatType) => (updater, ordinal, value) =>
- updater.setFloat(ordinal, value.asInstanceOf[Float])
-
- case (DOUBLE, DoubleType) => (updater, ordinal, value) =>
- updater.setDouble(ordinal, value.asInstanceOf[Double])
-
- case (STRING, StringType) => (updater, ordinal, value) =>
- val str = value match {
- case s: String => UTF8String.fromString(s)
- case s: Utf8 =>
- val bytes = new Array[Byte](s.getByteLength)
- System.arraycopy(s.getBytes, 0, bytes, 0, s.getByteLength)
- UTF8String.fromBytes(bytes)
+ case BOOLEAN =>
+ (logicalDataType, catalystType) match {
+ case (_, BooleanType) => (updater, ordinal, value) =>
+ updater.setBoolean(ordinal, value.asInstanceOf[Boolean])
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
}
- updater.set(ordinal, str)
-
- case (ENUM, StringType) => (updater, ordinal, value) =>
- updater.set(ordinal, UTF8String.fromString(value.toString))
-
- case (FIXED, BinaryType) => (updater, ordinal, value) =>
- updater.set(ordinal, value.asInstanceOf[GenericFixed].bytes().clone())
-
- case (BYTES, BinaryType) => (updater, ordinal, value) =>
- val bytes = value match {
- case b: ByteBuffer =>
- val bytes = new Array[Byte](b.remaining)
- b.get(bytes)
- // Do not forget to reset the position
- b.rewind()
- bytes
- case b: Array[Byte] => b
- case other =>
- throw new RuntimeException(errorPrefix + s"$other is not a valid avro binary.")
+ case INT =>
+ (logicalDataType, catalystType) match {
+ case (IntegerType, IntegerType) => (updater, ordinal, value) =>
+ updater.setInt(ordinal, value.asInstanceOf[Int])
+ case (IntegerType, DateType) => (updater, ordinal, value) =>
+ updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
+ case (DateType, DateType) => (updater, ordinal, value) =>
+ updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
+ case (_: YearMonthIntervalType, _: YearMonthIntervalType) => (updater, ordinal, value) =>
+ updater.setInt(ordinal, value.asInstanceOf[Int])
+ case (_: YearMonthIntervalType, _) if preventReadingIncorrectType =>
+ throw QueryCompilationErrors.avroIncorrectTypeError(
+ toFieldStr(avroPath), toFieldStr(catalystPath),
+ logicalDataType.catalogString, catalystType.catalogString, confKey.key)
+ case _ if !preventReadingIncorrectType => (updater, ordinal, value) =>
+ updater.setInt(ordinal, value.asInstanceOf[Int])
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
}
- updater.set(ordinal, bytes)
-
- case (FIXED, _: DecimalType) => (updater, ordinal, value) =>
- val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
- val bigDecimal = decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d)
- val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
- updater.setDecimal(ordinal, decimal)
-
- case (BYTES, _: DecimalType) => (updater, ordinal, value) =>
- val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
- val bigDecimal = decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
- val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
- updater.setDecimal(ordinal, decimal)
-
- case (RECORD, st: StructType) =>
- // Avro datasource doesn't accept filters with nested attributes. See SPARK-32328.
- // We can always return `false` from `applyFilters` for nested records.
- val writeRecord =
- getRecordWriter(avroType, st, avroPath, catalystPath, applyFilters = _ => false)
- (updater, ordinal, value) =>
- val row = new SpecificInternalRow(st)
- writeRecord(new RowUpdater(row), value.asInstanceOf[GenericRecord])
- updater.set(ordinal, row)
-
- case (ARRAY, ArrayType(elementType, containsNull)) =>
- val avroElementPath = avroPath :+ "element"
- val elementWriter = newWriter(avroType.getElementType, elementType,
- avroElementPath, catalystPath :+ "element")
- (updater, ordinal, value) =>
- val collection = value.asInstanceOf[java.util.Collection[Any]]
- val result = createArrayData(elementType, collection.size())
- val elementUpdater = new ArrayDataUpdater(result)
-
- var i = 0
- val iter = collection.iterator()
- while (iter.hasNext) {
- val element = iter.next()
- if (element == null) {
- if (!containsNull) {
- throw new RuntimeException(
- s"Array value at path ${toFieldStr(avroElementPath)} is not allowed to be null")
- } else {
- elementUpdater.setNullAt(i)
- }
- } else {
- elementWriter(elementUpdater, i, element)
- }
- i += 1
+ case LONG =>
+ (logicalDataType, catalystType) match {
+ case (LongType, LongType) => (updater, ordinal, value) =>
+ updater.setLong(ordinal, value.asInstanceOf[Long])
+ case (LongType, TimestampType)
+ | (TimestampType, TimestampType)
+ | (TimestampNTZType, TimestampType) => avroType.getLogicalType match {
+ // For backward compatibility, if the Avro type is Long and it is not logical type
+ // (the `null` case), the value is processed as timestamp type with
+ // millisecond precision.
+ case null | _: TimestampMillis => (updater, ordinal, value) =>
+ val millis = value.asInstanceOf[Long]
+ val micros = DateTimeUtils.millisToMicros(millis)
+ updater.setLong(ordinal, timestampRebaseFunc(micros))
+ case _: TimestampMicros => (updater, ordinal, value) =>
+ val micros = value.asInstanceOf[Long]
+ updater.setLong(ordinal, timestampRebaseFunc(micros))
+ case other => throw new IncompatibleSchemaException(errorPrefix +
+ s"Avro logical type $other cannot be converted to SQL type ${TimestampType.sql}.")
}
-
- updater.set(ordinal, result)
-
- case (MAP, MapType(keyType, valueType, valueContainsNull)) if keyType == StringType =>
- val keyWriter = newWriter(SchemaBuilder.builder().stringType(), StringType,
- avroPath :+ "key", catalystPath :+ "key")
- val valueWriter = newWriter(avroType.getValueType, valueType,
- avroPath :+ "value", catalystPath :+ "value")
- (updater, ordinal, value) =>
- val map = value.asInstanceOf[java.util.Map[AnyRef, AnyRef]]
- val keyArray = createArrayData(keyType, map.size())
- val keyUpdater = new ArrayDataUpdater(keyArray)
- val valueArray = createArrayData(valueType, map.size())
- val valueUpdater = new ArrayDataUpdater(valueArray)
- val iter = map.entrySet().iterator()
- var i = 0
- while (iter.hasNext) {
- val entry = iter.next()
- assert(entry.getKey != null)
- keyWriter(keyUpdater, i, entry.getKey)
- if (entry.getValue == null) {
- if (!valueContainsNull) {
- throw new RuntimeException(
- s"Map value at path ${toFieldStr(avroPath :+ "value")} is not allowed to be null")
- } else {
- valueUpdater.setNullAt(i)
- }
- } else {
- valueWriter(valueUpdater, i, entry.getValue)
- }
- i += 1
+ case (LongType, TimestampNTZType)
+ | (TimestampNTZType, TimestampNTZType)
+ | (TimestampType, TimestampNTZType) => avroType.getLogicalType match {
+ // To keep consistent with TimestampType, if the Avro type is Long and it is not
+ // logical type (the `null` case), the value is processed as TimestampNTZ
+ // with millisecond precision.
+ case null | _: LocalTimestampMillis => (updater, ordinal, value) =>
+ val millis = value.asInstanceOf[Long]
+ val micros = DateTimeUtils.millisToMicros(millis)
+ updater.setLong(ordinal, micros)
+ case _: LocalTimestampMicros => (updater, ordinal, value) =>
+ val micros = value.asInstanceOf[Long]
+ updater.setLong(ordinal, micros)
+ case other => throw new IncompatibleSchemaException(errorPrefix +
+ s"Avro logical type $other cannot be converted to SQL type ${TimestampNTZType.sql}.")
}
-
- // The Avro map will never have null or duplicated map keys, it's safe to create a
- // ArrayBasedMapData directly here.
- updater.set(ordinal, new ArrayBasedMapData(keyArray, valueArray))
-
- case (UNION, _) =>
+ // Before Avro was upgraded to 1.8 (which added logical type support),
+ // spark-avro converted Long to Date.
+ // For backward compatibility, we keep this conversion.
+ case (LongType, DateType) => (updater, ordinal, value) =>
+ updater.setInt(ordinal, (value.asInstanceOf[Long] / MILLIS_PER_DAY).toInt)
+ case (DateType, DateType) => (updater, ordinal, value) =>
+ updater.setLong(ordinal, value.asInstanceOf[Long])
+ case (_: DayTimeIntervalType, _: DayTimeIntervalType) => (updater, ordinal, value) =>
+ updater.setLong(ordinal, value.asInstanceOf[Long])
+ case (_: DayTimeIntervalType, _) if preventReadingIncorrectType =>
+ throw QueryCompilationErrors.avroIncorrectTypeError(
+ toFieldStr(avroPath), toFieldStr(catalystPath),
+ logicalDataType.catalogString, catalystType.catalogString, confKey.key)
+ case (_: DayTimeIntervalType, DateType) => (updater, ordinal, value) =>
+ updater.setInt(ordinal, (value.asInstanceOf[Long] / MILLIS_PER_DAY).toInt)
+ case _ if !preventReadingIncorrectType => (updater, ordinal, value) =>
+ updater.setLong(ordinal, value.asInstanceOf[Long])
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+ }
+ case FLOAT =>
+ (logicalDataType, catalystType) match {
+ case (_, FloatType) => (updater, ordinal, value) =>
+ updater.setFloat(ordinal, value.asInstanceOf[Float])
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+ }
+ case DOUBLE =>
+ (logicalDataType, catalystType) match {
+ case (_, DoubleType) => (updater, ordinal, value) =>
+ updater.setDouble(ordinal, value.asInstanceOf[Double])
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+ }
+ case STRING =>
+ (logicalDataType, catalystType) match {
+ case (_, StringType) => (updater, ordinal, value) =>
+ val str = value match {
+ case s: String => UTF8String.fromString(s)
+ case s: Utf8 =>
+ val bytes = new Array[Byte](s.getByteLength)
+ System.arraycopy(s.getBytes, 0, bytes, 0, s.getByteLength)
+ UTF8String.fromBytes(bytes)
+ }
+ updater.set(ordinal, str)
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+ }
+ case ENUM =>
+ (logicalDataType, catalystType) match {
+ case (_, StringType) => (updater, ordinal, value) =>
+ updater.set(ordinal, UTF8String.fromString(value.toString))
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+ }
+ case FIXED =>
+ (logicalDataType, catalystType) match {
+ case (_, BinaryType) => (updater, ordinal, value) =>
+ updater.set(ordinal, value.asInstanceOf[GenericFixed].bytes().clone())
+ case (_, dt: DecimalType) =>
+ val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
+ if (preventReadingIncorrectType &&
+ d.getPrecision - d.getScale > dt.precision - dt.scale) {
+ throw QueryCompilationErrors.avroLowerPrecisionError(toFieldStr(avroPath),
+ toFieldStr(catalystPath), logicalDataType.catalogString,
+ dt.catalogString, confKey.key)
+ }
+ (updater, ordinal, value) =>
+ val bigDecimal =
+ decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d)
+ val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
+ updater.setDecimal(ordinal, decimal)
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+ }
+ case BYTES =>
+ (logicalDataType, catalystType) match {
+ case (_, BinaryType) => (updater, ordinal, value) =>
+ val bytes = value match {
+ case b: ByteBuffer =>
+ val bytes = new Array[Byte](b.remaining)
+ b.get(bytes)
+ // Do not forget to reset the position
+ b.rewind()
+ bytes
+ case b: Array[Byte] => b
+ case other =>
+ throw new RuntimeException(errorPrefix + s"$other is not a valid avro binary.")
+ }
+ updater.set(ordinal, bytes)
+ case (_, dt: DecimalType) =>
+ val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
+ if (preventReadingIncorrectType &&
+ d.getPrecision - d.getScale > dt.precision - dt.scale) {
+ throw QueryCompilationErrors.avroLowerPrecisionError(toFieldStr(avroPath),
+ toFieldStr(catalystPath), logicalDataType.catalogString,
+ dt.catalogString, confKey.key)
+ }
+ (updater, ordinal, value) =>
+ val bigDecimal = decimalConversions
+ .fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
+ val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
+ updater.setDecimal(ordinal, decimal)
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+ }
+ case RECORD =>
+ (logicalDataType, catalystType) match {
+ case (_, st: StructType) =>
+ // Avro datasource doesn't accept filters with nested attributes. See SPARK-32328.
+ // We can always return `false` from `applyFilters` for nested records.
+ val writeRecord =
+ getRecordWriter(avroType, st, avroPath, catalystPath, applyFilters = _ => false)
+ (updater, ordinal, value) =>
+ val row = new SpecificInternalRow(st)
+ writeRecord(new RowUpdater(row), value.asInstanceOf[GenericRecord])
+ updater.set(ordinal, row)
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+ }
+ case ARRAY =>
+ (logicalDataType, catalystType) match {
+ case (_, ArrayType(elementType, containsNull)) =>
+ val avroElementPath = avroPath :+ "element"
+ val elementWriter = newWriter(avroType.getElementType, elementType,
+ avroElementPath, catalystPath :+ "element")
+ (updater, ordinal, value) =>
+ val collection = value.asInstanceOf[java.util.Collection[Any]]
+ val result = createArrayData(elementType, collection.size())
+ val elementUpdater = new ArrayDataUpdater(result)
+
+ var i = 0
+ val iter = collection.iterator()
+ while (iter.hasNext) {
+ val element = iter.next()
+ if (element == null) {
+ if (!containsNull) {
+ throw new RuntimeException(
+ s"Array value at path ${toFieldStr(avroElementPath)}" +
+ s" is not allowed to be null")
+ } else {
+ elementUpdater.setNullAt(i)
+ }
+ } else {
+ elementWriter(elementUpdater, i, element)
+ }
+ i += 1
+ }
+ updater.set(ordinal, result)
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+ }
+ case MAP =>
+ (logicalDataType, catalystType) match {
+ case (_, MapType(keyType, valueType, valueContainsNull))
+ if keyType == StringType =>
+ val keyWriter = newWriter(SchemaBuilder.builder().stringType(), StringType,
+ avroPath :+ "key", catalystPath :+ "key")
+ val valueWriter = newWriter(avroType.getValueType, valueType,
+ avroPath :+ "value", catalystPath :+ "value")
+ (updater, ordinal, value) =>
+ val map = value.asInstanceOf[java.util.Map[AnyRef, AnyRef]]
+ val keyArray = createArrayData(keyType, map.size())
+ val keyUpdater = new ArrayDataUpdater(keyArray)
+ val valueArray = createArrayData(valueType, map.size())
+ val valueUpdater = new ArrayDataUpdater(valueArray)
+ val iter = map.entrySet().iterator()
+ var i = 0
+ while (iter.hasNext) {
+ val entry = iter.next()
+ assert(entry.getKey != null)
+ keyWriter(keyUpdater, i, entry.getKey)
+ if (entry.getValue == null) {
+ if (!valueContainsNull) {
+ throw new RuntimeException(
+ s"Map value at path ${toFieldStr(avroPath :+ "value")}" +
+ s" is not allowed to be null")
+ } else {
+ valueUpdater.setNullAt(i)
+ }
+ } else {
+ valueWriter(valueUpdater, i, entry.getValue)
+ }
+ i += 1
+ }
+ // The Avro map will never have null or duplicated map keys, so it is safe to
+ // create an ArrayBasedMapData directly here.
+ updater.set(ordinal, new ArrayBasedMapData(keyArray, valueArray))
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+ }
+ case UNION =>
val nonNullTypes = nonNullUnionBranches(avroType)
val nonNullAvroType = Schema.createUnion(nonNullTypes.asJava)
if (nonNullTypes.nonEmpty) {
@@ -332,13 +416,6 @@ private[sql] class AvroDeserializer(
} else {
(updater, ordinal, _) => updater.setNullAt(ordinal)
}
-
- case (INT, _: YearMonthIntervalType) => (updater, ordinal, value) =>
- updater.setInt(ordinal, value.asInstanceOf[Int])
-
- case (LONG, _: DayTimeIntervalType) => (updater, ordinal, value) =>
- updater.setLong(ordinal, value.asInstanceOf[Long])
-
case _ => throw new IncompatibleSchemaException(incompatibleMsg)
}
}
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 97260e6eea6..7ca34388523 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -32,6 +32,7 @@ import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
import org.apache.commons.io.FileUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, SparkUpgradeException}
import org.apache.spark.TestUtils.assertExceptionMsg
@@ -814,6 +815,137 @@ abstract class AvroSuite
}
}
+ test("SPARK-43380: Fix Avro data type conversion" +
+ " of decimal type to avoid producing incorrect results") {
+ withTempPath { path =>
+ val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
+ sql("SELECT 13.1234567890 a").write.format("avro").save(path.toString)
+ // With the flag disabled, we will throw an exception if there is a mismatch
+ withSQLConf(confKey -> "false") {
+ val e = intercept[SparkException] {
+ spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString).collect()
+ }
+ ExceptionUtils.getRootCause(e) match {
+ case ex: AnalysisException =>
+ checkError(
+ exception = ex,
+ errorClass = "AVRO_LOWER_PRECISION",
+ parameters = Map("avroPath" -> "field 'a'",
+ "sqlPath" -> "field 'a'",
+ "avroType" -> "decimal\\(12,10\\)",
+ "sqlType" -> "\"DECIMAL\\(4,3\\)\"",
+ "key" -> SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key),
+ matchPVals = true
+ )
+ case other =>
+ fail(s"Received unexpected exception", other)
+ }
+ }
+ // The following used to work, so it should still work with the flag enabled
+ checkAnswer(
+ spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
+ Row(new java.math.BigDecimal("13.123"))
+ )
+ withSQLConf(confKey -> "true") {
+ // With the flag enabled, we return a null silently, which isn't great
+ checkAnswer(
+ spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString),
+ Row(null)
+ )
+ checkAnswer(
+ spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
+ Row(new java.math.BigDecimal("13.123"))
+ )
+ }
+ }
+ }
+
+ test("SPARK-43380: Fix Avro data type conversion" +
+ " of DayTimeIntervalType to avoid producing incorrect results") {
+ withTempPath { path =>
+ val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
+ val schema = StructType(Array(StructField("a", DayTimeIntervalType(), false)))
+ val data = Seq(Row(java.time.Duration.ofDays(1).plusSeconds(1)))
+
+ val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+ df.write.format("avro").save(path.getCanonicalPath)
+
+ withSQLConf(confKey -> "false") {
+ Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType =>
+ val e = intercept[SparkException] {
+ spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect()
+ }
+
+ ExceptionUtils.getRootCause(e) match {
+ case ex: AnalysisException =>
+ checkError(
+ exception = ex,
+ errorClass = "AVRO_INCORRECT_TYPE",
+ parameters = Map("avroPath" -> "field 'a'",
+ "sqlPath" -> "field 'a'",
+ "avroType" -> "interval day to second",
+ "sqlType" -> s""""$sqlType"""",
+ "key" -> SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key),
+ matchPVals = true
+ )
+ case other =>
+ fail(s"Received unexpected exception", other)
+ }
+ }
+ }
+
+ withSQLConf(confKey -> "true") {
+ // The conversion is allowed; there is no need to check the result
+ spark.read.schema("a Date").format("avro").load(path.toString)
+ spark.read.schema("a timestamp").format("avro").load(path.toString)
+ spark.read.schema("a timestamp_ntz").format("avro").load(path.toString)
+ }
+ }
+ }
+
+ test("SPARK-43380: Fix Avro data type conversion" +
+ " of YearMonthIntervalType to avoid producing incorrect results") {
+ withTempPath { path =>
+ val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
+ val schema = StructType(Array(StructField("a", YearMonthIntervalType(), false)))
+ val data = Seq(Row(java.time.Period.of(1, 1, 0)))
+
+ val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+ df.write.format("avro").save(path.getCanonicalPath)
+
+ withSQLConf(confKey -> "false") {
+ Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType =>
+ val e = intercept[SparkException] {
+ spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect()
+ }
+
+ ExceptionUtils.getRootCause(e) match {
+ case ex: AnalysisException =>
+ checkError(
+ exception = ex,
+ errorClass = "AVRO_INCORRECT_TYPE",
+ parameters = Map("avroPath" -> "field 'a'",
+ "sqlPath" -> "field 'a'",
+ "avroType" -> "interval year to month",
+ "sqlType" -> s""""$sqlType"""",
+ "key" -> SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key),
+ matchPVals = true
+ )
+ case other =>
+ fail(s"Received unexpected exception", other)
+ }
+ }
+ }
+
+ withSQLConf(confKey -> "true") {
+ // The conversion is allowed; there is no need to check the result
+ spark.read.schema("a Date").format("avro").load(path.toString)
+ spark.read.schema("a timestamp").format("avro").load(path.toString)
+ spark.read.schema("a timestamp_ntz").format("avro").load(path.toString)
+ }
+ }
+ }
+
test("converting some specific sparkSQL types to avro") {
withTempPath { tempDir =>
val testSchema = StructType(Seq(
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 8d280e40922..c73223fba39 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -64,6 +64,16 @@
}
}
},
+ "AVRO_INCORRECT_TYPE" : {
+ "message" : [
+ "Cannot convert Avro <avroPath> to SQL <sqlPath> because the original encoded data type is <avroType>, however you're trying to read the field as <sqlType>, which would lead to an incorrect answer. To allow reading this field, enable the SQL configuration: <key>."
+ ]
+ },
+ "AVRO_LOWER_PRECISION" : {
+ "message" : [
+ "Cannot convert Avro <avroPath> to SQL <sqlPath> because the original encoded data type is <avroType>, however you're trying to read the field as <sqlType>, which leads to data being read as null. Please provide a wider decimal type to get the correct result. To allow reading null to this field, enable the SQL configuration: <key>."
+ ]
+ },
"BINARY_ARITHMETIC_OVERFLOW" : {
"message" : [
"<value1> <symbol> <value2> caused overflow."
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 58627801fc7..6c05514d242 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -26,6 +26,7 @@ license: |
- Since Spark 3.5, the JDBC options related to DS V2 pushdown are `true` by default. These options include: `pushDownAggregate`, `pushDownLimit`, `pushDownOffset` and `pushDownTableSample`. To restore the legacy behavior, please set them to `false`. e.g. set `spark.sql.catalog.your_catalog_name.pushDownAggregate` to `false`.
- Since Spark 3.5, Spark thrift server will interrupt task when canceling a running statement. To restore the previous behavior, set `spark.sql.thriftServer.interruptOnCancel` to `false`.
+- Since Spark 3.5, the Avro data source throws `AnalysisException` when reading interval types as date or timestamp types, or when reading decimal types with lower precision. To restore the legacy behavior, set `spark.sql.legacy.avro.allowIncompatibleSchema` to `true`.
## Upgrading from Spark SQL 3.3 to 3.4
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index f4ca9147f91..94b8ee25dd2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3537,4 +3537,34 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
errorClass = "CANNOT_RENAME_ACROSS_SCHEMA", messageParameters = Map("type" -> "table")
)
}
+
+ def avroIncorrectTypeError(
+ avroPath: String, sqlPath: String, avroType: String,
+ sqlType: String, key: String): Throwable = {
+ new AnalysisException(
+ errorClass = "AVRO_INCORRECT_TYPE",
+ messageParameters = Map(
+ "avroPath" -> avroPath,
+ "sqlPath" -> sqlPath,
+ "avroType" -> avroType,
+ "sqlType" -> toSQLType(sqlType),
+ "key" -> key
+ )
+ )
+ }
+
+ def avroLowerPrecisionError(
+ avroPath: String, sqlPath: String, avroType: String,
+ sqlType: String, key: String): Throwable = {
+ new AnalysisException(
+ errorClass = "AVRO_LOWER_PRECISION",
+ messageParameters = Map(
+ "avroPath" -> avroPath,
+ "sqlPath" -> sqlPath,
+ "avroType" -> avroType,
+ "sqlType" -> toSQLType(sqlType),
+ "key" -> key
+ )
+ )
+ }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e8185202a7e..8d1e73cb86f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4209,6 +4209,18 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA =
+ buildConf("spark.sql.legacy.avro.allowIncompatibleSchema")
+ .internal()
+ .doc("When set to false, if types in Avro are encoded in the same format, but " +
+ "the type in the Avro schema explicitly says that the data types are different, " +
+ "reject reading the data type in the format to avoid returning incorrect results. " +
+ "When set to true, it restores the legacy behavior of allow reading the data in the" +
+ " format, which may return incorrect results.")
+ .version("3.5.0")
+ .booleanConf
+ .createWithDefault(false)
+
val LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME =
buildConf("spark.sql.legacy.v1IdentifierNoCatalog")
.internal()