You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/09/02 15:02:32 UTC

[GitHub] [spark] steven-aerts commented on a diff in pull request #36506: [SPARK-25050][SQL] Avro: writing complex unions

steven-aerts commented on code in PR #36506:
URL: https://github.com/apache/spark/pull/36506#discussion_r961600558


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -218,6 +218,17 @@ private[sql] class AvroSerializer(
         val numFields = st.length
         (getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields))
 
+      case (st: StructType, UNION) =>
+        val unionConvertor = newComplexUnionConverter(st, avroType, catalystPath, avroPath)
+        val numFields = st.length
+        (getter, ordinal) => unionConvertor(getter.getStruct(ordinal, numFields))
+
+      case (DoubleType, UNION) if nonNullUnionTypes(avroType) == Set(FLOAT, DOUBLE) =>
+        (getter, ordinal) => getter.getDouble(ordinal)
+
+      case (LongType, UNION) if nonNullUnionTypes(avroType) == Set(INT, LONG) =>
+        (getter, ordinal) => getter.getLong(ordinal)

Review Comment:
   Hi @xkrogen,
   
   I understand better now what you mean, and I agree we lose some information here.
   I do not see how we can get this resolved without changing the way Spark reads in its Avro structure, as that is where we are losing it.
   Changing that will lead to more problems than we introduce here, I think.  Do you agree?



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,54 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      avroType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = avroType.getTypes.asScala.filter(_.getType != NULL).toSeq
+    validateComplexUnionMembers(catalystStruct, nonNullTypes, catalystPath, avroPath)
+
+    val fieldConverters = nonNullTypes.zipWithIndex.map { case (avroField, i) =>
+      val cf = catalystStruct.fields(i)
+      newConverter(cf.dataType, resolveNullableType(avroField, nullable = true),
+        catalystPath :+ cf.name, avroPath :+ cf.name)
+    }.toArray
+
+    val numFields = catalystStruct.length
+    row: InternalRow =>
+      (0 until numFields).dropWhile(row.isNullAt).headOption match {
+        case Some(i) => fieldConverters(i).apply(row, i)
+        case None => null
+      }
+  }
+
+  def validateComplexUnionMembers(
+      catalystStruct: StructType,
+      unionTypes: Seq[Schema],
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): Unit = {
+    val expectedFieldNames = unionTypes.indices.map(i => s"member$i")
+    if (catalystStruct.fieldNames.toSeq != expectedFieldNames) {
+      throw new IncompatibleSchemaException(s"Generic Avro union at ${toFieldStr(avroPath)} " +
+        s"does not match the SQL schema at ${toFieldStr(catalystPath)}.  It expected the " +
+        s"following members ${expectedFieldNames.mkString("(", ", ", ")")} but got " +
+        s"${catalystStruct.fieldNames.mkString("(", ", ", ")")}")
+    }
+  }

Review Comment:
   Done



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,54 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      avroType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = avroType.getTypes.asScala.filter(_.getType != NULL).toSeq
+    validateComplexUnionMembers(catalystStruct, nonNullTypes, catalystPath, avroPath)
+
+    val fieldConverters = nonNullTypes.zipWithIndex.map { case (avroField, i) =>
+      val cf = catalystStruct.fields(i)
+      newConverter(cf.dataType, resolveNullableType(avroField, nullable = true),
+        catalystPath :+ cf.name, avroPath :+ cf.name)

Review Comment:
   I opted here to show the path according to the Catalyst schema, and to use `memberN` instead to show which member we take or have problems converting.
   
   I have the feeling this is more consistent and user-friendly than printing the path according to the Avro schema.
   But if you feel differently, I do not mind adapting it.



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala:
##########
@@ -219,7 +219,7 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
       functions.from_avro($"avro", avroTypeStruct)), df)
   }
 
