You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/25 20:43:01 UTC

[GitHub] [spark] robreeves commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

robreeves commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r1004940613


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -312,14 +368,11 @@ private[sql] class AvroSerializer(
    */
   private def resolveAvroType(avroType: Schema): (Boolean, Schema) = {
     if (avroType.getType == Type.UNION) {
-      val fields = avroType.getTypes.asScala
-      val actualType = fields.filter(_.getType != Type.NULL)
-      if (fields.length != 2 || actualType.length != 1) {
-        throw new UnsupportedAvroTypeException(
-          s"Unsupported Avro UNION type $avroType: Only UNION of a null type and a non-null " +
-            "type is supported")
+      nonNullUnionBranches(avroType) match {

Review Comment:
   This logic has a couple of issues when determining nullability for a union. To handle the cases below, it needs to check whether the union actually contains a null branch.
   
   1. Case `["null", "int", "long"]` will hit line 374 and return `nullable=false` when it should return true.
   2. Case `["int"]` will hit line 373 and return `nullable=true` when it should return false.
   
   Maybe something like this?
   
   ```scala
   val containsNull = avroType.getTypes.asScala.exists(_.getType == Schema.Type.NULL)
   
   nonNullUnionBranches(avroType) match {
       case Seq() => (true, Schema.create(Type.NULL))
       case Seq(singleType) => (containsNull, singleType)
       case _ => (containsNull, avroType)
   }
   ```



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,59 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      unionType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = nonNullUnionBranches(unionType)
+    val expectedFieldNames = nonNullTypes.indices.map(i => s"member$i")
+    val catalystFieldNames = catalystStruct.fieldNames.toSeq
+    if (positionalFieldMatch) {
+      if (expectedFieldNames.length != catalystFieldNames.length) {
+        throw new IncompatibleSchemaException(s"Generic Avro union at ${toFieldStr(avroPath)} " +
+          s"does not match the SQL schema at ${toFieldStr(catalystPath)}.  It expected the " +
+          s"${expectedFieldNames.length} members but got ${catalystFieldNames.length}")
+      }
+    } else {
+      if (catalystFieldNames != expectedFieldNames) {
+        throw new IncompatibleSchemaException(s"Generic Avro union at ${toFieldStr(avroPath)} " +
+          s"does not match the SQL schema at ${toFieldStr(catalystPath)}.  It expected the " +
+          s"following members ${expectedFieldNames.mkString("(", ", ", ")")} but got " +
+          s"${catalystFieldNames.mkString("(", ", ", ")")}")
+      }
+    }
+
+    val unionBranchConverters = nonNullTypes.zip(catalystStruct).map { case (unionBranch, cf) =>
+      newConverter(cf.dataType, unionBranch, catalystPath :+ cf.name, avroPath :+ cf.name)
+    }.toArray
+
+    val numBranches = catalystStruct.length
+    row: InternalRow => {
+      var idx = 0
+      var retVal: Any = null
+      while (idx < numBranches && retVal == null) {
+        if (!row.isNullAt(idx)) {
+          retVal = unionBranchConverters(idx).apply(row, idx)
+        }
+        idx += 1
+      }
+      retVal
+    }
+  }
+
   /**
    * Resolve a possibly nullable Avro Type.
    *
    * An Avro type is nullable when it is a [[UNION]] of two types: one null type and another

Review Comment:
   This comment still says that only a `["null", "type"]` union is supported. Can you update it so it is clear all unions are supported now?



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -312,14 +368,11 @@ private[sql] class AvroSerializer(
    */
   private def resolveAvroType(avroType: Schema): (Boolean, Schema) = {
     if (avroType.getType == Type.UNION) {
-      val fields = avroType.getTypes.asScala
-      val actualType = fields.filter(_.getType != Type.NULL)
-      if (fields.length != 2 || actualType.length != 1) {
-        throw new UnsupportedAvroTypeException(
-          s"Unsupported Avro UNION type $avroType: Only UNION of a null type and a non-null " +
-            "type is supported")
+      nonNullUnionBranches(avroType) match {
+        case Seq() => (true, Schema.create(Type.NULL))
+        case Seq(singleType) => (true, singleType)
+        case _ => (false, avroType)

Review Comment:
   `avroType` still contains the null type in the union. Should this return the union with the null branch filtered out? That is what it previously did for nullable unions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org