You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/26 05:30:19 UTC

[spark] branch branch-3.0 updated: [SPARK-34229][SQL] Avro should read decimal values with the file schema

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9e154ee  [SPARK-34229][SQL] Avro should read decimal values with the file schema
9e154ee is described below

commit 9e154eeca083eeeff921a3162040e80c0be9d69e
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Tue Jan 26 14:23:54 2021 +0900

    [SPARK-34229][SQL] Avro should read decimal values with the file schema
    
    ### What changes were proposed in this pull request?
    
    This PR aims to fix Avro data source to use the decimal precision and scale of file schema.
    
    ### Why are the changes needed?
    
    The decimal value should be interpreted with its original precision and scale. Otherwise, it returns incorrect result like the following. The schema mismatch happens when we use `userSpecifiedSchema` or there are multiple files with inconsistent schema or HiveMetastore schema is updated by the user.
    ```scala
    scala> sql("SELECT 3.14 a").write.format("avro").save("/tmp/avro")
    scala> spark.read.schema("a DECIMAL(4, 3)").format("avro").load("/tmp/avro").show
    +-----+
    |    a|
    +-----+
    |0.314|
    +-----+
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this will return correct result.
    
    ### How was this patch tested?
    
    Pass the CI with the newly added test case.
    
    Closes #31329 from dongjoon-hyun/SPARK-34229.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit 7d09eac1ccb6a14a36fce30ae7cda575c29e1974)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../org/apache/spark/sql/avro/AvroDeserializer.scala     | 16 ++++++++--------
 .../test/scala/org/apache/spark/sql/avro/AvroSuite.scala |  8 ++++++++
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 7580957..6b2063c 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -165,16 +165,16 @@ private[sql] class AvroDeserializer(
         }
         updater.set(ordinal, bytes)
 
-      case (FIXED, d: DecimalType) => (updater, ordinal, value) =>
-        val bigDecimal = decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType,
-          LogicalTypes.decimal(d.precision, d.scale))
-        val decimal = createDecimal(bigDecimal, d.precision, d.scale)
+      case (FIXED, _: DecimalType) => (updater, ordinal, value) =>
+        val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
+        val bigDecimal = decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d)
+        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
         updater.setDecimal(ordinal, decimal)
 
-      case (BYTES, d: DecimalType) => (updater, ordinal, value) =>
-        val bigDecimal = decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType,
-          LogicalTypes.decimal(d.precision, d.scale))
-        val decimal = createDecimal(bigDecimal, d.precision, d.scale)
+      case (BYTES, _: DecimalType) => (updater, ordinal, value) =>
+        val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
+        val bigDecimal = decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
+        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
         updater.setDecimal(ordinal, decimal)
 
       case (RECORD, st: StructType) =>
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 5d7d2e4..e576c16 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -589,6 +589,14 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
     }
   }
 
+  test("SPARK-34229: Avro should read decimal values with the file schema") {
+    withTempPath { path =>
+      sql("SELECT 3.14 a").write.format("avro").save(path.toString)
+      val data = spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString).collect()
+      assert(data.map(_ (0)).contains(new java.math.BigDecimal("3.140")))
+    }
+  }
+
   test("converting some specific sparkSQL types to avro") {
     withTempPath { tempDir =>
       val testSchema = StructType(Seq(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org