You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2023/06/02 22:10:28 UTC

[spark] branch master updated: [SPARK-43380][SQL] Fix Avro data type conversion issues to avoid producing incorrect results

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 14b00cfc6c2 [SPARK-43380][SQL] Fix Avro data type conversion issues to avoid producing incorrect results
14b00cfc6c2 is described below

commit 14b00cfc6c2e477067fc9e7937e34b2aa53df1eb
Author: zeruibao <ze...@databricks.com>
AuthorDate: Fri Jun 2 15:10:06 2023 -0700

    [SPARK-43380][SQL] Fix Avro data type conversion issues to avoid producing incorrect results
    
    ### What changes were proposed in this pull request?
    We introduce the SQLConf `spark.sql.legacy.avro.allowReadingWithIncompatibleSchema` to prevent reading interval types as date or timestamp types to avoid getting corrupt dates as well as reading decimal types with incorrect precision.
    
    ### Why are the changes needed?
    We found the following issues with open source Avro:
    
    - Interval types can be read as date or timestamp types that would lead to wildly different results
       For example, `Duration.ofDays(1).plusSeconds(1)` will be read as `1972-09-27`, which is weird.
    - Decimal types can be read with lower precision, that leads to data being read as `null` instead of suggesting that a wider decimal format should be provided
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    New unit test
    
    Closes #41052 from zeruibao/SPARK-43380-fix-avro-data-type-conversion.
    
    Lead-authored-by: zeruibao <ze...@databricks.com>
    Co-authored-by: zeruibao <12...@users.noreply.github.com>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 .../apache/spark/sql/avro/AvroDeserializer.scala   | 423 ++++++++++++---------
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 132 +++++++
 core/src/main/resources/error/error-classes.json   |  10 +
 docs/sql-migration-guide.md                        |   1 +
 .../spark/sql/errors/QueryCompilationErrors.scala  |  30 ++
 .../org/apache/spark/sql/internal/SQLConf.scala    |  12 +
 6 files changed, 435 insertions(+), 173 deletions(-)

diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index aac979cddb2..78b1f01e2ef 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -35,7 +35,9 @@ import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArr
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -117,178 +119,260 @@ private[sql] class AvroDeserializer(
     val incompatibleMsg = errorPrefix +
         s"schema is incompatible (avroType = $avroType, sqlType = ${catalystType.sql})"
 
-    (avroType.getType, catalystType) match {
-      case (NULL, NullType) => (updater, ordinal, _) =>
-        updater.setNullAt(ordinal)
+    val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA
+    val preventReadingIncorrectType = !SQLConf.get.getConf(confKey)
 
+    val logicalDataType = SchemaConverters.toSqlType(avroType).dataType
+    avroType.getType match {
+      case NULL =>
+        (logicalDataType, catalystType) match {
+          case (_, NullType) => (updater, ordinal, _) =>
+            updater.setNullAt(ordinal)
+          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+        }
       // TODO: we can avoid boxing if future version of avro provide primitive accessors.
-      case (BOOLEAN, BooleanType) => (updater, ordinal, value) =>
-        updater.setBoolean(ordinal, value.asInstanceOf[Boolean])
-
-      case (INT, IntegerType) => (updater, ordinal, value) =>
-        updater.setInt(ordinal, value.asInstanceOf[Int])
-
-      case (INT, DateType) => (updater, ordinal, value) =>
-        updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
-
-      case (LONG, LongType) => (updater, ordinal, value) =>
-        updater.setLong(ordinal, value.asInstanceOf[Long])
-
-      case (LONG, TimestampType) => avroType.getLogicalType match {
-        // For backward compatibility, if the Avro type is Long and it is not logical type
-        // (the `null` case), the value is processed as timestamp type with millisecond precision.
-        case null | _: TimestampMillis => (updater, ordinal, value) =>
-          val millis = value.asInstanceOf[Long]
-          val micros = DateTimeUtils.millisToMicros(millis)
-          updater.setLong(ordinal, timestampRebaseFunc(micros))
-        case _: TimestampMicros => (updater, ordinal, value) =>
-          val micros = value.asInstanceOf[Long]
-          updater.setLong(ordinal, timestampRebaseFunc(micros))
-        case other => throw new IncompatibleSchemaException(errorPrefix +
-          s"Avro logical type $other cannot be converted to SQL type ${TimestampType.sql}.")
-      }
-
-      case (LONG, TimestampNTZType) => avroType.getLogicalType match {
-        // To keep consistent with TimestampType, if the Avro type is Long and it is not
-        // logical type (the `null` case), the value is processed as TimestampNTZ
-        // with millisecond precision.
-        case null | _: LocalTimestampMillis => (updater, ordinal, value) =>
-          val millis = value.asInstanceOf[Long]
-          val micros = DateTimeUtils.millisToMicros(millis)
-          updater.setLong(ordinal, micros)
-        case _: LocalTimestampMicros => (updater, ordinal, value) =>
-          val micros = value.asInstanceOf[Long]
-          updater.setLong(ordinal, micros)
-        case other => throw new IncompatibleSchemaException(errorPrefix +
-          s"Avro logical type $other cannot be converted to SQL type ${TimestampNTZType.sql}.")
-      }
-
-      // Before we upgrade Avro to 1.8 for logical type support, spark-avro converts Long to Date.
-      // For backward compatibility, we still keep this conversion.
-      case (LONG, DateType) => (updater, ordinal, value) =>
-        updater.setInt(ordinal, (value.asInstanceOf[Long] / MILLIS_PER_DAY).toInt)
-
-      case (FLOAT, FloatType) => (updater, ordinal, value) =>
-        updater.setFloat(ordinal, value.asInstanceOf[Float])
-
-      case (DOUBLE, DoubleType) => (updater, ordinal, value) =>
-        updater.setDouble(ordinal, value.asInstanceOf[Double])
-
-      case (STRING, StringType) => (updater, ordinal, value) =>
-        val str = value match {
-          case s: String => UTF8String.fromString(s)
-          case s: Utf8 =>
-            val bytes = new Array[Byte](s.getByteLength)
-            System.arraycopy(s.getBytes, 0, bytes, 0, s.getByteLength)
-            UTF8String.fromBytes(bytes)
+      case BOOLEAN =>
+        (logicalDataType, catalystType) match {
+          case (_, BooleanType) => (updater, ordinal, value) =>
+            updater.setBoolean(ordinal, value.asInstanceOf[Boolean])
+          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
         }
-        updater.set(ordinal, str)
-
-      case (ENUM, StringType) => (updater, ordinal, value) =>
-        updater.set(ordinal, UTF8String.fromString(value.toString))
-
-      case (FIXED, BinaryType) => (updater, ordinal, value) =>
-        updater.set(ordinal, value.asInstanceOf[GenericFixed].bytes().clone())
-
-      case (BYTES, BinaryType) => (updater, ordinal, value) =>
-        val bytes = value match {
-          case b: ByteBuffer =>
-            val bytes = new Array[Byte](b.remaining)
-            b.get(bytes)
-            // Do not forget to reset the position
-            b.rewind()
-            bytes
-          case b: Array[Byte] => b
-          case other =>
-            throw new RuntimeException(errorPrefix + s"$other is not a valid avro binary.")
+      case INT =>
+        (logicalDataType, catalystType) match {
+          case (IntegerType, IntegerType) => (updater, ordinal, value) =>
+            updater.setInt(ordinal, value.asInstanceOf[Int])
+          case (IntegerType, DateType) => (updater, ordinal, value) =>
+            updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
+          case (DateType, DateType) => (updater, ordinal, value) =>
+            updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
+          case (_: YearMonthIntervalType, _: YearMonthIntervalType) => (updater, ordinal, value) =>
+            updater.setInt(ordinal, value.asInstanceOf[Int])
+          case (_: YearMonthIntervalType, _) if preventReadingIncorrectType =>
+            throw QueryCompilationErrors.avroIncorrectTypeError(
+              toFieldStr(avroPath), toFieldStr(catalystPath),
+              logicalDataType.catalogString, catalystType.catalogString, confKey.key)
+          case _ if !preventReadingIncorrectType => (updater, ordinal, value) =>
+            updater.setInt(ordinal, value.asInstanceOf[Int])
+          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
         }
-        updater.set(ordinal, bytes)
-
-      case (FIXED, _: DecimalType) => (updater, ordinal, value) =>
-        val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
-        val bigDecimal = decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d)
-        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
-        updater.setDecimal(ordinal, decimal)
-
-      case (BYTES, _: DecimalType) => (updater, ordinal, value) =>
-        val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
-        val bigDecimal = decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
-        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
-        updater.setDecimal(ordinal, decimal)
-
-      case (RECORD, st: StructType) =>
-        // Avro datasource doesn't accept filters with nested attributes. See SPARK-32328.
-        // We can always return `false` from `applyFilters` for nested records.
-        val writeRecord =
-          getRecordWriter(avroType, st, avroPath, catalystPath, applyFilters = _ => false)
-        (updater, ordinal, value) =>
-          val row = new SpecificInternalRow(st)
-          writeRecord(new RowUpdater(row), value.asInstanceOf[GenericRecord])
-          updater.set(ordinal, row)
-
-      case (ARRAY, ArrayType(elementType, containsNull)) =>
-        val avroElementPath = avroPath :+ "element"
-        val elementWriter = newWriter(avroType.getElementType, elementType,
-          avroElementPath, catalystPath :+ "element")
-        (updater, ordinal, value) =>
-          val collection = value.asInstanceOf[java.util.Collection[Any]]
-          val result = createArrayData(elementType, collection.size())
-          val elementUpdater = new ArrayDataUpdater(result)
-
-          var i = 0
-          val iter = collection.iterator()
-          while (iter.hasNext) {
-            val element = iter.next()
-            if (element == null) {
-              if (!containsNull) {
-                throw new RuntimeException(
-                  s"Array value at path ${toFieldStr(avroElementPath)} is not allowed to be null")
-              } else {
-                elementUpdater.setNullAt(i)
-              }
-            } else {
-              elementWriter(elementUpdater, i, element)
-            }
-            i += 1
+      case LONG =>
+        (logicalDataType, catalystType) match {
+          case (LongType, LongType) => (updater, ordinal, value) =>
+            updater.setLong(ordinal, value.asInstanceOf[Long])
+          case (LongType, TimestampType)
+               | (TimestampType, TimestampType)
+               |(TimestampNTZType, TimestampType) => avroType.getLogicalType match {
+            // For backward compatibility, if the Avro type is Long and it is not logical type
+            // (the `null` case), the value is processed as timestamp type with
+            // millisecond precision.
+            case null | _: TimestampMillis => (updater, ordinal, value) =>
+              val millis = value.asInstanceOf[Long]
+              val micros = DateTimeUtils.millisToMicros(millis)
+              updater.setLong(ordinal, timestampRebaseFunc(micros))
+            case _: TimestampMicros => (updater, ordinal, value) =>
+              val micros = value.asInstanceOf[Long]
+              updater.setLong(ordinal, timestampRebaseFunc(micros))
+            case other => throw new IncompatibleSchemaException(errorPrefix +
+              s"Avro logical type $other cannot be converted to SQL type ${TimestampType.sql}.")
           }
-
-          updater.set(ordinal, result)
-
-      case (MAP, MapType(keyType, valueType, valueContainsNull)) if keyType == StringType =>
-        val keyWriter = newWriter(SchemaBuilder.builder().stringType(), StringType,
-          avroPath :+ "key", catalystPath :+ "key")
-        val valueWriter = newWriter(avroType.getValueType, valueType,
-          avroPath :+ "value", catalystPath :+ "value")
-        (updater, ordinal, value) =>
-          val map = value.asInstanceOf[java.util.Map[AnyRef, AnyRef]]
-          val keyArray = createArrayData(keyType, map.size())
-          val keyUpdater = new ArrayDataUpdater(keyArray)
-          val valueArray = createArrayData(valueType, map.size())
-          val valueUpdater = new ArrayDataUpdater(valueArray)
-          val iter = map.entrySet().iterator()
-          var i = 0
-          while (iter.hasNext) {
-            val entry = iter.next()
-            assert(entry.getKey != null)
-            keyWriter(keyUpdater, i, entry.getKey)
-            if (entry.getValue == null) {
-              if (!valueContainsNull) {
-                throw new RuntimeException(
-                  s"Map value at path ${toFieldStr(avroPath :+ "value")} is not allowed to be null")
-              } else {
-                valueUpdater.setNullAt(i)
-              }
-            } else {
-              valueWriter(valueUpdater, i, entry.getValue)
-            }
-            i += 1
+          case (LongType, TimestampNTZType)
+               | (TimestampNTZType, TimestampNTZType)
+               | (TimestampType, TimestampNTZType) => avroType.getLogicalType match {
+            // To keep consistent with TimestampType, if the Avro type is Long and it is not
+            // logical type (the `null` case), the value is processed as TimestampNTZ
+            // with millisecond precision.
+            case null | _: LocalTimestampMillis => (updater, ordinal, value) =>
+              val millis = value.asInstanceOf[Long]
+              val micros = DateTimeUtils.millisToMicros(millis)
+              updater.setLong(ordinal, micros)
+            case _: LocalTimestampMicros => (updater, ordinal, value) =>
+              val micros = value.asInstanceOf[Long]
+              updater.setLong(ordinal, micros)
+            case other => throw new IncompatibleSchemaException(errorPrefix +
+              s"Avro logical type $other cannot be converted to SQL type ${TimestampNTZType.sql}.")
           }
-
-          // The Avro map will never have null or duplicated map keys, it's safe to create a
-          // ArrayBasedMapData directly here.
-          updater.set(ordinal, new ArrayBasedMapData(keyArray, valueArray))
-
-      case (UNION, _) =>
+          // Before we upgrade Avro to 1.8 for logical type support,
+          // spark-avro converts Long to Date.
+          // For backward compatibility, we still keep this conversion.
+          case (LongType, DateType) => (updater, ordinal, value) =>
+            updater.setInt(ordinal, (value.asInstanceOf[Long] / MILLIS_PER_DAY).toInt)
+          case (DateType, DateType) => (updater, ordinal, value) =>
+            updater.setLong(ordinal, value.asInstanceOf[Long])
+          case (_: DayTimeIntervalType, _: DayTimeIntervalType) => (updater, ordinal, value) =>
+            updater.setLong(ordinal, value.asInstanceOf[Long])
+          case (_: DayTimeIntervalType, _) if preventReadingIncorrectType =>
+            throw QueryCompilationErrors.avroIncorrectTypeError(
+              toFieldStr(avroPath), toFieldStr(catalystPath),
+              logicalDataType.catalogString, catalystType.catalogString, confKey.key)
+          case (_: DayTimeIntervalType, DateType) => (updater, ordinal, value) =>
+            updater.setInt(ordinal, (value.asInstanceOf[Long] / MILLIS_PER_DAY).toInt)
+          case _ if !preventReadingIncorrectType => (updater, ordinal, value) =>
+            updater.setLong(ordinal, value.asInstanceOf[Long])
+          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+        }
+      case FLOAT =>
+        (logicalDataType, catalystType) match {
+          case (_, FloatType) => (updater, ordinal, value) =>
+            updater.setFloat(ordinal, value.asInstanceOf[Float])
+          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+        }
+      case DOUBLE =>
+        (logicalDataType, catalystType) match {
+          case (_, DoubleType) => (updater, ordinal, value) =>
+            updater.setDouble(ordinal, value.asInstanceOf[Double])
+          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+        }
+      case STRING =>
+        (logicalDataType, catalystType) match {
+          case (_, StringType) => (updater, ordinal, value) =>
+            val str = value match {
+              case s: String => UTF8String.fromString(s)
+              case s: Utf8 =>
+                val bytes = new Array[Byte](s.getByteLength)
+                System.arraycopy(s.getBytes, 0, bytes, 0, s.getByteLength)
+                UTF8String.fromBytes(bytes)
+            }
+            updater.set(ordinal, str)
+          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+        }
+      case ENUM =>
+        (logicalDataType, catalystType) match {
+          case (_, StringType) => (updater, ordinal, value) =>
+            updater.set(ordinal, UTF8String.fromString(value.toString))
+          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+        }
+      case FIXED =>
+        (logicalDataType, catalystType) match {
+          case (_, BinaryType) => (updater, ordinal, value) =>
+            updater.set(ordinal, value.asInstanceOf[GenericFixed].bytes().clone())
+          case (_, dt: DecimalType) =>
+            val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
+            if (preventReadingIncorrectType &&
+              d.getPrecision - d.getScale > dt.precision - dt.scale) {
+              throw QueryCompilationErrors.avroLowerPrecisionError(toFieldStr(avroPath),
+                toFieldStr(catalystPath), logicalDataType.catalogString,
+                dt.catalogString, confKey.key)
+            }
+            (updater, ordinal, value) =>
+              val bigDecimal =
+                decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d)
+              val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
+              updater.setDecimal(ordinal, decimal)
+          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+        }
+      case BYTES =>
+        (logicalDataType, catalystType) match {
+          case (_, BinaryType) => (updater, ordinal, value) =>
+            val bytes = value match {
+              case b: ByteBuffer =>
+                val bytes = new Array[Byte](b.remaining)
+                b.get(bytes)
+                // Do not forget to reset the position
+                b.rewind()
+                bytes
+              case b: Array[Byte] => b
+              case other =>
+                throw new RuntimeException(errorPrefix + s"$other is not a valid avro binary.")
+            }
+            updater.set(ordinal, bytes)
+          case (_, dt: DecimalType) =>
+            val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
+            if (preventReadingIncorrectType &&
+              d.getPrecision - d.getScale > dt.precision - dt.scale) {
+              throw QueryCompilationErrors.avroLowerPrecisionError(toFieldStr(avroPath),
+                toFieldStr(catalystPath), logicalDataType.catalogString,
+                dt.catalogString, confKey.key)
+            }
+            (updater, ordinal, value) =>
+              val bigDecimal = decimalConversions
+                .fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
+              val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
+              updater.setDecimal(ordinal, decimal)
+          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+        }
+      case RECORD =>
+        (logicalDataType, catalystType) match {
+          case (_, st: StructType) =>
+            // Avro datasource doesn't accept filters with nested attributes. See SPARK-32328.
+            // We can always return `false` from `applyFilters` for nested records.
+            val writeRecord =
+              getRecordWriter(avroType, st, avroPath, catalystPath, applyFilters = _ => false)
+            (updater, ordinal, value) =>
+              val row = new SpecificInternalRow(st)
+              writeRecord(new RowUpdater(row), value.asInstanceOf[GenericRecord])
+              updater.set(ordinal, row)
+          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+        }
+      case ARRAY =>
+        (logicalDataType, catalystType) match {
+          case (_, ArrayType(elementType, containsNull)) =>
+            val avroElementPath = avroPath :+ "element"
+            val elementWriter = newWriter(avroType.getElementType, elementType,
+              avroElementPath, catalystPath :+ "element")
+            (updater, ordinal, value) =>
+              val collection = value.asInstanceOf[java.util.Collection[Any]]
+              val result = createArrayData(elementType, collection.size())
+              val elementUpdater = new ArrayDataUpdater(result)
+
+              var i = 0
+              val iter = collection.iterator()
+              while (iter.hasNext) {
+                val element = iter.next()
+                if (element == null) {
+                  if (!containsNull) {
+                    throw new RuntimeException(
+                      s"Array value at path ${toFieldStr(avroElementPath)}" +
+                        s" is not allowed to be null")
+                  } else {
+                    elementUpdater.setNullAt(i)
+                  }
+                } else {
+                  elementWriter(elementUpdater, i, element)
+                }
+                i += 1
+              }
+              updater.set(ordinal, result)
+          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+        }
+      case MAP =>
+        (logicalDataType, catalystType) match {
+          case (_, MapType(keyType, valueType, valueContainsNull))
+            if keyType == StringType =>
+            val keyWriter = newWriter(SchemaBuilder.builder().stringType(), StringType,
+              avroPath :+ "key", catalystPath :+ "key")
+            val valueWriter = newWriter(avroType.getValueType, valueType,
+              avroPath :+ "value", catalystPath :+ "value")
+            (updater, ordinal, value) =>
+              val map = value.asInstanceOf[java.util.Map[AnyRef, AnyRef]]
+              val keyArray = createArrayData(keyType, map.size())
+              val keyUpdater = new ArrayDataUpdater(keyArray)
+              val valueArray = createArrayData(valueType, map.size())
+              val valueUpdater = new ArrayDataUpdater(valueArray)
+              val iter = map.entrySet().iterator()
+              var i = 0
+              while (iter.hasNext) {
+                val entry = iter.next()
+                assert(entry.getKey != null)
+                keyWriter(keyUpdater, i, entry.getKey)
+                if (entry.getValue == null) {
+                  if (!valueContainsNull) {
+                    throw new RuntimeException(
+                      s"Map value at path ${toFieldStr(avroPath :+ "value")}" +
+                        s" is not allowed to be null")
+                  } else {
+                    valueUpdater.setNullAt(i)
+                  }
+                } else {
+                  valueWriter(valueUpdater, i, entry.getValue)
+                }
+                i += 1
+              }
+              // The Avro map will never have null or duplicated map keys, it's safe to create a
+              // ArrayBasedMapData directly here.
+              updater.set(ordinal, new ArrayBasedMapData(keyArray, valueArray))
+          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+        }
+      case UNION =>
         val nonNullTypes = nonNullUnionBranches(avroType)
         val nonNullAvroType = Schema.createUnion(nonNullTypes.asJava)
         if (nonNullTypes.nonEmpty) {
@@ -332,13 +416,6 @@ private[sql] class AvroDeserializer(
         } else {
           (updater, ordinal, _) => updater.setNullAt(ordinal)
         }
-
-      case (INT, _: YearMonthIntervalType) => (updater, ordinal, value) =>
-        updater.setInt(ordinal, value.asInstanceOf[Int])
-
-      case (LONG, _: DayTimeIntervalType) => (updater, ordinal, value) =>
-        updater.setLong(ordinal, value.asInstanceOf[Long])
-
       case _ => throw new IncompatibleSchemaException(incompatibleMsg)
     }
   }
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 97260e6eea6..7ca34388523 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -32,6 +32,7 @@ import org.apache.avro.file.{DataFileReader, DataFileWriter}
 import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
 import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
 import org.apache.commons.io.FileUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
 
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, SparkUpgradeException}
 import org.apache.spark.TestUtils.assertExceptionMsg
@@ -814,6 +815,137 @@ abstract class AvroSuite
     }
   }
 
+  test("SPARK-43380: Fix Avro data type conversion" +
+      " of decimal type to avoid producing incorrect results") {
+    withTempPath { path =>
+      val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
+      sql("SELECT 13.1234567890 a").write.format("avro").save(path.toString)
+      // With the flag disabled, we will throw an exception if there is a mismatch
+      withSQLConf(confKey -> "false") {
+        val e = intercept[SparkException] {
+          spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString).collect()
+        }
+        ExceptionUtils.getRootCause(e) match {
+          case ex: AnalysisException =>
+            checkError(
+              exception = ex,
+              errorClass = "AVRO_LOWER_PRECISION",
+              parameters = Map("avroPath" -> "field 'a'",
+                "sqlPath" -> "field 'a'",
+                "avroType" -> "decimal\\(12,10\\)",
+                "sqlType" -> "\"DECIMAL\\(4,3\\)\"",
+                "key" -> SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key),
+              matchPVals = true
+            )
+          case other =>
+            fail(s"Received unexpected exception", other)
+        }
+      }
+      // The following used to work, so it should still work with the flag enabled
+      checkAnswer(
+        spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
+        Row(new java.math.BigDecimal("13.123"))
+      )
+      withSQLConf(confKey -> "true") {
+        // With the flag enabled, we return a null silently, which isn't great
+        checkAnswer(
+          spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString),
+          Row(null)
+        )
+        checkAnswer(
+          spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
+          Row(new java.math.BigDecimal("13.123"))
+        )
+      }
+    }
+  }
+
+  test("SPARK-43380: Fix Avro data type conversion" +
+    " of DayTimeIntervalType to avoid producing incorrect results") {
+    withTempPath { path =>
+      val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
+      val schema = StructType(Array(StructField("a", DayTimeIntervalType(), false)))
+      val data = Seq(Row(java.time.Duration.ofDays(1).plusSeconds(1)))
+
+      val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+      df.write.format("avro").save(path.getCanonicalPath)
+
+      withSQLConf(confKey -> "false") {
+        Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType =>
+          val e = intercept[SparkException] {
+            spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect()
+          }
+
+          ExceptionUtils.getRootCause(e) match {
+            case ex: AnalysisException =>
+              checkError(
+                exception = ex,
+                errorClass = "AVRO_INCORRECT_TYPE",
+                parameters = Map("avroPath" -> "field 'a'",
+                  "sqlPath" -> "field 'a'",
+                  "avroType" -> "interval day to second",
+                  "sqlType" -> s""""$sqlType"""",
+                  "key" -> SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key),
+                matchPVals = true
+              )
+            case other =>
+              fail(s"Received unexpected exception", other)
+          }
+        }
+      }
+
+      withSQLConf(confKey -> "true") {
+        // Allow conversion and do not need to check result
+        spark.read.schema("a Date").format("avro").load(path.toString)
+        spark.read.schema("a timestamp").format("avro").load(path.toString)
+        spark.read.schema("a timestamp_ntz").format("avro").load(path.toString)
+      }
+    }
+  }
+
+  test("SPARK-43380: Fix Avro data type conversion" +
+    " of YearMonthIntervalType to avoid producing incorrect results") {
+    withTempPath { path =>
+      val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
+      val schema = StructType(Array(StructField("a", YearMonthIntervalType(), false)))
+      val data = Seq(Row(java.time.Period.of(1, 1, 0)))
+
+      val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+      df.write.format("avro").save(path.getCanonicalPath)
+
+      withSQLConf(confKey -> "false") {
+        Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType =>
+          val e = intercept[SparkException] {
+            spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect()
+          }
+
+          ExceptionUtils.getRootCause(e) match {
+            case ex: AnalysisException =>
+              checkError(
+                exception = ex,
+                errorClass = "AVRO_INCORRECT_TYPE",
+                parameters = Map("avroPath" -> "field 'a'",
+                  "sqlPath" -> "field 'a'",
+                  "avroType" -> "interval year to month",
+                  "sqlType" -> s""""$sqlType"""",
+                  "key" -> SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key),
+                matchPVals = true
+              )
+            case other =>
+              fail(s"Received unexpected exception", other)
+          }
+        }
+      }
+
+      withSQLConf(confKey -> "true") {
+        // Allow conversion and do not need to check result
+        spark.read.schema("a Date").format("avro").load(path.toString)
+        spark.read.schema("a timestamp").format("avro").load(path.toString)
+        spark.read.schema("a timestamp_ntz").format("avro").load(path.toString)
+      }
+    }
+  }
+
   test("converting some specific sparkSQL types to avro") {
     withTempPath { tempDir =>
       val testSchema = StructType(Seq(
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 8d280e40922..c73223fba39 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -64,6 +64,16 @@
       }
     }
   },
+  "AVRO_INCORRECT_TYPE" : {
+    "message" : [
+      "Cannot convert Avro <avroPath> to SQL <sqlPath> because the original encoded data type is <avroType>, however you're trying to read the field as <sqlType>, which would lead to an incorrect answer. To allow reading this field, enable the SQL configuration: <key>."
+    ]
+  },
+  "AVRO_LOWER_PRECISION" : {
+    "message" : [
+      "Cannot convert Avro <avroPath> to SQL <sqlPath> because the original encoded data type is <avroType>, however you're trying to read the field as <sqlType>, which leads to data being read as null. Please provide a wider decimal type to get the correct result. To allow reading null to this field, enable the SQL configuration: <key>."
+    ]
+  },
   "BINARY_ARITHMETIC_OVERFLOW" : {
     "message" : [
       "<value1> <symbol> <value2> caused overflow."
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 58627801fc7..6c05514d242 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -26,6 +26,7 @@ license: |
 
 - Since Spark 3.5, the JDBC options related to DS V2 pushdown are `true` by default. These options include: `pushDownAggregate`, `pushDownLimit`, `pushDownOffset` and `pushDownTableSample`. To restore the legacy behavior, please set them to `false`. e.g. set `spark.sql.catalog.your_catalog_name.pushDownAggregate` to `false`.
 - Since Spark 3.5, Spark thrift server will interrupt task when canceling a running statement. To restore the previous behavior, set `spark.sql.thriftServer.interruptOnCancel` to `false`.
+- Since Spark 3.5, the Avro will throw `AnalysisException` when reading Interval types as Date or Timestamp types, or reading Decimal types with lower precision. To restore the legacy behavior, set `spark.sql.legacy.avro.allowIncompatibleSchema` to `true`
 
 ## Upgrading from Spark SQL 3.3 to 3.4
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index f4ca9147f91..94b8ee25dd2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3537,4 +3537,34 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       errorClass = "CANNOT_RENAME_ACROSS_SCHEMA", messageParameters = Map("type" -> "table")
     )
   }
+
+  def avroIncorrectTypeError(
+      avroPath: String, sqlPath: String, avroType: String,
+      sqlType: String, key: String): Throwable = {
+    new AnalysisException(
+      errorClass = "AVRO_INCORRECT_TYPE",
+      messageParameters = Map(
+        "avroPath" -> avroPath,
+        "sqlPath" -> sqlPath,
+        "avroType" -> avroType,
+        "sqlType" -> toSQLType(sqlType),
+        "key" -> key
+      )
+    )
+  }
+
+  def avroLowerPrecisionError(
+      avroPath: String, sqlPath: String, avroType: String,
+      sqlType: String, key: String): Throwable = {
+    new AnalysisException(
+      errorClass = "AVRO_LOWER_PRECISION",
+      messageParameters = Map(
+        "avroPath" -> avroPath,
+        "sqlPath" -> sqlPath,
+        "avroType" -> avroType,
+        "sqlType" -> toSQLType(sqlType),
+        "key" -> key
+      )
+    )
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e8185202a7e..8d1e73cb86f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4209,6 +4209,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA =
+    buildConf("spark.sql.legacy.avro.allowIncompatibleSchema")
+      .internal()
+      .doc("When set to false, if types in Avro are encoded in the same format, but " +
+        "the type in the Avro schema explicitly says that the data types are different, " +
+        "reject reading the data type in the format to avoid returning incorrect results. " +
+        "When set to true, it restores the legacy behavior of allow reading the data in the" +
+        " format, which may return incorrect results.")
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME =
     buildConf("spark.sql.legacy.v1IdentifierNoCatalog")
       .internal()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org