You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "sadikovi (via GitHub)" <gi...@apache.org> on 2023/08/23 01:59:20 UTC

[GitHub] [spark] sadikovi commented on a diff in pull request #42618: [SPARK-44919] Avro connector: convert a union of a single primitive type to a StructType

sadikovi commented on code in PR #42618:
URL: https://github.com/apache/spark/pull/42618#discussion_r1302356178


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -142,18 +143,30 @@ object SchemaConverters {
         if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
           // In case of a union with null, eliminate it and make a recursive call
           val remainingUnionTypes = AvroUtils.nonNullUnionBranches(avroSchema)
-          if (remainingUnionTypes.size == 1) {
-            toSqlTypeHelper(remainingUnionTypes.head, existingRecordNames, avroOptions)
-              .copy(nullable = true)
-          } else {
-            toSqlTypeHelper(
-              Schema.createUnion(remainingUnionTypes.asJava),
-              existingRecordNames,
-              avroOptions).copy(nullable = true)
-          }
+          toSqlTypeHelper(
+            Schema.createUnion(remainingUnionTypes.asJava),
+            existingRecordNames,
+            avroOptions).copy(nullable = true)
         } else avroSchema.getTypes.asScala.map(_.getType).toSeq match {
           case Seq(t1) =>
-            toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames, avroOptions)
+            // If spark.sql.avro.alwaysConvertUnionToStructType is set to false (default),
+            // we convert Avro union with a single primitive type into a primitive Spark type
+            // instead of a StructType.
+            if (!SQLConf.get.avroAlwaysConvertUnionToStruct) {
+              toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames, avroOptions)
+            } else {
+              val singleton = avroSchema.getTypes.get(0)
+              val schemaType = toSqlTypeHelper(singleton, existingRecordNames, avroOptions)
+              val fieldName = if (avroOptions.useStableIdForUnionType) {
+                s"member_${singleton.getName.toLowerCase(Locale.ROOT)}"
+              } else {
+                s"member0"

Review Comment:
   Could you explore whether there is a way to avoid duplicating the stable-identifier logic? 



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -142,18 +143,30 @@ object SchemaConverters {
         if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
           // In case of a union with null, eliminate it and make a recursive call
           val remainingUnionTypes = AvroUtils.nonNullUnionBranches(avroSchema)
-          if (remainingUnionTypes.size == 1) {
-            toSqlTypeHelper(remainingUnionTypes.head, existingRecordNames, avroOptions)
-              .copy(nullable = true)
-          } else {
-            toSqlTypeHelper(
-              Schema.createUnion(remainingUnionTypes.asJava),
-              existingRecordNames,
-              avroOptions).copy(nullable = true)
-          }
+          toSqlTypeHelper(
+            Schema.createUnion(remainingUnionTypes.asJava),
+            existingRecordNames,
+            avroOptions).copy(nullable = true)
         } else avroSchema.getTypes.asScala.map(_.getType).toSeq match {
           case Seq(t1) =>
-            toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames, avroOptions)
+            // If spark.sql.avro.alwaysConvertUnionToStructType is set to false (default),
+            // we convert Avro union with a single primitive type into a primitive Spark type
+            // instead of a StructType.
+            if (!SQLConf.get.avroAlwaysConvertUnionToStruct) {
+              toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames, avroOptions)

Review Comment:
   I was going to suggest doing this in the Avro schema converter within the Avro data source, where it is required because the top-level schema is always expected to be a struct.
   
   For example, we change this code in AvroUtils: 
   ```scala
   // Converts Avro schema to sql type and ensures that the top level data type is either an Avro
   // record or a complex union that both result in a conversion to StructType
   def convertAvroToSqlSchema(avroSchema: Schema, avroOptions: AvroOptions): StructType = {
     SchemaConverters.toSqlTypeHelper(avroSchema, Set.empty, avroOptions).dataType match {
       case t: StructType => t
       case _ => throw new RuntimeException(
         s"""Avro schema cannot be converted to a Spark SQL StructType:
            |
            |${avroSchema.toString(true)}
            |""".stripMargin)
     }
   }
   ```
   
   to something like this: 
   ```scala
   // Converts Avro schema to sql type and ensures that the top level data type is either an Avro
   // record or a complex union that both result in a conversion to StructType
   def convertAvroToSqlSchema(avroSchema: Schema, avroOptions: AvroOptions): StructType = {
     SchemaConverters.toSqlTypeHelper(avroSchema, Set.empty, avroOptions).dataType match {
       case t: StructType => t
     case t: AtomicType => StructType(Seq(StructField("value", t, nullable = true)))
       case _ => throw new RuntimeException(
         s"""Avro schema cannot be converted to a Spark SQL StructType:
            |
            |${avroSchema.toString(true)}
            |""".stripMargin)
     }
   }
   ```
   
   Can we also check that this change will not affect the `from_avro` SQL function? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org