You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/26 05:30:38 UTC

[spark] branch branch-2.4 updated: [SPARK-34229][SQL] Avro should read decimal values with the file schema

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new a194eab  [SPARK-34229][SQL] Avro should read decimal values with the file schema
a194eab is described below

commit a194eab13b88f9f001cc9cdf5165c669ede88795
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Tue Jan 26 14:23:54 2021 +0900

    [SPARK-34229][SQL] Avro should read decimal values with the file schema
    
    ### What changes were proposed in this pull request?
    
    This PR aims to fix Avro data source to use the decimal precision and scale of file schema.
    
    ### Why are the changes needed?
    
    The decimal value should be interpreted with its original precision and scale. Otherwise, it returns incorrect result like the following. The schema mismatch happens when we use `userSpecifiedSchema` or there are multiple files with inconsistent schema or HiveMetastore schema is updated by the user.
    ```scala
    scala> sql("SELECT 3.14 a").write.format("avro").save("/tmp/avro")
    scala> spark.read.schema("a DECIMAL(4, 3)").format("avro").load("/tmp/avro").show
    +-----+
    |    a|
    +-----+
    |0.314|
    +-----+
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this will return correct result.
    
    ### How was this patch tested?
    
    Pass the CI with the newly added test case.
    
    Closes #31329 from dongjoon-hyun/SPARK-34229.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit 7d09eac1ccb6a14a36fce30ae7cda575c29e1974)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../org/apache/spark/sql/avro/AvroDeserializer.scala     | 16 ++++++++--------
 .../test/scala/org/apache/spark/sql/avro/AvroSuite.scala |  8 ++++++++
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index b10405c..1e59349 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -145,16 +145,16 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
         }
         updater.set(ordinal, bytes)
 
-      case (FIXED, d: DecimalType) => (updater, ordinal, value) =>
-        val bigDecimal = decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType,
-          LogicalTypes.decimal(d.precision, d.scale))
-        val decimal = createDecimal(bigDecimal, d.precision, d.scale)
+      case (FIXED, _: DecimalType) => (updater, ordinal, value) =>
+        val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
+        val bigDecimal = decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d)
+        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
         updater.setDecimal(ordinal, decimal)
 
-      case (BYTES, d: DecimalType) => (updater, ordinal, value) =>
-        val bigDecimal = decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType,
-          LogicalTypes.decimal(d.precision, d.scale))
-        val decimal = createDecimal(bigDecimal, d.precision, d.scale)
+      case (BYTES, _: DecimalType) => (updater, ordinal, value) =>
+        val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
+        val bigDecimal = decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
+        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
         updater.setDecimal(ordinal, decimal)
 
       case (RECORD, st: StructType) =>
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 12ef883..34f01de 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -597,6 +597,14 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
 
+  test("SPARK-34229: Avro should read decimal values with the file schema") {
+    withTempPath { path =>
+      sql("SELECT 3.14 a").write.format("avro").save(path.toString)
+      val data = spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString).collect()
+      assert(data.map(_ (0)).contains(new java.math.BigDecimal("3.140")))
+    }
+  }
+
   test("converting some specific sparkSQL types to avro") {
     withTempPath { tempDir =>
       val testSchema = StructType(Seq(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org