-  test("to_avro with unsupported nullable Avro schema") {
+  test("to_avro with complex union Avro schema") {
     val df = spark.range(10).select(struct($"id", $"id".cast("string").as("str")).as("struct"))
     for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {

Review Comment:
   `["int", "string"]` will not work here: unlike `int` and `long`, it cannot be coerced into one single type, so it will be represented as a struct with `memberN` fields.
   Maybe I misunderstood your question?



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -327,11 +329,19 @@ abstract class AvroSuite
       dataFileWriter.flush()
       dataFileWriter.close()
 
-      val df = spark.sqlContext.read.format("avro").load(s"$dir.avro")
+      val df = spark.sqlContext.read.format("avro").load(nativeWriterPath)
       assertResult(field1)(df.selectExpr("field1.member0").first().get(0))
       assertResult(field2)(df.selectExpr("field2.member1").first().get(0))
       assertResult(field3)(df.selectExpr("field3.member2").first().get(0))
       assertResult(field4)(df.selectExpr("field4.member3").first().get(0))
+
+      df.write.format("avro").option("avroSchema", schema.toString).save(sparkWriterPath)
+
+      val df2 = spark.sqlContext.read.format("avro").load(nativeWriterPath)
+      assertResult(field1)(df2.selectExpr("field1.member0").first().get(0))
+      assertResult(field2)(df2.selectExpr("field2.member1").first().get(0))
+      assertResult(field3)(df2.selectExpr("field3.member2").first().get(0))
+      assertResult(field4)(df2.selectExpr("field4.member3").first().get(0))

Review Comment:
   Done



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,54 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      avroType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = avroType.getTypes.asScala.filter(_.getType != NULL).toSeq
+    validateComplexUnionMembers(catalystStruct, nonNullTypes, catalystPath, avroPath)
+
+    val fieldConverters = nonNullTypes.zipWithIndex.map { case (avroField, i) =>

Review Comment:
   "Branch" is the name used in the Avro spec, so I changed it to `unionBranch`.



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -313,13 +364,12 @@ private[sql] class AvroSerializer(
   private def resolveAvroType(avroType: Schema): (Boolean, Schema) = {
     if (avroType.getType == Type.UNION) {
       val fields = avroType.getTypes.asScala
-      val actualType = fields.filter(_.getType != Type.NULL)
-      if (fields.length != 2 || actualType.length != 1) {
-        throw new UnsupportedAvroTypeException(
-          s"Unsupported Avro UNION type $avroType: Only UNION of a null type and a non-null " +
-            "type is supported")
+      val nonNullTypes = fields.filter(_.getType != Type.NULL)
+      if (nonNullTypes.length == 1) {
+        (true, nonNullTypes.head)
+      } else {
+        (false, avroType)
       }

Review Comment:
   Added code and a test to handle the single null case.



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -1144,32 +1154,87 @@ abstract class AvroSuite
     }
   }
 
-  test("unsupported nullable avro type") {
+  test("int/long double/float conversion") {
     val catalystSchema =
       StructType(Seq(
-        StructField("Age", IntegerType, nullable = false),
-        StructField("Name", StringType, nullable = false)))
+        StructField("Age", LongType),
+        StructField("Length", DoubleType),
+        StructField("Name", StringType)))
 
-    for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {
+    for (optionalNull <- Seq(""""null",""", "")) {
       val avroSchema = s"""
         |{
         |  "type" : "record",
         |  "name" : "test_schema",
         |  "fields" : [
-        |    {"name": "Age", "type": $unsupportedAvroType},
+        |    {"name": "Age", "type": [$optionalNull "int", "long"]},
+        |    {"name": "Length", "type": [$optionalNull "float", "double"]},
         |    {"name": "Name", "type": ["null", "string"]}
         |  ]
         |}
       """.stripMargin
 
       val df = spark.createDataFrame(
-        spark.sparkContext.parallelize(Seq(Row(2, "Aurora"))), catalystSchema)
+        spark.sparkContext.parallelize(Seq(Row(2L, 1.8D, "Aurora"))), catalystSchema)

Review Comment:
   This does not work, as the data in the rows needs to match the Catalyst schema, so it needs to be a long and a double.  The null case, however, can be tested, which I did.  Is that OK for you?



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,54 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      avroType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = avroType.getTypes.asScala.filter(_.getType != NULL).toSeq
+    validateComplexUnionMembers(catalystStruct, nonNullTypes, catalystPath, avroPath)
+
+    val fieldConverters = nonNullTypes.zipWithIndex.map { case (avroField, i) =>
+      val cf = catalystStruct.fields(i)

Review Comment:
   Done



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -1144,32 +1154,87 @@ abstract class AvroSuite
     }
   }
 
-  test("unsupported nullable avro type") {
+  test("int/long double/float conversion") {
     val catalystSchema =
       StructType(Seq(
-        StructField("Age", IntegerType, nullable = false),
-        StructField("Name", StringType, nullable = false)))
+        StructField("Age", LongType),
+        StructField("Length", DoubleType),
+        StructField("Name", StringType)))
 
-    for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {
+    for (optionalNull <- Seq(""""null",""", "")) {
       val avroSchema = s"""
         |{
         |  "type" : "record",
         |  "name" : "test_schema",
         |  "fields" : [
-        |    {"name": "Age", "type": $unsupportedAvroType},
+        |    {"name": "Age", "type": [$optionalNull "int", "long"]},
+        |    {"name": "Length", "type": [$optionalNull "float", "double"]},
         |    {"name": "Name", "type": ["null", "string"]}
         |  ]
         |}
       """.stripMargin
 
       val df = spark.createDataFrame(
-        spark.sparkContext.parallelize(Seq(Row(2, "Aurora"))), catalystSchema)
+        spark.sparkContext.parallelize(Seq(Row(2L, 1.8D, "Aurora"))), catalystSchema)
 
       withTempPath { tempDir =>
-        val message = intercept[SparkException] {
+        df.write.format("avro").option("avroSchema", avroSchema).save(tempDir.getPath)
+        checkAnswer(
+          spark.read
+            .format("avro")
+            .option("avroSchema", avroSchema)
+            .load(tempDir.getPath),
+          df)
+      }
+    }
+  }
+
+  test("non-matching complex union types") {
+    val catalystSchema =
+      StructType(Seq(
+        StructField("Union", StructType(Seq(
+          StructField("member0", IntegerType),
+          StructField("member1", StructType(Seq(StructField("f1", StringType, nullable = false))))
+        )))))
+
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row(Row(1, null)))), catalystSchema)
+
+    val recordSchema = """{"type":"record","name":"r","fields":[{"name":"f1","type":"string"}]}"""
+    for ((unionSchema, compatible) <- Seq(
+      (""""null","int",""" + recordSchema, true),
+      (""""int","null",""" + recordSchema, true),
+      (""""int",""" + recordSchema + ""","null"""", true),
+      (""""int",""" + recordSchema, true),
+      (""""null",""" + recordSchema + ""","int"""", false),
+      (""""null",""" + recordSchema, false),
+      (""""null","int",{"type":"record","name":"r","fields":[{"name":"f2","type":"string"}]}""",
+        false)
+    )) {
+      val avroSchema = s"""
+                          |{
+                          |  "type" : "record",
+                          |  "name" : "test_schema",
+                          |  "fields" : [
+                          |    {"name": "Union", "type": [$unionSchema]}
+                          |  ]
+                          |}
+      """.stripMargin

Review Comment:
   A lot better indeed



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -337,4 +387,8 @@ private[sql] class AvroSerializer(
         "schema will throw runtime exception if there is a record with null value.")
     }
   }
+
+  private def nonNullUnionTypes(avroType: Schema): Set[Type] = {
+    avroType.getTypes.asScala.map(_.getType).filter(_ != NULL).toSet
+  }

Review Comment:
   Done



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -1144,32 +1154,87 @@ abstract class AvroSuite
     }
   }
 
-  test("unsupported nullable avro type") {
+  test("int/long double/float conversion") {
     val catalystSchema =
       StructType(Seq(
-        StructField("Age", IntegerType, nullable = false),
-        StructField("Name", StringType, nullable = false)))
+        StructField("Age", LongType),
+        StructField("Length", DoubleType),
+        StructField("Name", StringType)))
 
-    for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {
+    for (optionalNull <- Seq(""""null",""", "")) {
       val avroSchema = s"""
         |{
         |  "type" : "record",
         |  "name" : "test_schema",
         |  "fields" : [
-        |    {"name": "Age", "type": $unsupportedAvroType},
+        |    {"name": "Age", "type": [$optionalNull "int", "long"]},
+        |    {"name": "Length", "type": [$optionalNull "float", "double"]},
         |    {"name": "Name", "type": ["null", "string"]}
         |  ]
         |}
       """.stripMargin
 
       val df = spark.createDataFrame(
-        spark.sparkContext.parallelize(Seq(Row(2, "Aurora"))), catalystSchema)
+        spark.sparkContext.parallelize(Seq(Row(2L, 1.8D, "Aurora"))), catalystSchema)
 
       withTempPath { tempDir =>
-        val message = intercept[SparkException] {
+        df.write.format("avro").option("avroSchema", avroSchema).save(tempDir.getPath)
+        checkAnswer(
+          spark.read
+            .format("avro")
+            .option("avroSchema", avroSchema)
+            .load(tempDir.getPath),
+          df)
+      }
+    }
+  }
+
+  test("non-matching complex union types") {
+    val catalystSchema =
+      StructType(Seq(
+        StructField("Union", StructType(Seq(
+          StructField("member0", IntegerType),
+          StructField("member1", StructType(Seq(StructField("f1", StringType, nullable = false))))
+        )))))

Review Comment:
   Perfect



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -287,14 +298,54 @@ private[sql] class AvroSerializer(
       result
   }
 
+  /**
+   * Complex unions map to struct types where field names are member0, member1, etc.
+   * This is consistent with the behavior in [[SchemaConverters]] and when converting between Avro
+   * and Parquet.
+   */
+  private def newComplexUnionConverter(
+      catalystStruct: StructType,
+      avroType: Schema,
+      catalystPath: Seq[String],
+      avroPath: Seq[String]): InternalRow => Any = {
+    val nonNullTypes = avroType.getTypes.asScala.filter(_.getType != NULL).toSeq
+    validateComplexUnionMembers(catalystStruct, nonNullTypes, catalystPath, avroPath)
+
+    val fieldConverters = nonNullTypes.zipWithIndex.map { case (avroField, i) =>
+      val cf = catalystStruct.fields(i)
+      newConverter(cf.dataType, resolveNullableType(avroField, nullable = true),
+        catalystPath :+ cf.name, avroPath :+ cf.name)
+    }.toArray
+
+    val numFields = catalystStruct.length
+    row: InternalRow =>
+      (0 until numFields).dropWhile(row.isNullAt).headOption match {
+        case Some(i) => fieldConverters(i).apply(row, i)
+        case None => null
+      }

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org