Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/05/11 07:15:33 UTC

[GitHub] [spark] steven-aerts opened a new pull request, #36506: [SPARK-25050][SQL] Avro: writing complex unions

steven-aerts opened a new pull request, #36506:
URL: https://github.com/apache/spark/pull/36506

   Add the capability to write complex unions, next to reading them.
   Complex unions map to struct types where field names are member0, member1, etc.
   This is consistent with the behavior in SchemaConverters for reading them
   and when converting between Avro and Parquet.
   
   
   ### What changes were proposed in this pull request?
   Spark could already read complex unions but could not write them.
   This patch adds write support. With a schema that contains a complex union, the following code now works:
   
   ```scala
   spark
     .read.format("avro").option("avroSchema", avroSchema).load(path)
     .write.format("avro").option("avroSchema", avroSchema).save("/tmp/b")
   ```
   Before this patch, the write step threw `Unsupported Avro UNION type`.
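
   As a rough illustration of the `member0`, `member1` naming convention mentioned above, here is a minimal sketch. It is not Spark's code: union branches are modelled as plain type-name strings to avoid any Avro dependency, and `memberFieldNames` is a hypothetical helper. It assumes, as the serializer diff in this thread suggests, that null branches are skipped before numbering.

   ```scala
   object UnionMemberNames {
     // Hypothetical helper: each branch is represented by its Avro type name.
     // Null branches are dropped, then the remaining branches are numbered in order.
     def memberFieldNames(branches: Seq[String]): Seq[(String, String)] = {
       val nonNull = branches.filter(_ != "null")
       nonNull.zipWithIndex.map { case (branch, i) => (s"member$i", branch) }
     }
   }
   ```

   Under these assumptions, a union such as `["null", "int", "string"]` would correspond to a struct with fields `member0` (int) and `member1` (string).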
   
   ### Why are the changes needed?
   Fixes SPARK-25050 and brings write support in line with read support.
   
   
   ### Does this PR introduce _any_ user-facing change?
   Behaviour improves, but as far as I can see this does not affect any user-facing APIs or documentation.
   
   
   ### How was this patch tested?
   - Added extra unit tests.
   - Updated existing unit tests for improved behaviour.
   - Validated manually against an internal corpus of Avro files: they can now be read and written without problems, which was not possible before this patch.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] robreeves commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
robreeves commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r1004940613


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -312,14 +368,11 @@ private[sql] class AvroSerializer(
    */
   private def resolveAvroType(avroType: Schema): (Boolean, Schema) = {
     if (avroType.getType == Type.UNION) {
-      val fields = avroType.getTypes.asScala
-      val actualType = fields.filter(_.getType != Type.NULL)
-      if (fields.length != 2 || actualType.length != 1) {
-        throw new UnsupportedAvroTypeException(
-          s"Unsupported Avro UNION type $avroType: Only UNION of a null type and a non-null " +
-            "type is supported")
+      nonNullUnionBranches(avroType) match {

Review Comment:
   This logic has a couple of issues when determining nullability for a union. It needs to check whether the union contains a null to handle these cases.
   
   1. Case `["null", "int", "long"]` will hit line 374 and return `nullable=false` when it should return true.
   2. Case `["int"]` will hit line 373 and return `nullable=true` when it should return false.
   
   Maybe something like this?
   
   ```scala
   val containsNull = avroType.getTypes.asScala.exists(_.getType == Schema.Type.NULL)
   
   nonNullUnionBranches(avroType) match {
       case Seq() => (true, Schema.create(Type.NULL))
       case Seq(singleType) => (containsNull, singleType)
       case _ => (containsNull, avroType)
   }
   ```
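
   To make the two cases above concrete, the following sketch models the suggested logic without Avro on the classpath. It is only an approximation: branches are plain type-name strings, `resolve` is a hypothetical stand-in for `resolveAvroType`, and the returned sequence stands in for the returned `Schema`.

   ```scala
   object UnionNullability {
     // Simplified model of the suggestion: returns (nullable, resolved branches).
     // A union is nullable only if one of its branches is "null".
     def resolve(branches: Seq[String]): (Boolean, Seq[String]) = {
       val containsNull = branches.contains("null")
       branches.filter(_ != "null") match {
         case Seq() => (true, Seq("null"))                      // union of only null
         case Seq(singleType) => (containsNull, Seq(singleType)) // unwrap single branch
         case _ => (containsNull, branches)                      // keep the full union
       }
     }
   }
   ```

   In this model, `["null", "int", "long"]` resolves as nullable and `["int"]` as non-nullable, which matches the behaviour asked for in the two cases listed.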



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,59 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      unionType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = nonNullUnionBranches(unionType)
+    val expectedFieldNames = nonNullTypes.indices.map(i => s"member$i")
+    val catalystFieldNames = catalystStruct.fieldNames.toSeq
+    if (positionalFieldMatch) {
+      if (expectedFieldNames.length != catalystFieldNames.length) {
+        throw new IncompatibleSchemaException(s"Generic Avro union at ${toFieldStr(avroPath)} " +
+          s"does not match the SQL schema at ${toFieldStr(catalystPath)}.  It expected the " +
+          s"${expectedFieldNames.length} members but got ${catalystFieldNames.length}")
+      }
+    } else {
+      if (catalystFieldNames != expectedFieldNames) {
+        throw new IncompatibleSchemaException(s"Generic Avro union at ${toFieldStr(avroPath)} " +
+          s"does not match the SQL schema at ${toFieldStr(catalystPath)}.  It expected the " +
+          s"following members ${expectedFieldNames.mkString("(", ", ", ")")} but got " +
+          s"${catalystFieldNames.mkString("(", ", ", ")")}")
+      }
+    }
+
+    val unionBranchConverters = nonNullTypes.zip(catalystStruct).map { case (unionBranch, cf) =>
+      newConverter(cf.dataType, unionBranch, catalystPath :+ cf.name, avroPath :+ cf.name)
+    }.toArray
+
+    val numBranches = catalystStruct.length
+    row: InternalRow => {
+      var idx = 0
+      var retVal: Any = null
+      while (idx < numBranches && retVal == null) {
+        if (!row.isNullAt(idx)) {
+          retVal = unionBranchConverters(idx).apply(row, idx)
+        }
+        idx += 1
+      }
+      retVal
+    }
+  }
+
   /**
    * Resolve a possibly nullable Avro Type.
    *
    * An Avro type is nullable when it is a [[UNION]] of two types: one null type and another

Review Comment:
   This comment still says that only a `["null", "type"]` union is supported. Can you update it so it is clear all unions are supported now?



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -312,14 +368,11 @@ private[sql] class AvroSerializer(
    */
   private def resolveAvroType(avroType: Schema): (Boolean, Schema) = {
     if (avroType.getType == Type.UNION) {
-      val fields = avroType.getTypes.asScala
-      val actualType = fields.filter(_.getType != Type.NULL)
-      if (fields.length != 2 || actualType.length != 1) {
-        throw new UnsupportedAvroTypeException(
-          s"Unsupported Avro UNION type $avroType: Only UNION of a null type and a non-null " +
-            "type is supported")
+      nonNullUnionBranches(avroType) match {
+        case Seq() => (true, Schema.create(Type.NULL))
+        case Seq(singleType) => (true, singleType)
+        case _ => (false, avroType)

Review Comment:
   `avroType` contains the null type in the union. Should this return the union types with the null filtered out? That is what it previously did for nullable unions.





[GitHub] [spark] steven-aerts commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
steven-aerts commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r961600558


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -218,6 +218,17 @@ private[sql] class AvroSerializer(
         val numFields = st.length
         (getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields))
 
+      case (st: StructType, UNION) =>
+        val unionConvertor = newComplexUnionConverter(st, avroType, catalystPath, avroPath)
+        val numFields = st.length
+        (getter, ordinal) => unionConvertor(getter.getStruct(ordinal, numFields))
+
+      case (DoubleType, UNION) if nonNullUnionTypes(avroType) == Set(FLOAT, DOUBLE) =>
+        (getter, ordinal) => getter.getDouble(ordinal)
+
+      case (LongType, UNION) if nonNullUnionTypes(avroType) == Set(INT, LONG) =>
+        (getter, ordinal) => getter.getLong(ordinal)

Review Comment:
   Hi @xkrogen,
   
   I now understand your point better, and I agree that we lose some information here.
   I do not see how we can resolve this without changing the way Spark reads the Avro structure, because the information is already lost at that point.
   I think changing that would cause more problems than we introduce here.  Do you agree?



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,54 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      avroType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = avroType.getTypes.asScala.filter(_.getType != NULL).toSeq
+    validateComplexUnionMembers(catalystStruct, nonNullTypes, catalystPath, avroPath)
+
+    val fieldConverters = nonNullTypes.zipWithIndex.map { case (avroField, i) =>
+      val cf = catalystStruct.fields(i)
+      newConverter(cf.dataType, resolveNullableType(avroField, nullable = true),
+        catalystPath :+ cf.name, avroPath :+ cf.name)
+    }.toArray
+
+    val numFields = catalystStruct.length
+    row: InternalRow =>
+      (0 until numFields).dropWhile(row.isNullAt).headOption match {
+        case Some(i) => fieldConverters(i).apply(row, i)
+        case None => null
+      }
+  }
+
+  def validateComplexUnionMembers(
+      catalystStruct: StructType,
+      unionTypes: Seq[Schema],
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): Unit = {
+    val expectedFieldNames = unionTypes.indices.map(i => s"member$i")
+    if (catalystStruct.fieldNames.toSeq != expectedFieldNames) {
+      throw new IncompatibleSchemaException(s"Generic Avro union at ${toFieldStr(avroPath)} " +
+        s"does not match the SQL schema at ${toFieldStr(catalystPath)}.  It expected the " +
+        s"following members ${expectedFieldNames.mkString("(", ", ", ")")} but got " +
+        s"${catalystStruct.fieldNames.mkString("(", ", ", ")")}")
+    }
+  }

Review Comment:
   Done



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,54 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      avroType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = avroType.getTypes.asScala.filter(_.getType != NULL).toSeq
+    validateComplexUnionMembers(catalystStruct, nonNullTypes, catalystPath, avroPath)
+
+    val fieldConverters = nonNullTypes.zipWithIndex.map { case (avroField, i) =>
+      val cf = catalystStruct.fields(i)
+      newConverter(cf.dataType, resolveNullableType(avroField, nullable = true),
+        catalystPath :+ cf.name, avroPath :+ cf.name)

Review Comment:
   I opted to show the path according to the Catalyst schema here, using `memberN` to indicate which member we take or fail to convert.
   
   I feel this is more consistent and user-friendly than printing the path according to the Avro schema.
   If you feel differently, I do not mind adapting it.



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala:
##########
@@ -219,7 +219,7 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
       functions.from_avro($"avro", avroTypeStruct)), df)
   }
 
-  test("to_avro with unsupported nullable Avro schema") {
+  test("to_avro with complex union Avro schema") {
     val df = spark.range(10).select(struct($"id", $"id".cast("string").as("str")).as("struct"))
     for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {

Review Comment:
   `["int", "string"]` will not work here: unlike `int` and `long`, these branches are not coerced into a single type but are represented as a struct with `memberN` fields.
   Maybe I misunderstood your question?
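
   The distinction can be sketched as follows. This is a simplified model rather than Spark's implementation: branches are type-name strings, the `CatalystShape` hierarchy and `shapeOf` are hypothetical names, and only unions with at least two non-null branches are covered. The two coerced cases follow the `Set(INT, LONG)` and `Set(FLOAT, DOUBLE)` guards in the serializer diff.

   ```scala
   object UnionShape {
     sealed trait CatalystShape
     case object AsLong extends CatalystShape
     case object AsDouble extends CatalystShape
     case class AsStruct(fieldNames: Seq[String]) extends CatalystShape

     // Hypothetical classifier: decides how a multi-branch union is represented.
     def shapeOf(branches: Seq[String]): CatalystShape = {
       val nonNull = branches.filter(_ != "null")
       require(nonNull.length >= 2, "sketch only covers unions with two or more non-null branches")
       nonNull.toSet match {
         case s if s == Set("int", "long") => AsLong       // widened to a single long
         case s if s == Set("float", "double") => AsDouble // widened to a single double
         case _ => AsStruct(nonNull.indices.map(i => s"member$i"))
       }
     }
   }
   ```

   In this model `["int", "long"]` yields `AsLong`, while `["int", "string"]` yields `AsStruct(Seq("member0", "member1"))`, which is why the latter cannot be written from a plain `id` column.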



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -327,11 +329,19 @@ abstract class AvroSuite
       dataFileWriter.flush()
       dataFileWriter.close()
 
-      val df = spark.sqlContext.read.format("avro").load(s"$dir.avro")
+      val df = spark.sqlContext.read.format("avro").load(nativeWriterPath)
       assertResult(field1)(df.selectExpr("field1.member0").first().get(0))
       assertResult(field2)(df.selectExpr("field2.member1").first().get(0))
       assertResult(field3)(df.selectExpr("field3.member2").first().get(0))
       assertResult(field4)(df.selectExpr("field4.member3").first().get(0))
+
+      df.write.format("avro").option("avroSchema", schema.toString).save(sparkWriterPath)
+
+      val df2 = spark.sqlContext.read.format("avro").load(nativeWriterPath)
+      assertResult(field1)(df2.selectExpr("field1.member0").first().get(0))
+      assertResult(field2)(df2.selectExpr("field2.member1").first().get(0))
+      assertResult(field3)(df2.selectExpr("field3.member2").first().get(0))
+      assertResult(field4)(df2.selectExpr("field4.member3").first().get(0))

Review Comment:
   Done



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,54 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      avroType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = avroType.getTypes.asScala.filter(_.getType != NULL).toSeq
+    validateComplexUnionMembers(catalystStruct, nonNullTypes, catalystPath, avroPath)
+
+    val fieldConverters = nonNullTypes.zipWithIndex.map { case (avroField, i) =>

Review Comment:
   The Avro spec uses the term "branch", so I changed it to `unionBranch`.



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -313,13 +364,12 @@ private[sql] class AvroSerializer(
   private def resolveAvroType(avroType: Schema): (Boolean, Schema) = {
     if (avroType.getType == Type.UNION) {
       val fields = avroType.getTypes.asScala
-      val actualType = fields.filter(_.getType != Type.NULL)
-      if (fields.length != 2 || actualType.length != 1) {
-        throw new UnsupportedAvroTypeException(
-          s"Unsupported Avro UNION type $avroType: Only UNION of a null type and a non-null " +
-            "type is supported")
+      val nonNullTypes = fields.filter(_.getType != Type.NULL)
+      if (nonNullTypes.length == 1) {
+        (true, nonNullTypes.head)
+      } else {
+        (false, avroType)
       }

Review Comment:
   Added code and a test to handle the single-null case.



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -1144,32 +1154,87 @@ abstract class AvroSuite
     }
   }
 
-  test("unsupported nullable avro type") {
+  test("int/long double/float conversion") {
     val catalystSchema =
       StructType(Seq(
-        StructField("Age", IntegerType, nullable = false),
-        StructField("Name", StringType, nullable = false)))
+        StructField("Age", LongType),
+        StructField("Length", DoubleType),
+        StructField("Name", StringType)))
 
-    for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {
+    for (optionalNull <- Seq(""""null",""", "")) {
       val avroSchema = s"""
         |{
         |  "type" : "record",
         |  "name" : "test_schema",
         |  "fields" : [
-        |    {"name": "Age", "type": $unsupportedAvroType},
+        |    {"name": "Age", "type": [$optionalNull "int", "long"]},
+        |    {"name": "Length", "type": [$optionalNull "float", "double"]},
         |    {"name": "Name", "type": ["null", "string"]}
         |  ]
         |}
       """.stripMargin
 
       val df = spark.createDataFrame(
-        spark.sparkContext.parallelize(Seq(Row(2, "Aurora"))), catalystSchema)
+        spark.sparkContext.parallelize(Seq(Row(2L, 1.8D, "Aurora"))), catalystSchema)

Review Comment:
   This does not work, because the row data needs to match the Catalyst schema, so the values must be a long and a double.  The null case can be tested, however, and I have done so.  Is that OK for you?



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,54 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      avroType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = avroType.getTypes.asScala.filter(_.getType != NULL).toSeq
+    validateComplexUnionMembers(catalystStruct, nonNullTypes, catalystPath, avroPath)
+
+    val fieldConverters = nonNullTypes.zipWithIndex.map { case (avroField, i) =>
+      val cf = catalystStruct.fields(i)

Review Comment:
   Done



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -1144,32 +1154,87 @@ abstract class AvroSuite
     }
   }
 
-  test("unsupported nullable avro type") {
+  test("int/long double/float conversion") {
     val catalystSchema =
       StructType(Seq(
-        StructField("Age", IntegerType, nullable = false),
-        StructField("Name", StringType, nullable = false)))
+        StructField("Age", LongType),
+        StructField("Length", DoubleType),
+        StructField("Name", StringType)))
 
-    for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {
+    for (optionalNull <- Seq(""""null",""", "")) {
       val avroSchema = s"""
         |{
         |  "type" : "record",
         |  "name" : "test_schema",
         |  "fields" : [
-        |    {"name": "Age", "type": $unsupportedAvroType},
+        |    {"name": "Age", "type": [$optionalNull "int", "long"]},
+        |    {"name": "Length", "type": [$optionalNull "float", "double"]},
         |    {"name": "Name", "type": ["null", "string"]}
         |  ]
         |}
       """.stripMargin
 
       val df = spark.createDataFrame(
-        spark.sparkContext.parallelize(Seq(Row(2, "Aurora"))), catalystSchema)
+        spark.sparkContext.parallelize(Seq(Row(2L, 1.8D, "Aurora"))), catalystSchema)
 
       withTempPath { tempDir =>
-        val message = intercept[SparkException] {
+        df.write.format("avro").option("avroSchema", avroSchema).save(tempDir.getPath)
+        checkAnswer(
+          spark.read
+            .format("avro")
+            .option("avroSchema", avroSchema)
+            .load(tempDir.getPath),
+          df)
+      }
+    }
+  }
+
+  test("non-matching complex union types") {
+    val catalystSchema =
+      StructType(Seq(
+        StructField("Union", StructType(Seq(
+          StructField("member0", IntegerType),
+          StructField("member1", StructType(Seq(StructField("f1", StringType, nullable = false))))
+        )))))
+
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row(Row(1, null)))), catalystSchema)
+
+    val recordSchema = """{"type":"record","name":"r","fields":[{"name":"f1","type":"string"}]}"""
+    for ((unionSchema, compatible) <- Seq(
+      (""""null","int",""" + recordSchema, true),
+      (""""int","null",""" + recordSchema, true),
+      (""""int",""" + recordSchema + ""","null"""", true),
+      (""""int",""" + recordSchema, true),
+      (""""null",""" + recordSchema + ""","int"""", false),
+      (""""null",""" + recordSchema, false),
+      (""""null","int",{"type":"record","name":"r","fields":[{"name":"f2","type":"string"}]}""",
+        false)
+    )) {
+      val avroSchema = s"""
+                          |{
+                          |  "type" : "record",
+                          |  "name" : "test_schema",
+                          |  "fields" : [
+                          |    {"name": "Union", "type": [$unionSchema]}
+                          |  ]
+                          |}
+      """.stripMargin

Review Comment:
   A lot better indeed



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -337,4 +387,8 @@ private[sql] class AvroSerializer(
         "schema will throw runtime exception if there is a record with null value.")
     }
   }
+
+  private def nonNullUnionTypes(avroType: Schema): Set[Type] = {
+    avroType.getTypes.asScala.map(_.getType).filter(_ != NULL).toSet
+  }

Review Comment:
   Done



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -1144,32 +1154,87 @@ abstract class AvroSuite
     }
   }
 
-  test("unsupported nullable avro type") {
+  test("int/long double/float conversion") {
     val catalystSchema =
       StructType(Seq(
-        StructField("Age", IntegerType, nullable = false),
-        StructField("Name", StringType, nullable = false)))
+        StructField("Age", LongType),
+        StructField("Length", DoubleType),
+        StructField("Name", StringType)))
 
-    for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {
+    for (optionalNull <- Seq(""""null",""", "")) {
       val avroSchema = s"""
         |{
         |  "type" : "record",
         |  "name" : "test_schema",
         |  "fields" : [
-        |    {"name": "Age", "type": $unsupportedAvroType},
+        |    {"name": "Age", "type": [$optionalNull "int", "long"]},
+        |    {"name": "Length", "type": [$optionalNull "float", "double"]},
         |    {"name": "Name", "type": ["null", "string"]}
         |  ]
         |}
       """.stripMargin
 
       val df = spark.createDataFrame(
-        spark.sparkContext.parallelize(Seq(Row(2, "Aurora"))), catalystSchema)
+        spark.sparkContext.parallelize(Seq(Row(2L, 1.8D, "Aurora"))), catalystSchema)
 
       withTempPath { tempDir =>
-        val message = intercept[SparkException] {
+        df.write.format("avro").option("avroSchema", avroSchema).save(tempDir.getPath)
+        checkAnswer(
+          spark.read
+            .format("avro")
+            .option("avroSchema", avroSchema)
+            .load(tempDir.getPath),
+          df)
+      }
+    }
+  }
+
+  test("non-matching complex union types") {
+    val catalystSchema =
+      StructType(Seq(
+        StructField("Union", StructType(Seq(
+          StructField("member0", IntegerType),
+          StructField("member1", StructType(Seq(StructField("f1", StringType, nullable = false))))
+        )))))

Review Comment:
   Perfect



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,54 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      avroType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = avroType.getTypes.asScala.filter(_.getType != NULL).toSeq
+    validateComplexUnionMembers(catalystStruct, nonNullTypes, catalystPath, avroPath)
+
+    val fieldConverters = nonNullTypes.zipWithIndex.map { case (avroField, i) =>
+      val cf = catalystStruct.fields(i)
+      newConverter(cf.dataType, resolveNullableType(avroField, nullable = true),
+        catalystPath :+ cf.name, avroPath :+ cf.name)
+    }.toArray
+
+    val numFields = catalystStruct.length
+    row: InternalRow =>
+      (0 until numFields).dropWhile(row.isNullAt).headOption match {
+        case Some(i) => fieldConverters(i).apply(row, i)
+        case None => null
+      }

Review Comment:
   Done





[GitHub] [spark] xkrogen commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
xkrogen commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r965125762


##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala:
##########
@@ -220,26 +220,36 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
       functions.from_avro($"avro", avroTypeStruct)), df)
   }
 
-  test("to_avro with unsupported nullable Avro schema") {
+  test("to_avro optional union Avro schema") {
     val df = spark.range(10).select(struct($"id", $"id".cast("string").as("str")).as("struct"))
-    for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {
+    for (supportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {
       val avroTypeStruct = s"""
         |{
         |  "type": "record",
         |  "name": "struct",
         |  "fields": [
-        |    {"name": "id", "type": $unsupportedAvroType},
+        |    {"name": "id", "type": $supportedAvroType},
         |    {"name": "str", "type": ["null", "string"]}
         |  ]
         |}
       """.stripMargin
-      val message = intercept[SparkException] {
-        df.select(functions.to_avro($"struct", avroTypeStruct).as("avro")).show()
-      }.getCause.getMessage
-      assert(message.contains("Only UNION of a null type and a non-null type is supported"))
+      val avroStructDF = df.select(functions.to_avro($"struct", avroTypeStruct).as("avro"))
+      checkAnswer(avroStructDF.select(
+        functions.from_avro($"avro", avroTypeStruct)), df)
     }
   }
 
+  test("to_avro complex union Avro schema") {
+    val df = Seq(Tuple2(Some(1), None), Tuple2(None, Some("a"))).toDF()

Review Comment:
   Why the explicit reference to `Tuple2`? You should be able to just do `(Some(1), None)` right?
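
   For reference, a small sketch showing that the two spellings build the same values. The `.toDF()` call is left out so that the snippet does not need a Spark session, and the explicit type annotations are added only to make the comparison obvious.

   ```scala
   object TupleSyntax {
     // Explicit Tuple2 constructor, as in the test under review.
     val explicit: Seq[(Option[Int], Option[String])] =
       Seq(Tuple2(Some(1), None), Tuple2(None, Some("a")))

     // Tuple literal syntax, as suggested in the comment.
     val literal: Seq[(Option[Int], Option[String])] =
       Seq((Some(1), None), (None, Some("a")))
   }
   ```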





[GitHub] [spark] AmplabJenkins commented on pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #36506:
URL: https://github.com/apache/spark/pull/36506#issuecomment-1124312137

   Can one of the admins verify this patch?




[GitHub] [spark] thejdeep commented on pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
thejdeep commented on PR #36506:
URL: https://github.com/apache/spark/pull/36506#issuecomment-1212279813

   @steven-aerts Thanks for working on this feature.
   
   +1 to this PR. The lack of write support for complex union types causes us problems too. Right now, since the standard DataFrame/Dataset APIs do not support writing out unions with multiple subtypes, we have resorted either to changing the underlying schema, which may be cumbersome in some cases, or to using the [saveAsNewAPIHadoopFile](https://spark.apache.org/docs/3.0.0/api/scala/org/apache/spark/rdd/PairRDDFunctions.html#saveAsNewAPIHadoopFile(path:String,keyClass:Class%5B_%5D,valueClass:Class%5B_%5D,outputFormatClass:Class%5B_%3C:org.apache.hadoop.mapreduce.OutputFormat%5B_,_%5D%5D,conf:org.apache.hadoop.conf.Configuration):Unit) RDD API, which skips the Catalyst path.




[GitHub] [spark] mridulm commented on pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #36506:
URL: https://github.com/apache/spark/pull/36506#issuecomment-1212452348

   +CC @dongjoon-hyun, @HyukjinKwon who might be able to review this better than me.




[GitHub] [spark] xkrogen commented on pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
xkrogen commented on PR #36506:
URL: https://github.com/apache/spark/pull/36506#issuecomment-1385550180

   @gengliangwang any comments on the latest diff, now that @steven-aerts has answered your last question? This PR seems to be in a very healthy state, and I would love to see it merged.




[GitHub] [spark] gengliangwang commented on pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on PR #36506:
URL: https://github.com/apache/spark/pull/36506#issuecomment-1442450225

   Merging to master/3.4. cc Spark 3.4.0 release manager @xinrong-meng 




[GitHub] [spark] steven-aerts commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
steven-aerts commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r956131044


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -218,6 +218,17 @@ private[sql] class AvroSerializer(
         val numFields = st.length
         (getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields))
 
+      case (st: StructType, UNION) =>
+        val unionConvertor = newComplexUnionConverter(st, avroType, catalystPath, avroPath)
+        val numFields = st.length
+        (getter, ordinal) => unionConvertor(getter.getStruct(ordinal, numFields))
+
+      case (DoubleType, UNION) if nonNullUnionTypes(avroType) == Set(FLOAT, DOUBLE) =>
+        (getter, ordinal) => getter.getDouble(ordinal)
+
+      case (LongType, UNION) if nonNullUnionTypes(avroType) == Set(INT, LONG) =>
+        (getter, ordinal) => getter.getLong(ordinal)

Review Comment:
   I think it is not lost, as the schema has to be provided when writing avro.  If the avro schema defines a union, it will be written as a union, so this info is not really lost.





[GitHub] [spark] steven-aerts commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
steven-aerts commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r964436934


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,54 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      avroType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = avroType.getTypes.asScala.filter(_.getType != NULL).toSeq
+    validateComplexUnionMembers(catalystStruct, nonNullTypes, catalystPath, avroPath)
+
+    val fieldConverters = nonNullTypes.zipWithIndex.map { case (avroField, i) =>
+      val cf = catalystStruct.fields(i)
+      newConverter(cf.dataType, resolveNullableType(avroField, nullable = true),

Review Comment:
   Done



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,51 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      unionType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = nonNullUnionBranches(unionType)
+    val expectedFieldNames = nonNullTypes.indices.map(i => s"member$i")
+    if (catalystStruct.fieldNames.toSeq != expectedFieldNames) {
+      throw new IncompatibleSchemaException(s"Generic Avro union at ${toFieldStr(avroPath)} " +
+        s"does not match the SQL schema at ${toFieldStr(catalystPath)}.  It expected the " +
+        s"following members ${expectedFieldNames.mkString("(", ", ", ")")} but got " +
+        s"${catalystStruct.fieldNames.mkString("(", ", ", ")")}")
+    }

Review Comment:
   Done



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -323,15 +329,30 @@ abstract class AvroSuite
       avroRec.put("field2", field2)
       avroRec.put("field3", new Fixed(fixedSchema, field3))
       avroRec.put("field4", new EnumSymbol(enumSchema, field4))
+      avroRec.put("field5", null)
       dataFileWriter.append(avroRec)
       dataFileWriter.flush()
       dataFileWriter.close()
 
-      val df = spark.sqlContext.read.format("avro").load(s"$dir.avro")
+      val df = spark.sqlContext.read.format("avro").load(nativeWriterPath)
       assertResult(field1)(df.selectExpr("field1.member0").first().get(0))
       assertResult(field2)(df.selectExpr("field2.member1").first().get(0))
       assertResult(field3)(df.selectExpr("field3.member2").first().get(0))
       assertResult(field4)(df.selectExpr("field4.member3").first().get(0))
+
+      df.write.format("avro").option("avroSchema", schema.toString).save(sparkWriterPath)
+
+      val df2 = spark.sqlContext.read.format("avro").load(nativeWriterPath)
+      assertResult(field1)(df2.selectExpr("field1.member0").first().get(0))
+      assertResult(field2)(df2.selectExpr("field2.member1").first().get(0))
+      assertResult(field3)(df2.selectExpr("field3.member2").first().get(0))
+      assertResult(field4)(df2.selectExpr("field4.member3").first().get(0))

Review Comment:
   `member4` does not exist here, as no members are generated for the null type. 
   Updated test to cover full rows.
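   
   To illustrate the naming rule, here is a minimal sketch, not the PR's code. It assumes a union can be modeled as a plain list of branch type names (the real code works on `org.apache.avro.Schema`), and `memberNames` is a hypothetical helper introduced only for this example.

   ```scala
   object UnionMembers {
     // Hypothetical helper: one struct field per non-null branch, named member0, member1, ...
     // The "null" branch is filtered out before indexing, so it never gets a member of its own.
     def memberNames(branches: Seq[String]): Seq[String] =
       branches.filter(_ != "null").indices.map(i => s"member$i")

     def main(args: Array[String]): Unit = {
       // Five branches, one of which is null, so only member0 to member3 are generated.
       println(memberNames(Seq("null", "int", "string", "fixed", "enum")))
     }
   }
   ```

   Because the null branch is dropped before the indices are assigned, adding `null` to a four-branch union leaves the highest member at `member3`.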





[GitHub] [spark] xkrogen commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
xkrogen commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r948268086


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -218,6 +218,17 @@ private[sql] class AvroSerializer(
         val numFields = st.length
         (getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields))
 
+      case (st: StructType, UNION) =>
+        val unionConvertor = newComplexUnionConverter(st, avroType, catalystPath, avroPath)
+        val numFields = st.length
+        (getter, ordinal) => unionConvertor(getter.getStruct(ordinal, numFields))
+
+      case (DoubleType, UNION) if nonNullUnionTypes(avroType) == Set(FLOAT, DOUBLE) =>
+        (getter, ordinal) => getter.getDouble(ordinal)
+
+      case (LongType, UNION) if nonNullUnionTypes(avroType) == Set(INT, LONG) =>
+        (getter, ordinal) => getter.getLong(ordinal)

Review Comment:
   This does imply some loss of information if you were to do a round-trip from Avro to SQL to Avro, since all records written out would have double/long values even if the input was a float/int.
   
   The representation chosen on the read path is inherently lossy, since we discard the information about which union branch the datum originated from, so I don't think there's anything we can do here to avoid this behavior. Just wanted to point it out.





[GitHub] [spark] xkrogen commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
xkrogen commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r960091825


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -218,6 +218,17 @@ private[sql] class AvroSerializer(
         val numFields = st.length
         (getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields))
 
+      case (st: StructType, UNION) =>
+        val unionConvertor = newComplexUnionConverter(st, avroType, catalystPath, avroPath)
+        val numFields = st.length
+        (getter, ordinal) => unionConvertor(getter.getStruct(ordinal, numFields))
+
+      case (DoubleType, UNION) if nonNullUnionTypes(avroType) == Set(FLOAT, DOUBLE) =>
+        (getter, ordinal) => getter.getDouble(ordinal)
+
+      case (LongType, UNION) if nonNullUnionTypes(avroType) == Set(INT, LONG) =>
+        (getter, ordinal) => getter.getLong(ordinal)

Review Comment:
   > as the schema has to be provided when writing avro
   
   It's not required to provide the schema, right? If you don't provide one, then we'll use `SchemaConverters.toAvroType()` to derive a schema used at write-time. But that's not what I'm worried about; that part will always be lossy since there are things (like unions) that `toAvroType()` will not generate.
   
   Even if you do provide the same schema used at the input, the values themselves will all be long/double regardless of their input type.
   
   Consider a record with a field schema like:
   ```
   { "name": "foo", "type": ["int", "long"] }
   ```
   We have some input dataset that contains two records, one with the value `1` (int) and one with the value `2` (long). If we read this via Spark, then write it back _using the same schema_:
   ```
   val avroSchema = ...
   val df = spark.read.format("avro").option("avroSchema", avroSchema).load("/path/to/input.avro")
   df.write.format("avro").option("avroSchema", avroSchema).save("/path/to/output.avro")
   ```
   Now `output.avro` contains two records, with value 1 and 2, except that both are stored as long (the second union branch). This is different from `input.avro`, which has one int (first union branch) and one long (second union branch).
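   
   The same effect can be sketched without Spark or Avro. This is a toy model under stated assumptions, not the serializer's code: `Datum`, `IntBranch`, `LongBranch`, `read` and `write` are hypothetical names that stand in for a value in an `["int", "long"]` union and for the read and write paths.

   ```scala
   object UnionRoundTrip {
     // Toy model of a datum in an ["int", "long"] union: the value plus the branch it was stored in.
     sealed trait Datum
     final case class IntBranch(value: Int) extends Datum
     final case class LongBranch(value: Long) extends Datum

     // Read path: both branches widen to one SQL LongType value; the branch tag is discarded.
     def read(d: Datum): Long = d match {
       case IntBranch(v)  => v.toLong
       case LongBranch(v) => v
     }

     // Write path: with only a Long in hand, the long branch is the only one that can be chosen.
     def write(v: Long): Datum = LongBranch(v)

     def main(args: Array[String]): Unit = {
       val input: Seq[Datum] = Seq(IntBranch(1), LongBranch(2L))
       val output = input.map(d => write(read(d)))
       // The values survive, but the first record no longer sits in the int branch.
       println(output)
     }
   }
   ```

   The values are preserved, so the loss is limited to which branch each record is stored in.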





[GitHub] [spark] steven-aerts commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
steven-aerts commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r964478195


##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala:
##########
@@ -219,7 +219,7 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
       functions.from_avro($"avro", avroTypeStruct)), df)
   }
 
-  test("to_avro with unsupported nullable Avro schema") {
+  test("to_avro with complex union Avro schema") {
     val df = spark.range(10).select(struct($"id", $"id".cast("string").as("str")).as("struct"))
     for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {

Review Comment:
   Done





[GitHub] [spark] gengliangwang commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r1040203310


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -218,6 +218,17 @@ private[sql] class AvroSerializer(
         val numFields = st.length
         (getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields))
 
+      case (st: StructType, UNION) =>
+        val unionConvertor = newComplexUnionConverter(st, avroType, catalystPath, avroPath)

Review Comment:
   This seems different from https://github.com/apache/spark/blob/master/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L296
   If there is only one non-null type in the union, we create a converter for that non-null type instead of a complex one.





[GitHub] [spark] steven-aerts commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
steven-aerts commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r1056755944


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -218,6 +218,17 @@ private[sql] class AvroSerializer(
         val numFields = st.length
         (getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields))
 
+      case (st: StructType, UNION) =>
+        val unionConvertor = newComplexUnionConverter(st, avroType, catalystPath, avroPath)

Review Comment:
   @gengliangwang this branch will only be hit if the resolved Avro type is determined to be a union.
   
   For optional fields, i.e. a union of a single type with `null`, the [resolution](https://github.com/apache/spark/pull/36506/files/1d950f55e85f11bfb85d9d57324d7ef7916b4a54#diff-e84c7dfce11df94178029c05a8427668c9dab4bb809ed6e362a39ff043a59f89R369) has already determined that it is not a union type, so another branch will be hit in that case.  This is also why `newConverter` is almost always called with a type generated by [`resolveNullableType`](https://github.com/apache/spark/pull/36506/files/1d950f55e85f11bfb85d9d57324d7ef7916b4a54#diff-e84c7dfce11df94178029c05a8427668c9dab4bb809ed6e362a39ff043a59f89R358).
   
   Let me know if this is not clear.
   Btw, I missed your question somehow, sorry for the late reply.





[GitHub] [spark] xkrogen commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
xkrogen commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r964059224


##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -1144,32 +1154,87 @@ abstract class AvroSuite
     }
   }
 
-  test("unsupported nullable avro type") {
+  test("int/long double/float conversion") {
     val catalystSchema =
       StructType(Seq(
-        StructField("Age", IntegerType, nullable = false),
-        StructField("Name", StringType, nullable = false)))
+        StructField("Age", LongType),
+        StructField("Length", DoubleType),
+        StructField("Name", StringType)))
 
-    for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {
+    for (optionalNull <- Seq(""""null",""", "")) {
       val avroSchema = s"""
         |{
         |  "type" : "record",
         |  "name" : "test_schema",
         |  "fields" : [
-        |    {"name": "Age", "type": $unsupportedAvroType},
+        |    {"name": "Age", "type": [$optionalNull "int", "long"]},
+        |    {"name": "Length", "type": [$optionalNull "float", "double"]},
         |    {"name": "Name", "type": ["null", "string"]}
         |  ]
         |}
       """.stripMargin
 
       val df = spark.createDataFrame(
-        spark.sparkContext.parallelize(Seq(Row(2, "Aurora"))), catalystSchema)
+        spark.sparkContext.parallelize(Seq(Row(2L, 1.8D, "Aurora"))), catalystSchema)

Review Comment:
   Ah yes I see. Makes sense, thanks.





[GitHub] [spark] dongjoon-hyun commented on pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #36506:
URL: https://github.com/apache/spark/pull/36506#issuecomment-1442461496

   Thank you all! I'm also supporting @gengliangwang 's backporting decision.




[GitHub] [spark] gengliangwang closed pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang closed pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions
URL: https://github.com/apache/spark/pull/36506




[GitHub] [spark] xkrogen commented on pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
xkrogen commented on PR #36506:
URL: https://github.com/apache/spark/pull/36506#issuecomment-1224354746

   @steven-aerts just want to check if you are still working on this? 




[GitHub] [spark] steven-aerts commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
steven-aerts commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r1005268192


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -312,14 +368,11 @@ private[sql] class AvroSerializer(
    */
   private def resolveAvroType(avroType: Schema): (Boolean, Schema) = {
     if (avroType.getType == Type.UNION) {
-      val fields = avroType.getTypes.asScala
-      val actualType = fields.filter(_.getType != Type.NULL)
-      if (fields.length != 2 || actualType.length != 1) {
-        throw new UnsupportedAvroTypeException(
-          s"Unsupported Avro UNION type $avroType: Only UNION of a null type and a non-null " +
-            "type is supported")
+      nonNullUnionBranches(avroType) match {
+        case Seq() => (true, Schema.create(Type.NULL))
+        case Seq(singleType) => (true, singleType)
+        case _ => (false, avroType)

Review Comment:
   So we only make an exception for _optional types_.  For _complex unions_, however, we need the null further down.





[GitHub] [spark] steven-aerts commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
steven-aerts commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r1005253894


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -312,14 +368,11 @@ private[sql] class AvroSerializer(
    */
   private def resolveAvroType(avroType: Schema): (Boolean, Schema) = {
     if (avroType.getType == Type.UNION) {
-      val fields = avroType.getTypes.asScala
-      val actualType = fields.filter(_.getType != Type.NULL)
-      if (fields.length != 2 || actualType.length != 1) {
-        throw new UnsupportedAvroTypeException(
-          s"Unsupported Avro UNION type $avroType: Only UNION of a null type and a non-null " +
-            "type is supported")
+      nonNullUnionBranches(avroType) match {
+        case Seq() => (true, Schema.create(Type.NULL))
+        case Seq(singleType) => (true, singleType)
+        case _ => (false, avroType)

Review Comment:
   I think this is exactly what we are doing here, by using the output of `nonNullUnionBranches`.
   We do, however, make an exception for `["null"]`.
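   
   For readers following along, the three cases in the hunk can be modeled as a small standalone sketch. It is an illustration under assumptions, not the actual `resolveAvroType`: `AvroType`, `Primitive` and `Union` are hypothetical stand-ins for `org.apache.avro.Schema`, and returning `(false, other)` for non-union types is assumed, since that part is not shown in the hunk.

   ```scala
   object ResolveUnion {
     // Toy stand-in for an Avro schema: a primitive type name or a union of branches.
     sealed trait AvroType
     final case class Primitive(name: String) extends AvroType
     final case class Union(branches: Seq[AvroType]) extends AvroType

     private val Null = Primitive("null")

     // Returns (nullable, resolved type), following the three cases in the diff.
     def resolve(t: AvroType): (Boolean, AvroType) = t match {
       case u @ Union(branches) =>
         branches.filter(_ != Null) match {
           case Seq()       => (true, Null)    // ["null"]: resolves to the null type itself
           case Seq(single) => (true, single)  // optional type: unwrap the single non-null branch
           case _           => (false, u)      // complex union: keep the whole union, null included
         }
       case other => (false, other)            // assumed behavior for non-union types
     }

     def main(args: Array[String]): Unit = {
       println(resolve(Union(Seq(Null, Primitive("int")))))
       println(resolve(Union(Seq(Null, Primitive("int"), Primitive("long")))))
     }
   }
   ```

   Keeping the original union in the last case, rather than only its non-null branches, is what leaves the null branch available to the code further down.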





[GitHub] [spark] steven-aerts commented on pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
steven-aerts commented on PR #36506:
URL: https://github.com/apache/spark/pull/36506#issuecomment-1334894271

   Is there still something I can/have to do to get this patch submitted?




[GitHub] [spark] xkrogen commented on pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by "xkrogen (via GitHub)" <gi...@apache.org>.
xkrogen commented on PR #36506:
URL: https://github.com/apache/spark/pull/36506#issuecomment-1439152114

   Ping @cloud-fan @gengliangwang @dongjoon-hyun , are any of you available to help review?




[GitHub] [spark] gengliangwang commented on pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on PR #36506:
URL: https://github.com/apache/spark/pull/36506#issuecomment-1439214622

   @steven-aerts @xkrogen Sorry for the late reply. I will take another look later today.




[GitHub] [spark] xkrogen commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
xkrogen commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r948281127


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -313,13 +364,12 @@ private[sql] class AvroSerializer(
   private def resolveAvroType(avroType: Schema): (Boolean, Schema) = {
     if (avroType.getType == Type.UNION) {
       val fields = avroType.getTypes.asScala
-      val actualType = fields.filter(_.getType != Type.NULL)
-      if (fields.length != 2 || actualType.length != 1) {
-        throw new UnsupportedAvroTypeException(
-          s"Unsupported Avro UNION type $avroType: Only UNION of a null type and a non-null " +
-            "type is supported")
+      val nonNullTypes = fields.filter(_.getType != Type.NULL)
+      if (nonNullTypes.length == 1) {
+        (true, nonNullTypes.head)
+      } else {
+        (false, avroType)
       }

Review Comment:
   We could unpack this using a match statement to be a little more Scala-idiomatic:
   ```suggestion
         avroType.getTypes.asScala.filter(_.getType != Type.NULL).toSeq match {
           case Seq() => throw new UnsupportedAvroTypeException(
             """seems this situation wasn't handled, but `type: ["null"]` is a valid avro schema""")
           case Seq(singleType) => (true, singleType)
           case _ => (false, avroType)
         }
   ```
   Either way works, though I will note that the current code doesn't properly handle the situation where there is a union with a single type that is `NULL`. Whichever style we use, we should make sure we throw an error for that case.



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,54 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      avroType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = avroType.getTypes.asScala.filter(_.getType != NULL).toSeq
+    validateComplexUnionMembers(catalystStruct, nonNullTypes, catalystPath, avroPath)
+
+    val fieldConverters = nonNullTypes.zipWithIndex.map { case (avroField, i) =>
+      val cf = catalystStruct.fields(i)
+      newConverter(cf.dataType, resolveNullableType(avroField, nullable = true),
+        catalystPath :+ cf.name, avroPath :+ cf.name)

Review Comment:
   Avro union branches are identified by their type, so it seems like technically we should use `avroPath :+ avroField.getFullName` here instead of `avroPath :+ cf.name`. The full name could have periods in it so we should probably wrap it like `avroPath :+ s"[${avroField.getFullName}]"`
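   
   A minimal sketch of why the brackets help, assuming path segments are rendered by joining them with dots. `render` and `branchSegment` are hypothetical helpers for this illustration only, and `com.example.Foo` is a made-up full name.

   ```scala
   object AvroPaths {
     // Assumed rendering for this sketch: path segments joined with dots.
     def render(path: Seq[String]): String = path.mkString(".")

     // Wrap a union branch's full name in brackets so its own dots are not read as path separators.
     def branchSegment(fullName: String): String = s"[$fullName]"

     def main(args: Array[String]): Unit = {
       val parent = Seq("record", "field1")
       // Unwrapped, the branch name looks like three extra path segments.
       println(render(parent :+ "com.example.Foo"))                // record.field1.com.example.Foo
       // Wrapped, the branch name stays visibly a single segment.
       println(render(parent :+ branchSegment("com.example.Foo"))) // record.field1.[com.example.Foo]
     }
   }
   ```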



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -327,11 +329,19 @@ abstract class AvroSuite
       dataFileWriter.flush()
       dataFileWriter.close()
 
-      val df = spark.sqlContext.read.format("avro").load(s"$dir.avro")
+      val df = spark.sqlContext.read.format("avro").load(nativeWriterPath)
       assertResult(field1)(df.selectExpr("field1.member0").first().get(0))
       assertResult(field2)(df.selectExpr("field2.member1").first().get(0))
       assertResult(field3)(df.selectExpr("field3.member2").first().get(0))
       assertResult(field4)(df.selectExpr("field4.member3").first().get(0))
+
+      df.write.format("avro").option("avroSchema", schema.toString).save(sparkWriterPath)
+
+      val df2 = spark.sqlContext.read.format("avro").load(nativeWriterPath)
+      assertResult(field1)(df2.selectExpr("field1.member0").first().get(0))
+      assertResult(field2)(df2.selectExpr("field2.member1").first().get(0))
+      assertResult(field3)(df2.selectExpr("field3.member2").first().get(0))
+      assertResult(field4)(df2.selectExpr("field4.member3").first().get(0))

Review Comment:
   This is good, but we only validate that Spark is able to re-read the written output. So, for example, if we had implemented `AvroSerializer` to actually write out a record type with fields `memberN` instead of writing out a union type, this test would still pass.
   
   It would be better to _also_ validate the output using the native Avro reader, and check that we actually wrote a valid union type.
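
   A minimal sketch of such a check, assuming Avro's `DataFileReader` and `GenericDatumReader` are on the test classpath. The helper name `assertUnionWritten` is hypothetical and not part of the PR; it reads the writer schema embedded in a single Avro file and checks that the given field was written as a union rather than a record:

   ```scala
   import java.io.File
   import org.apache.avro.Schema
   import org.apache.avro.file.DataFileReader
   import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

   // Hypothetical helper: opens one Avro file with the native Avro reader and
   // checks that `fieldName` has a UNION schema (not a RECORD with memberN fields).
   def assertUnionWritten(avroFile: File, fieldName: String): Unit = {
     val reader =
       new DataFileReader[GenericRecord](avroFile, new GenericDatumReader[GenericRecord]())
     try {
       // getSchema returns the writer schema stored in the file header.
       val fieldSchema = reader.getSchema.getField(fieldName).schema()
       assert(fieldSchema.getType == Schema.Type.UNION,
         s"expected $fieldName to be a UNION but found ${fieldSchema.getType}")
     } finally {
       reader.close()
     }
   }
   ```

   In the test this would presumably be called on a part file found under `sparkWriterPath`; how that file is located is left open here.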



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -218,6 +218,17 @@ private[sql] class AvroSerializer(
         val numFields = st.length
         (getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields))
 
+      case (st: StructType, UNION) =>
+        val unionConvertor = newComplexUnionConverter(st, avroType, catalystPath, avroPath)
+        val numFields = st.length
+        (getter, ordinal) => unionConvertor(getter.getStruct(ordinal, numFields))
+
+      case (DoubleType, UNION) if nonNullUnionTypes(avroType) == Set(FLOAT, DOUBLE) =>
+        (getter, ordinal) => getter.getDouble(ordinal)
+
+      case (LongType, UNION) if nonNullUnionTypes(avroType) == Set(INT, LONG) =>
+        (getter, ordinal) => getter.getLong(ordinal)

Review Comment:
   This does imply some loss of information if you were to do a round-trip from Avro to SQL to Avro, since all records written out would have double/long values even if the input was a float/int.
   
   The representation chosen on the read path is inherently lossy, since we discard the information about which union branch the datum originated from, so I don't think there's anything we can do here to avoid this behavior. Just wanted to point it out.
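
   The effect can be illustrated with Avro's own branch resolution, `GenericData.resolveUnion`. This is only a sketch of how Avro picks a union branch from a datum's runtime type, not of Spark's serializer itself; the value names are made up for the example:

   ```scala
   import org.apache.avro.Schema
   import org.apache.avro.generic.GenericData

   val union = Schema.createUnion(
     Schema.create(Schema.Type.FLOAT), Schema.create(Schema.Type.DOUBLE))

   // A native Avro writer picks the branch from the runtime type of the datum.
   val floatBranch = GenericData.get().resolveUnion(union, java.lang.Float.valueOf(1.5f))
   val doubleBranch = GenericData.get().resolveUnion(union, java.lang.Double.valueOf(1.5d))

   // Once the value has been read into a DoubleType column it is a double, so the
   // same value resolves to the double branch on the way back out.
   val afterRoundTrip =
     GenericData.get().resolveUnion(union, java.lang.Double.valueOf(1.5f.toDouble))
   ```

   Here `floatBranch` is the index of the `float` branch, while `doubleBranch` and `afterRoundTrip` both point at the `double` branch, which is the loss of information described above.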



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,54 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      avroType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = avroType.getTypes.asScala.filter(_.getType != NULL).toSeq
+    validateComplexUnionMembers(catalystStruct, nonNullTypes, catalystPath, avroPath)
+
+    val fieldConverters = nonNullTypes.zipWithIndex.map { case (avroField, i) =>
+      val cf = catalystStruct.fields(i)
+      newConverter(cf.dataType, resolveNullableType(avroField, nullable = true),
+        catalystPath :+ cf.name, avroPath :+ cf.name)
+    }.toArray
+
+    val numFields = catalystStruct.length
+    row: InternalRow =>
+      (0 until numFields).dropWhile(row.isNullAt).headOption match {
+        case Some(i) => fieldConverters(i).apply(row, i)
+        case None => null
+      }
+  }
+
+  def validateComplexUnionMembers(
+      catalystStruct: StructType,
+      unionTypes: Seq[Schema],
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): Unit = {
+    val expectedFieldNames = unionTypes.indices.map(i => s"member$i")
+    if (catalystStruct.fieldNames.toSeq != expectedFieldNames) {
+      throw new IncompatibleSchemaException(s"Generic Avro union at ${toFieldStr(avroPath)} " +
+        s"does not match the SQL schema at ${toFieldStr(catalystPath)}.  It expected the " +
+        s"following members ${expectedFieldNames.mkString("(", ", ", ")")} but got " +
+        s"${catalystStruct.fieldNames.mkString("(", ", ", ")")}")
+    }
+  }

Review Comment:
   Can we just inline this into `newComplexUnionConverter()`? The method definition/parameter list is as long as the body :)
   
   If we do need to keep it, it should be made private.



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,54 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      avroType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = avroType.getTypes.asScala.filter(_.getType != NULL).toSeq
+    validateComplexUnionMembers(catalystStruct, nonNullTypes, catalystPath, avroPath)
+
+    val fieldConverters = nonNullTypes.zipWithIndex.map { case (avroField, i) =>
+      val cf = catalystStruct.fields(i)
+      newConverter(cf.dataType, resolveNullableType(avroField, nullable = true),

Review Comment:
   Avro doesn't allow nested unions so when we're iterating over the union branches here, we'll never see a union. Thus there is no point in `resolveNullableType()`, since all it does is extract nullability from unions. We can just pass `avroField` directly here.
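
   As a quick check of that premise, a small sketch using only the Avro Java API (the value names are illustrative): building a union whose branch is itself a union is rejected when the schema is constructed.

   ```scala
   import scala.util.Try
   import org.apache.avro.{AvroRuntimeException, Schema}

   val inner = Schema.createUnion(
     Schema.create(Schema.Type.INT), Schema.create(Schema.Type.STRING))

   // Avro refuses to build a union that directly contains another union.
   val nested = Try(Schema.createUnion(Schema.create(Schema.Type.NULL), inner))

   assert(nested.isFailure)
   assert(nested.failed.get.isInstanceOf[AvroRuntimeException])
   ```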



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,54 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      avroType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = avroType.getTypes.asScala.filter(_.getType != NULL).toSeq
+    validateComplexUnionMembers(catalystStruct, nonNullTypes, catalystPath, avroPath)
+
+    val fieldConverters = nonNullTypes.zipWithIndex.map { case (avroField, i) =>
+      val cf = catalystStruct.fields(i)
+      newConverter(cf.dataType, resolveNullableType(avroField, nullable = true),
+        catalystPath :+ cf.name, avroPath :+ cf.name)
+    }.toArray
+
+    val numFields = catalystStruct.length
+    row: InternalRow =>
+      (0 until numFields).dropWhile(row.isNullAt).headOption match {
+        case Some(i) => fieldConverters(i).apply(row, i)
+        case None => null
+      }

Review Comment:
   Since this is a performance-critical section (executed on a per-row basis), it's better to avoid Scala collections here, since they create many temporary intermediate objects. We can use a while-loop instead. Note that `return` from within a closure and `break` also perform badly in Scala because they are implemented using exceptions, so we avoid them by using a control variable:
   ```suggestion
       row: InternalRow => {
         var idx = 0
         var retVal: Any = null
         while (idx < numFields && retVal == null) {
           if (!row.isNullAt(idx)) {
             retVal = fieldConverters(idx).apply(row, idx)
           }
           idx += 1
         }
         retVal
       }
   ```



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -337,4 +387,8 @@ private[sql] class AvroSerializer(
         "schema will throw runtime exception if there is a record with null value.")
     }
   }
+
+  private def nonNullUnionTypes(avroType: Schema): Set[Type] = {
+    avroType.getTypes.asScala.map(_.getType).filter(_ != NULL).toSet
+  }

Review Comment:
   It's about time we had a utility method for this, given how many places we do it here and in `AvroDeserializer`, but I would suggest that we:
   1. Make this return `Set[Schema]` (i.e. remove the `map(_.getType)`) or even `Seq[Schema]` to make it more generally useful.
   2. Move it into `AvroUtils` to make it more accessible
   
   For the newly added union check the usage becomes slightly less concise:
   ```
         case (DoubleType, UNION) if nonNullUnionTypes(avroType).map(_.getType) == Set(FLOAT, DOUBLE) =>
           ...
         case (LongType, UNION) if nonNullUnionTypes(avroType).map(_.getType) == Set(INT, LONG) =>
           ...
   ```
   But now we can use it in a bunch of other places: `AvroSerializer#resolveAvroType()`, `AvroSerializer#newComplexUnionConverter()`, `SchemaConverters#toSqlTypeHelper()`, and `AvroDeserializer#newWriter()`
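
   A rough sketch of what such a shared helper could look like. The name `nonNullUnionBranches` and its placement in `AvroUtils` are assumptions, and the `require` guard is an addition for the example rather than something the existing code does:

   ```scala
   import scala.collection.JavaConverters._
   import org.apache.avro.Schema

   // Hypothetical utility: the non-null branches of a union, in declaration order.
   def nonNullUnionBranches(avroType: Schema): Seq[Schema] = {
     // Assumption for this sketch: callers only pass UNION schemas.
     require(avroType.getType == Schema.Type.UNION,
       s"expected a UNION but got ${avroType.getType}")
     avroType.getTypes.asScala.filter(_.getType != Schema.Type.NULL).toSeq
   }
   ```

   Returning a `Seq` keeps the branch order, which the `memberN` mapping depends on; callers that only care about the set of types can still apply `.map(_.getType).toSet`.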
   



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala:
##########
@@ -219,7 +219,7 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
       functions.from_avro($"avro", avroTypeStruct)), df)
   }
 
-  test("to_avro with unsupported nullable Avro schema") {
+  test("to_avro with complex union Avro schema") {
     val df = spark.range(10).select(struct($"id", $"id".cast("string").as("str")).as("struct"))
     for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {

Review Comment:
   This is only checking the `(INT, LONG)` case. Can we also check the more generic case of different types, like `(INT, STRING)`?



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -1144,32 +1154,87 @@ abstract class AvroSuite
     }
   }
 
-  test("unsupported nullable avro type") {
+  test("int/long double/float conversion") {
     val catalystSchema =
       StructType(Seq(
-        StructField("Age", IntegerType, nullable = false),
-        StructField("Name", StringType, nullable = false)))
+        StructField("Age", LongType),
+        StructField("Length", DoubleType),
+        StructField("Name", StringType)))
 
-    for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {
+    for (optionalNull <- Seq(""""null",""", "")) {
       val avroSchema = s"""
         |{
         |  "type" : "record",
         |  "name" : "test_schema",
         |  "fields" : [
-        |    {"name": "Age", "type": $unsupportedAvroType},
+        |    {"name": "Age", "type": [$optionalNull "int", "long"]},
+        |    {"name": "Length", "type": [$optionalNull "float", "double"]},
         |    {"name": "Name", "type": ["null", "string"]}
         |  ]
         |}
       """.stripMargin
 
       val df = spark.createDataFrame(
-        spark.sparkContext.parallelize(Seq(Row(2, "Aurora"))), catalystSchema)
+        spark.sparkContext.parallelize(Seq(Row(2L, 1.8D, "Aurora"))), catalystSchema)

Review Comment:
   Can we also test with a record that has the non-upcasted types (int/float)?
   ```suggestion
           spark.sparkContext.parallelize(Seq(Row(2L, 1.8D, "Aurora"), Row(1, 0.9F, null))), catalystSchema)
   ```



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,54 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      avroType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = avroType.getTypes.asScala.filter(_.getType != NULL).toSeq
+    validateComplexUnionMembers(catalystStruct, nonNullTypes, catalystPath, avroPath)
+
+    val fieldConverters = nonNullTypes.zipWithIndex.map { case (avroField, i) =>

Review Comment:
   `avroField` seems like a bit of a misleading name, since this is a `Schema` rather than a `Schema.Field`, and it's not actually a field at all -- it's a union branch. Maybe `avroBranch` or `avroBranchType`? 



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,54 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      avroType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = avroType.getTypes.asScala.filter(_.getType != NULL).toSeq
+    validateComplexUnionMembers(catalystStruct, nonNullTypes, catalystPath, avroPath)
+
+    val fieldConverters = nonNullTypes.zipWithIndex.map { case (avroField, i) =>
+      val cf = catalystStruct.fields(i)

Review Comment:
   `zip` instead of `zipWithIndex` ?
   ```suggestion
       val fieldConverters = nonNullTypes.zip(catalystStruct).map { case (avroField, cf) =>
   ```



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -327,11 +329,19 @@ abstract class AvroSuite
       dataFileWriter.flush()
       dataFileWriter.close()
 
-      val df = spark.sqlContext.read.format("avro").load(s"$dir.avro")
+      val df = spark.sqlContext.read.format("avro").load(nativeWriterPath)
       assertResult(field1)(df.selectExpr("field1.member0").first().get(0))
       assertResult(field2)(df.selectExpr("field2.member1").first().get(0))
       assertResult(field3)(df.selectExpr("field3.member2").first().get(0))
       assertResult(field4)(df.selectExpr("field4.member3").first().get(0))
+
+      df.write.format("avro").option("avroSchema", schema.toString).save(sparkWriterPath)
+
+      val df2 = spark.sqlContext.read.format("avro").load(nativeWriterPath)
+      assertResult(field1)(df2.selectExpr("field1.member0").first().get(0))
+      assertResult(field2)(df2.selectExpr("field2.member1").first().get(0))
+      assertResult(field3)(df2.selectExpr("field3.member2").first().get(0))
+      assertResult(field4)(df2.selectExpr("field4.member3").first().get(0))

Review Comment:
   One other thing which is not tested currently is a top-level null for the union. I've seen this get handled improperly in some areas in the past; it would be nice to see it covered by the testing here since it is a special case. 



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -1144,32 +1154,87 @@ abstract class AvroSuite
     }
   }
 
-  test("unsupported nullable avro type") {
+  test("int/long double/float conversion") {
     val catalystSchema =
       StructType(Seq(
-        StructField("Age", IntegerType, nullable = false),
-        StructField("Name", StringType, nullable = false)))
+        StructField("Age", LongType),
+        StructField("Length", DoubleType),
+        StructField("Name", StringType)))
 
-    for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {
+    for (optionalNull <- Seq(""""null",""", "")) {
       val avroSchema = s"""
         |{
         |  "type" : "record",
         |  "name" : "test_schema",
         |  "fields" : [
-        |    {"name": "Age", "type": $unsupportedAvroType},
+        |    {"name": "Age", "type": [$optionalNull "int", "long"]},
+        |    {"name": "Length", "type": [$optionalNull "float", "double"]},
         |    {"name": "Name", "type": ["null", "string"]}
         |  ]
         |}
       """.stripMargin
 
       val df = spark.createDataFrame(
-        spark.sparkContext.parallelize(Seq(Row(2, "Aurora"))), catalystSchema)
+        spark.sparkContext.parallelize(Seq(Row(2L, 1.8D, "Aurora"))), catalystSchema)
 
       withTempPath { tempDir =>
-        val message = intercept[SparkException] {
+        df.write.format("avro").option("avroSchema", avroSchema).save(tempDir.getPath)
+        checkAnswer(
+          spark.read
+            .format("avro")
+            .option("avroSchema", avroSchema)
+            .load(tempDir.getPath),
+          df)
+      }
+    }
+  }
+
+  test("non-matching complex union types") {
+    val catalystSchema =
+      StructType(Seq(
+        StructField("Union", StructType(Seq(
+          StructField("member0", IntegerType),
+          StructField("member1", StructType(Seq(StructField("f1", StringType, nullable = false))))
+        )))))

Review Comment:
   You can use the builder-style `new StructType().add(...)` to be more concise here:
   ```suggestion
       val catalystSchema = new StructType().add("Union", new StructType()
         .add("member0", IntegerType)
         .add("member1", new StructType().add("f1", StringType, nullable = false)))
   ```
   (I prefer this format since it's less verbose, but either way is okay)



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -1144,32 +1154,87 @@ abstract class AvroSuite
     }
   }
 
-  test("unsupported nullable avro type") {
+  test("int/long double/float conversion") {
     val catalystSchema =
       StructType(Seq(
-        StructField("Age", IntegerType, nullable = false),
-        StructField("Name", StringType, nullable = false)))
+        StructField("Age", LongType),
+        StructField("Length", DoubleType),
+        StructField("Name", StringType)))
 
-    for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {
+    for (optionalNull <- Seq(""""null",""", "")) {
       val avroSchema = s"""
         |{
         |  "type" : "record",
         |  "name" : "test_schema",
         |  "fields" : [
-        |    {"name": "Age", "type": $unsupportedAvroType},
+        |    {"name": "Age", "type": [$optionalNull "int", "long"]},
+        |    {"name": "Length", "type": [$optionalNull "float", "double"]},
         |    {"name": "Name", "type": ["null", "string"]}
         |  ]
         |}
       """.stripMargin
 
       val df = spark.createDataFrame(
-        spark.sparkContext.parallelize(Seq(Row(2, "Aurora"))), catalystSchema)
+        spark.sparkContext.parallelize(Seq(Row(2L, 1.8D, "Aurora"))), catalystSchema)
 
       withTempPath { tempDir =>
-        val message = intercept[SparkException] {
+        df.write.format("avro").option("avroSchema", avroSchema).save(tempDir.getPath)
+        checkAnswer(
+          spark.read
+            .format("avro")
+            .option("avroSchema", avroSchema)
+            .load(tempDir.getPath),
+          df)
+      }
+    }
+  }
+
+  test("non-matching complex union types") {
+    val catalystSchema =
+      StructType(Seq(
+        StructField("Union", StructType(Seq(
+          StructField("member0", IntegerType),
+          StructField("member1", StructType(Seq(StructField("f1", StringType, nullable = false))))
+        )))))
+
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row(Row(1, null)))), catalystSchema)
+
+    val recordSchema = """{"type":"record","name":"r","fields":[{"name":"f1","type":"string"}]}"""
+    for ((unionSchema, compatible) <- Seq(
+      (""""null","int",""" + recordSchema, true),
+      (""""int","null",""" + recordSchema, true),
+      (""""int",""" + recordSchema + ""","null"""", true),
+      (""""int",""" + recordSchema, true),
+      (""""null",""" + recordSchema + ""","int"""", false),
+      (""""null",""" + recordSchema, false),
+      (""""null","int",{"type":"record","name":"r","fields":[{"name":"f2","type":"string"}]}""",
+        false)
+    )) {
+      val avroSchema = s"""
+                          |{
+                          |  "type" : "record",
+                          |  "name" : "test_schema",
+                          |  "fields" : [
+                          |    {"name": "Union", "type": [$unionSchema]}
+                          |  ]
+                          |}
+      """.stripMargin

Review Comment:
   Maybe use `SchemaBuilder` here? It was hard for me to read the unionSchema examples with so many quotes
   ```suggestion
       val recordS = SchemaBuilder.record("r").fields().requiredString("f1").endRecord()
       val intS = Schema.create(Schema.Type.INT)
       val nullS = Schema.create(Schema.Type.NULL)
       for ((unionTypes, compatible) <- Seq(
         (Seq(nullS, intS, recordS), true),
         (Seq(intS, nullS, recordS), true),
         (Seq(intS, recordS, nullS), true),
         (Seq(intS, recordS), true),
         (Seq(nullS, recordS, intS), false),
         (Seq(nullS, recordS), false),
         (Seq(nullS, intS,
           SchemaBuilder.record("r").fields().requiredString("f2").endRecord()), false)
       )) {
         val avroSchema = SchemaBuilder.record("test_schema").fields()
           .name("Union").`type`(Schema.createUnion(unionTypes: _*)).noDefault()
           .endRecord().toString()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] xkrogen commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
xkrogen commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r964068530


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,51 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      unionType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = nonNullUnionBranches(unionType)
+    val expectedFieldNames = nonNullTypes.indices.map(i => s"member$i")
+    if (catalystStruct.fieldNames.toSeq != expectedFieldNames) {
+      throw new IncompatibleSchemaException(s"Generic Avro union at ${toFieldStr(avroPath)} " +
+        s"does not match the SQL schema at ${toFieldStr(catalystPath)}.  It expected the " +
+        s"following members ${expectedFieldNames.mkString("(", ", ", ")")} but got " +
+        s"${catalystStruct.fieldNames.mkString("(", ", ", ")")}")
+    }

Review Comment:
   We should respect `AvroOptions.positionalFieldMatching` here and only do this check if `positionalFieldMatching = false`, WDYT?
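
   One way this could look, as a standalone sketch rather than the actual serializer code. The function `memberNameMismatch` is hypothetical, its `positionalFieldMatching` parameter stands in for the value of `AvroOptions.positionalFieldMatching`, and the length check in positional mode is an assumption of the example:

   ```scala
   // Hypothetical standalone version of the check. Returns an error message when
   // the Catalyst struct does not line up with the union's non-null branches.
   def memberNameMismatch(
       catalystFieldNames: Seq[String],
       numUnionBranches: Int,
       positionalFieldMatching: Boolean): Option[String] = {
     val expected = (0 until numUnionBranches).map(i => s"member$i")
     if (catalystFieldNames.length != numUnionBranches) {
       // Assumed: the number of fields must match even when names are ignored.
       Some(s"expected $numUnionBranches members but got ${catalystFieldNames.length}")
     } else if (!positionalFieldMatching && catalystFieldNames != expected) {
       // Names are only compared when matching by name.
       Some(s"expected members ${expected.mkString("(", ", ", ")")} but got " +
         s"${catalystFieldNames.mkString("(", ", ", ")")}")
     } else {
       None
     }
   }
   ```

   With positional matching enabled, a struct with fields `(a, b)` would be accepted for a two-branch union, while name-based matching would still require `(member0, member1)`.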



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -323,15 +329,30 @@ abstract class AvroSuite
       avroRec.put("field2", field2)
       avroRec.put("field3", new Fixed(fixedSchema, field3))
       avroRec.put("field4", new EnumSymbol(enumSchema, field4))
+      avroRec.put("field5", null)
       dataFileWriter.append(avroRec)
       dataFileWriter.flush()
       dataFileWriter.close()
 
-      val df = spark.sqlContext.read.format("avro").load(s"$dir.avro")
+      val df = spark.sqlContext.read.format("avro").load(nativeWriterPath)
       assertResult(field1)(df.selectExpr("field1.member0").first().get(0))
       assertResult(field2)(df.selectExpr("field2.member1").first().get(0))
       assertResult(field3)(df.selectExpr("field3.member2").first().get(0))
       assertResult(field4)(df.selectExpr("field4.member3").first().get(0))
+
+      df.write.format("avro").option("avroSchema", schema.toString).save(sparkWriterPath)
+
+      val df2 = spark.sqlContext.read.format("avro").load(nativeWriterPath)
+      assertResult(field1)(df2.selectExpr("field1.member0").first().get(0))
+      assertResult(field2)(df2.selectExpr("field2.member1").first().get(0))
+      assertResult(field3)(df2.selectExpr("field3.member2").first().get(0))
+      assertResult(field4)(df2.selectExpr("field4.member3").first().get(0))

Review Comment:
   Can we confirm the value of `df2.selectExpr("field5")` as well (here and above)?



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala:
##########
@@ -219,7 +219,7 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
       functions.from_avro($"avro", avroTypeStruct)), df)
   }
 
-  test("to_avro with unsupported nullable Avro schema") {
+  test("to_avro with complex union Avro schema") {
     val df = spark.range(10).select(struct($"id", $"id".cast("string").as("str")).as("struct"))
     for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {

Review Comment:
   Yeah I agree the test as-is won't work. I was thinking more like adding another test for the more complex case. My point was more that the int-long/float-double case is only ... "partially complex"? And this test claims to be testing the complex case, which (to me at least) would indicate that we get an exploded record. 



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,54 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      avroType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = avroType.getTypes.asScala.filter(_.getType != NULL).toSeq
+    validateComplexUnionMembers(catalystStruct, nonNullTypes, catalystPath, avroPath)
+
+    val fieldConverters = nonNullTypes.zipWithIndex.map { case (avroField, i) =>
+      val cf = catalystStruct.fields(i)
+      newConverter(cf.dataType, resolveNullableType(avroField, nullable = true),
+        catalystPath :+ cf.name, avroPath :+ cf.name)

Review Comment:
   Yeah that's fair. Slightly less precise, but I agree it will probably be more in-line with user expectations.





[GitHub] [spark] steven-aerts commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
steven-aerts commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r965579045


##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala:
##########
@@ -220,26 +220,36 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
       functions.from_avro($"avro", avroTypeStruct)), df)
   }
 
-  test("to_avro with unsupported nullable Avro schema") {
+  test("to_avro optional union Avro schema") {
     val df = spark.range(10).select(struct($"id", $"id".cast("string").as("str")).as("struct"))
-    for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {
+    for (supportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {
       val avroTypeStruct = s"""
         |{
         |  "type": "record",
         |  "name": "struct",
         |  "fields": [
-        |    {"name": "id", "type": $unsupportedAvroType},
+        |    {"name": "id", "type": $supportedAvroType},
         |    {"name": "str", "type": ["null", "string"]}
         |  ]
         |}
       """.stripMargin
-      val message = intercept[SparkException] {
-        df.select(functions.to_avro($"struct", avroTypeStruct).as("avro")).show()
-      }.getCause.getMessage
-      assert(message.contains("Only UNION of a null type and a non-null type is supported"))
+      val avroStructDF = df.select(functions.to_avro($"struct", avroTypeStruct).as("avro"))
+      checkAnswer(avroStructDF.select(
+        functions.from_avro($"avro", avroTypeStruct)), df)
     }
   }
 
+  test("to_avro complex union Avro schema") {
+    val df = Seq(Tuple2(Some(1), None), Tuple2(None, Some("a"))).toDF()

Review Comment:
   Done





[GitHub] [spark] xkrogen commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
xkrogen commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r964058318


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -218,6 +218,17 @@ private[sql] class AvroSerializer(
         val numFields = st.length
         (getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields))
 
+      case (st: StructType, UNION) =>
+        val unionConvertor = newComplexUnionConverter(st, avroType, catalystPath, avroPath)
+        val numFields = st.length
+        (getter, ordinal) => unionConvertor(getter.getStruct(ordinal, numFields))
+
+      case (DoubleType, UNION) if nonNullUnionTypes(avroType) == Set(FLOAT, DOUBLE) =>
+        (getter, ordinal) => getter.getDouble(ordinal)
+
+      case (LongType, UNION) if nonNullUnionTypes(avroType) == Set(INT, LONG) =>
+        (getter, ordinal) => getter.getLong(ordinal)

Review Comment:
   Yes, I agree. No need to fix here.





[GitHub] [spark] xkrogen commented on pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by GitBox <gi...@apache.org>.
xkrogen commented on PR #36506:
URL: https://github.com/apache/spark/pull/36506#issuecomment-1335608958

   Unfortunately, none of us (myself, @thejdeep, and @robreeves) are committers, just interested parties, so we can't merge. We need a review from a committer as well.
   
   @gengliangwang , @cloud-fan , @dongjoon-hyun , can any of you take a look at this?




[GitHub] [spark] gengliangwang commented on pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on PR #36506:
URL: https://github.com/apache/spark/pull/36506#issuecomment-1439482259

   cc @bozhang2820 since you made https://github.com/apache/spark/commit/551b504cfe38d1ab583e617c37e49659edd65c2e

