You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/05/28 03:22:20 UTC

[spark] branch branch-2.4 updated: [SPARK-27858][SQL] Fix for avro deserialization on union types with multiple non-null types

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 0d9be28  [SPARK-27858][SQL] Fix for avro deserialization on union types with multiple non-null types
0d9be28 is described below

commit 0d9be280dee75e555cf847ba043082633a0065e1
Author: Gabbi Merz <gm...@palantir.com>
AuthorDate: Mon May 27 20:09:23 2019 -0700

    [SPARK-27858][SQL] Fix for avro deserialization on union types with multiple non-null types
    
    ## What changes were proposed in this pull request?
    
    This PR aims to fix an issue on a union avro type with more than one non-null value (for instance `["string", "null", "int"]`) whose the deserialization to a DataFrame would throw a `java.lang.ArrayIndexOutOfBoundsException`. The issue was that the `fieldWriter` relied on the index from the avro schema before nulls were filtered out.
    
    ## How was this patch tested?
    
    A test for the case of multiple non-null values was added and the tests were run using sbt by running `testOnly org.apache.spark.sql.avro.AvroSuite`
    
    Closes #24722 from gcmerz/master.
    
    Authored-by: Gabbi Merz <gm...@palantir.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 29e154b2f12058c59eaa411989ead833119f165f)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  3 ++-
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 26 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 272e7d5..b10405c 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -223,6 +223,7 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
       case (UNION, _) =>
         val allTypes = avroType.getTypes.asScala
         val nonNullTypes = allTypes.filter(_.getType != NULL)
+        val nonNullAvroType = Schema.createUnion(nonNullTypes.asJava)
         if (nonNullTypes.nonEmpty) {
           if (nonNullTypes.length == 1) {
             newWriter(nonNullTypes.head, catalystType, path)
@@ -251,7 +252,7 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
                     (updater, ordinal, value) => {
                       val row = new SpecificInternalRow(st)
                       val fieldUpdater = new RowUpdater(row)
-                      val i = GenericData.get().resolveUnion(avroType, value)
+                      val i = GenericData.get().resolveUnion(nonNullAvroType, value)
                       fieldWriters(i)(fieldUpdater, i, value)
                       updater.set(ordinal, row)
                     }
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 8b088b3..d8e5297 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -266,6 +266,32 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
 
+  test("SPARK-27858 Union type: More than one non-null type") {
+    withTempDir { dir =>
+      val complexNullUnionType = Schema.createUnion(
+        List(Schema.create(Type.INT), Schema.create(Type.NULL), Schema.create(Type.STRING)).asJava)
+      val fields = Seq(
+        new Field("field1", complexNullUnionType, "doc", null.asInstanceOf[AnyVal])).asJava
+      val schema = Schema.createRecord("name", "docs", "namespace", false)
+      schema.setFields(fields)
+      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
+      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+      dataFileWriter.create(schema, new File(s"$dir.avro"))
+      val avroRec = new GenericData.Record(schema)
+      avroRec.put("field1", 42)
+      dataFileWriter.append(avroRec)
+      val avroRec2 = new GenericData.Record(schema)
+      avroRec2.put("field1", "Alice")
+      dataFileWriter.append(avroRec2)
+      dataFileWriter.flush()
+      dataFileWriter.close()
+
+      val df = spark.read.format("avro").load(s"$dir.avro")
+      assert(df.schema === StructType.fromDDL("field1 struct<member0: int, member1: string>"))
+      assert(df.collect().toSet == Set(Row(Row(42, null)), Row(Row(null, "Alice"))))
+    }
+  }
+
   test("Complex Union Type") {
     withTempPath { dir =>
       val fixedSchema = Schema.createFixed("fixed_name", "doc", "namespace", 4)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org