Posted to reviews@spark.apache.org by "siying (via GitHub)" <gi...@apache.org> on 2023/05/22 17:53:24 UTC

[GitHub] [spark] siying opened a new pull request, #41263: [SPARK-43333][CORE] Allow Avro to convert union type to SQL with field name stable with type

siying opened a new pull request, #41263:
URL: https://github.com/apache/spark/pull/41263

   ### What changes were proposed in this pull request?
   Introduce the SQL conf "spark.sql.avro.enableStableIdentifiersForUnionType". When it is set to true (the default remains false), an Avro union is converted to a SQL struct whose field names are "member_" + type name. The goal is to keep each field name stable with its type name.
   
   
   ### Why are the changes needed?
   The purpose of this is twofold:
   
   1. To allow adding or removing types in the union without affecting the field names of other member types. If the new or removed type is not ordered last, then existing queries referencing "member2" may need to be rewritten to reference "member1" or "member3".
   2. Referencing the type name in the query is more readable than referencing "member0".
   
   For example, our system produces an Avro schema from a Java type structure where subtyping maps to union types whose members are ordered lexicographically. Adding a subtype can therefore easily result in all references to "member2" needing to be updated to "member3".
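   
   To make the renaming problem concrete, here is a minimal sketch in plain Scala with no Spark or Avro dependency. The object `UnionFieldNames` and its members are hypothetical names used only for illustration and are not part of this patch. It contrasts positional names with the "member_" + type name scheme when a lexicographically earlier type is added to a union.
   
   ```scala
   // Hypothetical illustration only; not code from the patch.
   object UnionFieldNames {
     // Positional naming: the field name depends only on the member's index.
     def positional(typeNames: Seq[String]): Seq[String] =
       typeNames.indices.map(i => s"member$i")
   
     // Type-based naming: "member_" + type name, as described above.
     def stable(typeNames: Seq[String]): Seq[String] =
       typeNames.map(t => s"member_$t")
   
     // A union before and after adding "double", which sorts first lexicographically.
     val before: Seq[String] = Seq("int", "string")
     val after: Seq[String] = Seq("double", "int", "string")
   }
   ```
   
   With positional naming, the string member is `member1` for `before` but `member2` for `after`, so queries must be rewritten. With type-based naming it is `member_string` in both cases.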
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   Add a unit test that covers all types supported in union, as well as some potential name conflict cases.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] sadikovi commented on a diff in pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "sadikovi (via GitHub)" <gi...@apache.org>.
sadikovi commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1204799849


##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -271,29 +316,152 @@ abstract class AvroSuite
     }
   }
 
-  test("SPARK-27858 Union type: More than one non-null type") {
-    withTempDir { dir =>
-      val complexNullUnionType = Schema.createUnion(
-        List(Schema.create(Type.INT), Schema.create(Type.NULL), Schema.create(Type.STRING)).asJava)
-      val fields = Seq(
-        new Field("field1", complexNullUnionType, "doc", null.asInstanceOf[AnyVal])).asJava
-      val schema = Schema.createRecord("name", "docs", "namespace", false)
-      schema.setFields(fields)
-      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
-      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
-      dataFileWriter.create(schema, new File(s"$dir.avro"))
-      val avroRec = new GenericData.Record(schema)
-      avroRec.put("field1", 42)
-      dataFileWriter.append(avroRec)
-      val avroRec2 = new GenericData.Record(schema)
-      avroRec2.put("field1", "Alice")
-      dataFileWriter.append(avroRec2)
-      dataFileWriter.flush()
-      dataFileWriter.close()
+  test("SPARK-43333: union stable id") {

Review Comment:
   Tests need to include Spark Jira ids unless the test suite is new.





[GitHub] [spark] siying commented on a diff in pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "siying (via GitHub)" <gi...@apache.org>.
siying commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1201365866


##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -98,6 +98,44 @@ abstract class AvroSuite
     }, new GenericDatumReader[Any]()).getSchema.toString(false)
   }
 
+  def checkUnionStableId(
+    types: List[Schema],
+    expectedSchema: String,
+    fieldsAndRow : Seq[(Any, Row)]): Unit = {

Review Comment:
   I copied the way the SQL schema is generated from the test "Complex Union Type". I find it easier to write unit tests this way, and I will keep it if possible. I will add comments to explain the parameters.





[GitHub] [spark] sadikovi commented on a diff in pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "sadikovi (via GitHub)" <gi...@apache.org>.
sadikovi commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1203230232


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -144,11 +148,31 @@ object SchemaConverters {
           case _ =>
             // Convert complex unions to struct types where field names are member0, member1, etc.
             // This is consistent with the behavior when converting between Avro and Parquet.
+            val useSchemaId = SQLConf.get.getConf(SQLConf.AVRO_STABLE_ID_FOR_UNION_TYPE)
+
+            val fieldNameSet : mutable.Set[String] = mutable.Set()
             val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
               case (s, i) =>
                 val schemaType = toSqlTypeHelper(s, existingRecordNames)
+
+                val fieldName = if (useSchemaId) {
+                  // Avro's field name may be case sensitive, so field names for two named type
+                  // could be "a" and "A" and we need to distinguish them. In this case, we throw
+                  // an option.

Review Comment:
   nit: ... we throw an exception.



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -98,6 +98,52 @@ abstract class AvroSuite
     }, new GenericDatumReader[Any]()).getSchema.toString(false)
   }
 
+  /* Check whether an Avro schema of union type is converted to SQL in an expected way, when the

Review Comment:
   Is this supposed to be Javadoc? If so, then it should look like this: 
   ```
   /**
    * <your text>
    */
   ```
   
   If not, you can just use the inline comments.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -3413,6 +3413,17 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val AVRO_STABLE_ID_FOR_UNION_TYPE = buildConf(
+    "spark.sql.avro.enableStableIdentifiersForUnionType")
+    .doc("If it is set to true, then Avro is desrialized to SQL schema, the union type is " +

Review Comment:
   Let's rephrase the doc like this: 
   > If it is set to true, Avro schema is deserialized into Spark SQL schema, and the Avro Union type is transformed into a structure where the field names remain consistent with their respective types. The resulting field names are converted to lowercase, e.g. `member_int` or `member_string`. If two user-defined type names are identical regardless of case, an exception will be raised. However, in other cases, the field names can be uniquely identified.
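   
   The rule in the proposed doc text (lowercase the name, fail on a case-insensitive clash) could be sketched roughly as follows. This is a standalone illustration, not the code in `SchemaConverters.scala`: the object `StableUnionNames`, the choice of `IllegalArgumentException`, and the error message wording are all assumptions made for the example.
   
   ```scala
   import java.util.Locale
   import scala.collection.mutable
   
   // Hypothetical helper for illustration; not Spark's implementation.
   object StableUnionNames {
     // Maps each union member's type name to "member_" + lowercased name and
     // throws if two members end up with the same field name.
     def fieldNames(typeNames: Seq[String]): Seq[String] = {
       val seen = mutable.Set[String]()
       typeNames.map { name =>
         val fieldName = "member_" + name.toLowerCase(Locale.ROOT)
         // mutable.Set#add returns false when the element was already present.
         if (!seen.add(fieldName)) {
           throw new IllegalArgumentException(
             s"Conflicting union member names after lowercasing: $fieldName")
         }
         fieldName
       }
     }
   }
   ```
   
   Under these assumptions, `Seq("int", "string")` yields `member_int` and `member_string`, while `Seq("MYFIELD2", "myfield1", "myfield2")` throws because the first and third names collide once lowercased.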





[GitHub] [spark] rangadi commented on a diff in pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1204804665


##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -271,29 +316,152 @@ abstract class AvroSuite
     }
   }
 
-  test("SPARK-27858 Union type: More than one non-null type") {
-    withTempDir { dir =>
-      val complexNullUnionType = Schema.createUnion(
-        List(Schema.create(Type.INT), Schema.create(Type.NULL), Schema.create(Type.STRING)).asJava)
-      val fields = Seq(
-        new Field("field1", complexNullUnionType, "doc", null.asInstanceOf[AnyVal])).asJava
-      val schema = Schema.createRecord("name", "docs", "namespace", false)
-      schema.setFields(fields)
-      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
-      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
-      dataFileWriter.create(schema, new File(s"$dir.avro"))
-      val avroRec = new GenericData.Record(schema)
-      avroRec.put("field1", 42)
-      dataFileWriter.append(avroRec)
-      val avroRec2 = new GenericData.Record(schema)
-      avroRec2.put("field1", "Alice")
-      dataFileWriter.append(avroRec2)
-      dataFileWriter.flush()
-      dataFileWriter.close()
+  test("SPARK-43333: union stable id") {

Review Comment:
   Does this mean we need to read the Spark Jira to understand the test? I would be surprised if there were such a policy. Do you have a link?
   It is a test for a new feature. Ideally it should be understandable by itself, without needing to go to the Jira ticket. I have added many new tests without adding a Jira id.
   I am OK with including it here, though I don't see any use in doing so.





[GitHub] [spark] rangadi commented on a diff in pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1204815867


##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -271,29 +316,152 @@ abstract class AvroSuite
     }
   }
 
-  test("SPARK-27858 Union type: More than one non-null type") {
-    withTempDir { dir =>
-      val complexNullUnionType = Schema.createUnion(
-        List(Schema.create(Type.INT), Schema.create(Type.NULL), Schema.create(Type.STRING)).asJava)
-      val fields = Seq(
-        new Field("field1", complexNullUnionType, "doc", null.asInstanceOf[AnyVal])).asJava
-      val schema = Schema.createRecord("name", "docs", "namespace", false)
-      schema.setFields(fields)
-      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
-      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
-      dataFileWriter.create(schema, new File(s"$dir.avro"))
-      val avroRec = new GenericData.Record(schema)
-      avroRec.put("field1", 42)
-      dataFileWriter.append(avroRec)
-      val avroRec2 = new GenericData.Record(schema)
-      avroRec2.put("field1", "Alice")
-      dataFileWriter.append(avroRec2)
-      dataFileWriter.flush()
-      dataFileWriter.close()
+  test("SPARK-43333: union stable id") {

Review Comment:
   Thanks for the link. Sure.





[GitHub] [spark] rangadi commented on a diff in pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1204488694


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -144,11 +148,31 @@ object SchemaConverters {
           case _ =>
             // Convert complex unions to struct types where field names are member0, member1, etc.
             // This is consistent with the behavior when converting between Avro and Parquet.
+            val useSchemaId = SQLConf.get.getConf(SQLConf.AVRO_STABLE_ID_FOR_UNION_TYPE)

Review Comment:
   Normally these configs are provided as options for functions (e.g. for `from_avro()`).
   For a file source, it should be an option for the source.
   Let's not use a Spark conf.
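   
   As a rough sketch of the difference, a per-call option travels in the options map handed to each read or function call, rather than in session-wide state. The object `UnionReadOptions` below is a hypothetical stand-in written for this example (it is not Spark's `AvroOptions`); the option key is the one proposed in this PR, and the default of false mirrors the PR description.
   
   ```scala
   // Hypothetical stand-in for a per-source options holder; not Spark code.
   object UnionReadOptions {
     // Option key as proposed in this PR.
     val StableIdKey: String = "enableStableIdentifiersForUnionType"
   
     // Returns false when the option is absent, so existing callers keep
     // the positional member0, member1, ... naming.
     def useStableIdForUnionType(options: Map[String, String]): Boolean =
       options.get(StableIdKey).exists(_.toBoolean)
   }
   ```
   
   Two reads in the same session can then choose differently, for example `Map(UnionReadOptions.StableIdKey -> "true")` for one and `Map.empty` for another, which a single session-level conf cannot express as directly.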





[GitHub] [spark] rangadi commented on a diff in pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1204491057


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -3413,6 +3413,18 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val AVRO_STABLE_ID_FOR_UNION_TYPE = buildConf(

Review Comment:
   Commented above. I think it should be an option for the Avro functions and the Avro source, not a Spark conf.
   





[GitHub] [spark] siying commented on pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "siying (via GitHub)" <gi...@apache.org>.
siying commented on PR #41263:
URL: https://github.com/apache/spark/pull/41263#issuecomment-1569218840

   @dongjoon-hyun do you plan to take a look?




[GitHub] [spark] rangadi commented on a diff in pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1205895114


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -129,26 +141,50 @@ object SchemaConverters {
           // In case of a union with null, eliminate it and make a recursive call
           val remainingUnionTypes = AvroUtils.nonNullUnionBranches(avroSchema)
           if (remainingUnionTypes.size == 1) {
-            toSqlTypeHelper(remainingUnionTypes.head, existingRecordNames).copy(nullable = true)
-          } else {
-            toSqlTypeHelper(Schema.createUnion(remainingUnionTypes.asJava), existingRecordNames)
+            toSqlTypeHelper(remainingUnionTypes.head, existingRecordNames, avroOptions)
               .copy(nullable = true)
+          } else {
+            toSqlTypeHelper(
+              Schema.createUnion(remainingUnionTypes.asJava),
+              existingRecordNames,
+              avroOptions).copy(nullable = true)
           }
         } else avroSchema.getTypes.asScala.map(_.getType).toSeq match {
           case Seq(t1) =>
-            toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames)
+            toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames, avroOptions)
           case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
             SchemaType(LongType, nullable = false)
           case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
             SchemaType(DoubleType, nullable = false)
           case _ =>
-            // Convert complex unions to struct types where field names are member0, member1, etc.
-            // This is consistent with the behavior when converting between Avro and Parquet.
+            // When avroOptions.useStableIdForUnionType is false, convert complex unions to struct
+            // types where field names are member0, member1, etc. This is consistent with the
+            // behavior when converting between Avro and Parquet.

Review Comment:
   What is the Parquet connection here? Should this say "consistent with the default behavior before adding support for stable names"?



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala:
##########
@@ -260,8 +260,13 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
       |  ]
       |}
     """.stripMargin
-    val avroSchema = AvroOptions(Map("avroSchema" -> avroTypeStruct)).schema.get
-    val sparkSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+    val options = Map("avroSchema" -> avroTypeStruct)
+    val avroOptions = AvroOptions(options)
+    val avroSchema = avroOptions.schema.get
+    val sparkSchema = SchemaConverters.
+      toSqlType(avroSchema, options).
+      dataType.
+      asInstanceOf[StructType]

Review Comment:
   Code style: the `.` should move to the start of each line, e.g.:
   
   ```scala
   val sparkSchema = SchemaConverters
         .toSqlType(avroSchema, options)
         .dataType
         .asInstanceOf[StructType]
   ```
   



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -271,29 +318,154 @@ abstract class AvroSuite
     }
   }
 
-  test("SPARK-27858 Union type: More than one non-null type") {
-    withTempDir { dir =>
-      val complexNullUnionType = Schema.createUnion(
-        List(Schema.create(Type.INT), Schema.create(Type.NULL), Schema.create(Type.STRING)).asJava)
-      val fields = Seq(
-        new Field("field1", complexNullUnionType, "doc", null.asInstanceOf[AnyVal])).asJava
-      val schema = Schema.createRecord("name", "docs", "namespace", false)
-      schema.setFields(fields)
-      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
-      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
-      dataFileWriter.create(schema, new File(s"$dir.avro"))
-      val avroRec = new GenericData.Record(schema)
-      avroRec.put("field1", 42)
-      dataFileWriter.append(avroRec)
-      val avroRec2 = new GenericData.Record(schema)
-      avroRec2.put("field1", "Alice")
-      dataFileWriter.append(avroRec2)
-      dataFileWriter.flush()
-      dataFileWriter.close()
+  test("SPARK-43333: Stable field names when converting Union type") {
+    checkUnionStableId(
+      List(Type.INT, Type.NULL, Type.STRING).map(Schema.create(_)),
+      "struct<member_int: int, member_string: string>",
+      Seq(
+        (42, Row(42, null)),
+        ("Alice", Row(null, "Alice"))))
 
-      val df = spark.read.format("avro").load(s"$dir.avro")
-      assert(df.schema === StructType.fromDDL("field1 struct<member0: int, member1: string>"))
-      assert(df.collect().toSet == Set(Row(Row(42, null)), Row(Row(null, "Alice"))))
+    checkUnionStableId(
+      List( Type.FLOAT, Type.BOOLEAN, Type.BYTES, Type.DOUBLE, Type.LONG).map(Schema.create(_)),
+      "struct<member_float: float, member_boolean: boolean, " +
+        "member_bytes: binary, member_double: double, member_long: long>",
+      Seq(
+        (true, Row(null, true, null, null, null)),
+        (42L, Row(null, null, null, null, 42L)),
+        (42F, Row(42.0, null, null, null, null)),
+       (42D, Row(null, null, null, 42D, null))))
+
+    checkUnionStableId(
+      List(
+        Schema.createArray(Schema.create(Type.FLOAT)),
+        Schema.createMap(Schema.create(Schema.Type.INT))),
+      "struct<member_array: array<float>, member_map: map<string, int>>",
+      Seq())
+
+    checkUnionStableId(
+      List(
+        Schema.createEnum("myenum", "", null, List[String]("e1", "e2").asJava),
+        Schema.createRecord("myrecord", "", null, false,
+          List[Schema.Field](new Schema.Field("f", Schema.createFixed("myfield", "", null, 6)))
+            .asJava),
+        Schema.createRecord("myrecord2", "", null, false,
+          List[Schema.Field](new Schema.Field("f", Schema.create(Type.FLOAT)))
+            .asJava)),
+      "struct<member_myenum: string, member_myrecord: struct<f: binary>, " +
+                    "member_myrecord2: struct<f: float>>",
+      Seq())
+
+    {
+      val e = intercept[Exception] {
+        checkUnionStableId(
+          List(
+            Schema.createFixed("MYFIELD2", "", null, 6),
+            Schema.createFixed("myfield1", "", null, 6),
+            Schema.createFixed("myfield2", "", null, 9)),
+          "",
+          Seq())
+      }
+      assert(e.getMessage.contains("Cannot generate stable indentifier"))
+    }
+    {
+      val e = intercept[Exception] {
+        checkUnionStableId(
+          List(
+            Schema.createFixed("ARRAY", "", null, 6),
+            Schema.createArray(Schema.create(Type.STRING))),
+          "",
+          Seq())
+      }
+      assert(e.getMessage.contains("Cannot generate stable indentifier"))
+    }
+    // Two array types or two map types are not allowed in union.
+    {
+      val e = intercept[Exception] {
+        Schema.createUnion(
+          List(
+           Schema.createArray(Schema.create(Type.FLOAT)),
+           Schema.createArray(Schema.create(Type.STRING))).asJava)
+      }
+      assert(e.getMessage.contains("Duplicate in union"))
+    }
+    {
+      val e = intercept[Exception] {
+        Schema.createUnion(
+          List(
+            Schema.createMap(Schema.create(Type.FLOAT)),
+            Schema.createMap(Schema.create(Type.STRING))).asJava)
+      }
+      assert(e.getMessage.contains("Duplicate in union"))
+    }
+
+    // Somehow Avro allows named type "array", but doesn't allow an array type in the same union.
+    {
+      val e = intercept[Exception] {
+        Schema.createUnion(
+          List(
+            Schema.createArray(Schema.create(Type.FLOAT)),
+            Schema.createFixed("array", "", null, 6)
+          ).asJava
+        )
+      }
+      assert(e.getMessage.contains("Duplicate in union"))
+    }
+    {
+      val e = intercept[Exception] {
+        Schema.createUnion(
+          List(Schema.createFixed("long", "", null, 6)).asJava
+        )
+      }
+      assert(e.getMessage.contains("Schemas may not be named after primitives"))
+    }
+
+    {
+      val e = intercept[Exception] {
+        Schema.createUnion(
+          List(Schema.createFixed("bytes", "", null, 6)).asJava
+        )
+      }
+      assert(e.getMessage.contains("Schemas may not be named after primitives"))
+    }
+  }
+
+  test("SPARK-27858 Union type: More than one non-null type") {

Review Comment:
   Could you add a short description of the test in a comment at the top? This helps in understanding the test.



##########
docs/sql-data-sources-avro.md:
##########
@@ -321,7 +321,13 @@ Data source options of Avro can be set via:
     <td>read and write</td>
     <td>3.2.0</td>
   </tr>
-</table>
+  <tr>
+    <td><code>enableStableIdentifiersForUnionType</code></td>
+    <td>false</td>
+    <td>If it is set to true, Avro schema is deserialized into Spark SQL schema, and the Avro Union type is transformed into a structure where the field names remain consistent with their respective types. The resulting field names are converted to lowercase, e.g. member_int or member_string. If two user-defined type names or a user-defined type name and a built-in type name are identical regardless of case, an exception will be raised. However, in other cases, the field names can be uniquely identified.</td>

Review Comment:
   Please copy this description to `AvroOptions.scala` as well. 



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala:
##########
@@ -154,4 +157,5 @@ private[sql] object AvroOptions extends DataSourceOptions {
   // datasource similarly to the SQL config `spark.sql.avro.datetimeRebaseModeInRead`,
   // and can be set to the same values: `EXCEPTION`, `LEGACY` or `CORRECTED`.
   val DATETIME_REBASE_MODE = newOption("datetimeRebaseMode")
+  val STABLE_ID_FOR_UNION_TYPE = newOption("enableStableIdentifiersForUnionType")

Review Comment:
   Can we add documentation for this? I think Spark conf version had long doc comment. We can reuse that here. 





[GitHub] [spark] siying commented on a diff in pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "siying (via GitHub)" <gi...@apache.org>.
siying commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1205923662


##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -271,29 +318,154 @@ abstract class AvroSuite
     }
   }
 
-  test("SPARK-27858 Union type: More than one non-null type") {
-    withTempDir { dir =>
-      val complexNullUnionType = Schema.createUnion(
-        List(Schema.create(Type.INT), Schema.create(Type.NULL), Schema.create(Type.STRING)).asJava)
-      val fields = Seq(
-        new Field("field1", complexNullUnionType, "doc", null.asInstanceOf[AnyVal])).asJava
-      val schema = Schema.createRecord("name", "docs", "namespace", false)
-      schema.setFields(fields)
-      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
-      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
-      dataFileWriter.create(schema, new File(s"$dir.avro"))
-      val avroRec = new GenericData.Record(schema)
-      avroRec.put("field1", 42)
-      dataFileWriter.append(avroRec)
-      val avroRec2 = new GenericData.Record(schema)
-      avroRec2.put("field1", "Alice")
-      dataFileWriter.append(avroRec2)
-      dataFileWriter.flush()
-      dataFileWriter.close()
+  test("SPARK-43333: Stable field names when converting Union type") {
+    checkUnionStableId(
+      List(Type.INT, Type.NULL, Type.STRING).map(Schema.create(_)),
+      "struct<member_int: int, member_string: string>",
+      Seq(
+        (42, Row(42, null)),
+        ("Alice", Row(null, "Alice"))))
 
-      val df = spark.read.format("avro").load(s"$dir.avro")
-      assert(df.schema === StructType.fromDDL("field1 struct<member0: int, member1: string>"))
-      assert(df.collect().toSet == Set(Row(Row(42, null)), Row(Row(null, "Alice"))))
+    checkUnionStableId(
+      List( Type.FLOAT, Type.BOOLEAN, Type.BYTES, Type.DOUBLE, Type.LONG).map(Schema.create(_)),
+      "struct<member_float: float, member_boolean: boolean, " +
+        "member_bytes: binary, member_double: double, member_long: long>",
+      Seq(
+        (true, Row(null, true, null, null, null)),
+        (42L, Row(null, null, null, null, 42L)),
+        (42F, Row(42.0, null, null, null, null)),
+       (42D, Row(null, null, null, 42D, null))))
+
+    checkUnionStableId(
+      List(
+        Schema.createArray(Schema.create(Type.FLOAT)),
+        Schema.createMap(Schema.create(Schema.Type.INT))),
+      "struct<member_array: array<float>, member_map: map<string, int>>",
+      Seq())
+
+    checkUnionStableId(
+      List(
+        Schema.createEnum("myenum", "", null, List[String]("e1", "e2").asJava),
+        Schema.createRecord("myrecord", "", null, false,
+          List[Schema.Field](new Schema.Field("f", Schema.createFixed("myfield", "", null, 6)))
+            .asJava),
+        Schema.createRecord("myrecord2", "", null, false,
+          List[Schema.Field](new Schema.Field("f", Schema.create(Type.FLOAT)))
+            .asJava)),
+      "struct<member_myenum: string, member_myrecord: struct<f: binary>, " +
+                    "member_myrecord2: struct<f: float>>",
+      Seq())
+
+    {
+      val e = intercept[Exception] {
+        checkUnionStableId(
+          List(
+            Schema.createFixed("MYFIELD2", "", null, 6),
+            Schema.createFixed("myfield1", "", null, 6),
+            Schema.createFixed("myfield2", "", null, 9)),
+          "",
+          Seq())
+      }
+      assert(e.getMessage.contains("Cannot generate stable indentifier"))
+    }
+    {
+      val e = intercept[Exception] {
+        checkUnionStableId(
+          List(
+            Schema.createFixed("ARRAY", "", null, 6),
+            Schema.createArray(Schema.create(Type.STRING))),
+          "",
+          Seq())
+      }
+      assert(e.getMessage.contains("Cannot generate stable indentifier"))
+    }
+    // Two array types or two map types are not allowed in union.
+    {
+      val e = intercept[Exception] {
+        Schema.createUnion(
+          List(
+           Schema.createArray(Schema.create(Type.FLOAT)),
+           Schema.createArray(Schema.create(Type.STRING))).asJava)
+      }
+      assert(e.getMessage.contains("Duplicate in union"))
+    }
+    {
+      val e = intercept[Exception] {
+        Schema.createUnion(
+          List(
+            Schema.createMap(Schema.create(Type.FLOAT)),
+            Schema.createMap(Schema.create(Type.STRING))).asJava)
+      }
+      assert(e.getMessage.contains("Duplicate in union"))
+    }
+
+    // Somehow Avro allows named type "array", but doesn't allow an array type in the same union.
+    {
+      val e = intercept[Exception] {
+        Schema.createUnion(
+          List(
+            Schema.createArray(Schema.create(Type.FLOAT)),
+            Schema.createFixed("array", "", null, 6)
+          ).asJava
+        )
+      }
+      assert(e.getMessage.contains("Duplicate in union"))
+    }
+    {
+      val e = intercept[Exception] {
+        Schema.createUnion(
+          List(Schema.createFixed("long", "", null, 6)).asJava
+        )
+      }
+      assert(e.getMessage.contains("Schemas may not be named after primitives"))
+    }
+
+    {
+      val e = intercept[Exception] {
+        Schema.createUnion(
+          List(Schema.createFixed("bytes", "", null, 6)).asJava
+        )
+      }
+      assert(e.getMessage.contains("Schemas may not be named after primitives"))
+    }
+  }
+
+  test("SPARK-27858 Union type: More than one non-null type") {

Review Comment:
   This is not a new test; it's an existing one. I just added the stable-ID scenario.





---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] siying commented on a diff in pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "siying (via GitHub)" <gi...@apache.org>.
siying commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1205917118


##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala:
##########
@@ -260,8 +260,13 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
       |  ]
       |}
     """.stripMargin
-    val avroSchema = AvroOptions(Map("avroSchema" -> avroTypeStruct)).schema.get
-    val sparkSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+    val options = Map("avroSchema" -> avroTypeStruct)
+    val avroOptions = AvroOptions(options)
+    val avroSchema = avroOptions.schema.get
+    val sparkSchema = SchemaConverters.
+      toSqlType(avroSchema, options).
+      dataType.
+      asInstanceOf[StructType]

Review Comment:
   Thanks for catching it. I don't know why it became like that.



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -129,26 +141,50 @@ object SchemaConverters {
           // In case of a union with null, eliminate it and make a recursive call
           val remainingUnionTypes = AvroUtils.nonNullUnionBranches(avroSchema)
           if (remainingUnionTypes.size == 1) {
-            toSqlTypeHelper(remainingUnionTypes.head, existingRecordNames).copy(nullable = true)
-          } else {
-            toSqlTypeHelper(Schema.createUnion(remainingUnionTypes.asJava), existingRecordNames)
+            toSqlTypeHelper(remainingUnionTypes.head, existingRecordNames, avroOptions)
               .copy(nullable = true)
+          } else {
+            toSqlTypeHelper(
+              Schema.createUnion(remainingUnionTypes.asJava),
+              existingRecordNames,
+              avroOptions).copy(nullable = true)
           }
         } else avroSchema.getTypes.asScala.map(_.getType).toSeq match {
           case Seq(t1) =>
-            toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames)
+            toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames, avroOptions)
           case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
             SchemaType(LongType, nullable = false)
           case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
             SchemaType(DoubleType, nullable = false)
           case _ =>
-            // Convert complex unions to struct types where field names are member0, member1, etc.
-            // This is consistent with the behavior when converting between Avro and Parquet.
+            // When avroOptions.useStableIdForUnionType is false, convert complex unions to struct
+            // types where field names are member0, member1, etc. This is consistent with the
+            // behavior when converting between Avro and Parquet.

Review Comment:
   This is the existing comment. It only shows up on different lines because I added "When avroOptions.useStableIdForUnionType is false" at the beginning. I don't know what it refers to, and I have no reason to believe it is wrong.





[GitHub] [spark] rangadi commented on a diff in pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1204804665


##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -271,29 +316,152 @@ abstract class AvroSuite
     }
   }
 
-  test("SPARK-27858 Union type: More than one non-null type") {
-    withTempDir { dir =>
-      val complexNullUnionType = Schema.createUnion(
-        List(Schema.create(Type.INT), Schema.create(Type.NULL), Schema.create(Type.STRING)).asJava)
-      val fields = Seq(
-        new Field("field1", complexNullUnionType, "doc", null.asInstanceOf[AnyVal])).asJava
-      val schema = Schema.createRecord("name", "docs", "namespace", false)
-      schema.setFields(fields)
-      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
-      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
-      dataFileWriter.create(schema, new File(s"$dir.avro"))
-      val avroRec = new GenericData.Record(schema)
-      avroRec.put("field1", 42)
-      dataFileWriter.append(avroRec)
-      val avroRec2 = new GenericData.Record(schema)
-      avroRec2.put("field1", "Alice")
-      dataFileWriter.append(avroRec2)
-      dataFileWriter.flush()
-      dataFileWriter.close()
+  test("SPARK-43333: union stable id") {

Review Comment:
   Does it mean we need to read the Spark Jira to understand the test? I would be surprised if there is such a policy. Do you have a link?
   It is a test for a new feature. Ideally it should be understandable by itself, without needing to go to the Jira ticket. I have added many new tests without adding a Jira id.
   I am OK if we want to include it here, but I don't see any use in doing so.





[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #41263: [SPARK-43333][CORE] Allow Avro to convert union type to SQL with field name stable with type

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1201258479


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -3413,6 +3413,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val AVRO_STABLE_ID_FOR_UNION_TYPE = buildConf(
+    "spark.sql.avro.enableStableIdentifiersForUnionType")
+    .doc("When Avro is desrialized to SQL schema, the union type is converted to structure in a " +
+      "way that field names of the structure are stable with the type, in most cases.")

Review Comment:
   The description needs revision to clarify the difference between `true` and `false`.
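
   As a rough illustration of the difference being asked about, here is a minimal sketch in plain Scala. `fieldNames` and its `typeNames` argument are hypothetical stand-ins, not the converter code from the PR; only the naming patterns (`member0`, `member1` versus `member_int`, `member_string`) come from the PR description and tests.

```scala
import java.util.Locale

object UnionFieldNaming {
  // Hypothetical helper: `typeNames` stands in for the names of the non-null
  // union member types, in the order they appear in the Avro union.
  def fieldNames(typeNames: Seq[String], useStableId: Boolean): Seq[String] =
    typeNames.zipWithIndex.map { case (typeName, i) =>
      if (useStableId) {
        // Name depends only on the member type, not on its position.
        s"member_${typeName.toLowerCase(Locale.ROOT)}"
      } else {
        // Name depends only on the position within the union.
        s"member$i"
      }
    }
}
```

   With the flag off, prepending a type to the union shifts every existing name (`member0` becomes `member1`, and so on); with it on, the names of the existing members are unchanged.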





[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #41263: [SPARK-43333][CORE] Allow Avro to convert union type to SQL with field name stable with type

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1201257860


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -3413,6 +3413,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val AVRO_STABLE_ID_FOR_UNION_TYPE = buildConf(
+    "spark.sql.avro.enableStableIdentifiersForUnionType")
+    .doc("When Avro is desrialized to SQL schema, the union type is converted to structure in a " +
+      "way that field names of the structure are stable with the type, in most cases.")
+    .version("3.4.0")

Review Comment:
   `3.5.0`?





[GitHub] [spark] gengliangwang closed pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang closed pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type
URL: https://github.com/apache/spark/pull/41263




[GitHub] [spark] sadikovi commented on a diff in pull request #41263: [SPARK-43333][CORE] Allow Avro to convert union type to SQL with field name stable with type

Posted by "sadikovi (via GitHub)" <gi...@apache.org>.
sadikovi commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1201254406


##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -271,29 +309,139 @@ abstract class AvroSuite
     }
   }
 
-  test("SPARK-27858 Union type: More than one non-null type") {
-    withTempDir { dir =>
-      val complexNullUnionType = Schema.createUnion(
-        List(Schema.create(Type.INT), Schema.create(Type.NULL), Schema.create(Type.STRING)).asJava)
-      val fields = Seq(
-        new Field("field1", complexNullUnionType, "doc", null.asInstanceOf[AnyVal])).asJava
-      val schema = Schema.createRecord("name", "docs", "namespace", false)
-      schema.setFields(fields)
-      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
-      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
-      dataFileWriter.create(schema, new File(s"$dir.avro"))
-      val avroRec = new GenericData.Record(schema)
-      avroRec.put("field1", 42)
-      dataFileWriter.append(avroRec)
-      val avroRec2 = new GenericData.Record(schema)
-      avroRec2.put("field1", "Alice")
-      dataFileWriter.append(avroRec2)
-      dataFileWriter.flush()
-      dataFileWriter.close()
+  test("union stable id") {

Review Comment:
   Almost forgot to mention, could you change this to `SPARK-43333: union type with stable ids`? Thanks.





[GitHub] [spark] sadikovi commented on a diff in pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "sadikovi (via GitHub)" <gi...@apache.org>.
sadikovi commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1205032443


##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -271,29 +318,154 @@ abstract class AvroSuite
     }
   }
 
-  test("SPARK-27858 Union type: More than one non-null type") {
-    withTempDir { dir =>
-      val complexNullUnionType = Schema.createUnion(
-        List(Schema.create(Type.INT), Schema.create(Type.NULL), Schema.create(Type.STRING)).asJava)
-      val fields = Seq(
-        new Field("field1", complexNullUnionType, "doc", null.asInstanceOf[AnyVal])).asJava
-      val schema = Schema.createRecord("name", "docs", "namespace", false)
-      schema.setFields(fields)
-      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
-      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
-      dataFileWriter.create(schema, new File(s"$dir.avro"))
-      val avroRec = new GenericData.Record(schema)
-      avroRec.put("field1", 42)
-      dataFileWriter.append(avroRec)
-      val avroRec2 = new GenericData.Record(schema)
-      avroRec2.put("field1", "Alice")
-      dataFileWriter.append(avroRec2)
-      dataFileWriter.flush()
-      dataFileWriter.close()
+  test("SPARK-43333: union stable id") {

Review Comment:
   Could you update the test name, e.g. `Stable field names when converting Union type` or `Union type: stable field ids/names`, so that other contributors can understand what is being tested here?





[GitHub] [spark] sadikovi commented on pull request #41263: [SPARK-43333][CORE] Allow Avro to convert union type to SQL with field name stable with type

Posted by "sadikovi (via GitHub)" <gi...@apache.org>.
sadikovi commented on PR #41263:
URL: https://github.com/apache/spark/pull/41263#issuecomment-1558129679

   cc @dongjoon-hyun @sunchao @gengliangwang




[GitHub] [spark] mridulm commented on pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #41263:
URL: https://github.com/apache/spark/pull/41263#issuecomment-1564205552

   +CC @shardulm94 




[GitHub] [spark] sadikovi commented on a diff in pull request #41263: [SPARK-43333][CORE] Allow Avro to convert union type to SQL with field name stable with type

Posted by "sadikovi (via GitHub)" <gi...@apache.org>.
sadikovi commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1201231958


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -144,11 +147,28 @@ object SchemaConverters {
           case _ =>
             // Convert complex unions to struct types where field names are member0, member1, etc.
             // This is consistent with the behavior when converting between Avro and Parquet.
+            val use_stable_id = SQLConf.get.getConf(SQLConf.AVRO_STABLE_ID_FOR_UNION_TYPE)

Review Comment:
   nit: Could you use `useStableId`?



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -144,11 +147,28 @@ object SchemaConverters {
           case _ =>
             // Convert complex unions to struct types where field names are member0, member1, etc.
             // This is consistent with the behavior when converting between Avro and Parquet.
+            val use_stable_id = SQLConf.get.getConf(SQLConf.AVRO_STABLE_ID_FOR_UNION_TYPE)
+
+            var fieldNameSet : Set[String] = Set()

Review Comment:
   This would copy the set every time you add an element. We can change it to a mutable set (https://www.scala-lang.org/api/2.13.6/scala/collection/mutable/Set$.html) or just `java.util.HashSet`.
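
   A minimal sketch of the mutable-set variant, assuming plain Scala collections. `findDuplicates` is a hypothetical helper for illustration only, not code from the PR; it relies on `mutable.HashSet.add` returning `false` when the element is already present.

```scala
import scala.collection.mutable

object MutableSetSketch {
  // Hypothetical helper: returns each name that was already seen earlier in `names`.
  def findDuplicates(names: Seq[String]): Seq[String] = {
    // A single set instance is updated in place; no `var` reassignment is needed.
    val seen = mutable.HashSet.empty[String]
    // `add` returns true if the element was newly inserted, false if it was present.
    names.filter(name => !seen.add(name))
  }
}
```

   The same membership check and insert happen in one call, which also removes the separate `contains` lookup.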



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -144,11 +147,28 @@ object SchemaConverters {
           case _ =>
             // Convert complex unions to struct types where field names are member0, member1, etc.
             // This is consistent with the behavior when converting between Avro and Parquet.
+            val use_stable_id = SQLConf.get.getConf(SQLConf.AVRO_STABLE_ID_FOR_UNION_TYPE)
+
+            var fieldNameSet : Set[String] = Set()
             val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
               case (s, i) =>
                 val schemaType = toSqlTypeHelper(s, existingRecordNames)
                 // All fields are nullable because only one of them is set at a time
-                StructField(s"member$i", schemaType.dataType, nullable = true)
+
+                val fieldName = if (use_stable_id) {
+                  // Avro's field name may be case sensitive, so field names for two named type
+                  // could be "a" and "A" and we need to distinguish them.
+                  var temp_name = s"member_${s.getName.toLowerCase(Locale.ROOT)}"
+                  while (fieldNameSet.contains(temp_name)) {
+                    temp_name = s"${temp_name}_$i"

Review Comment:
   I was wondering whether we could simply throw an error when this happens, because the identifiers might not be stable anymore. The error could explain that stable identifiers can only be used if the type names are unique; if more than one type has the same name, please use sequential numeric ids instead.
   
   Alternatively, we could just store them as is, without converting to upper or lower case - that could be an option.
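
   The first option could look roughly like the sketch below. This is plain Scala under stated assumptions: `stableFieldNames` is a hypothetical helper, `typeNames` stands in for the Avro member type names, and the exception type and message wording are illustrative rather than taken from the PR.

```scala
import java.util.Locale
import scala.collection.mutable

object StableUnionNames {
  // Hypothetical helper: derives "member_<type>" names and fails fast when two
  // member types map to the same lower-cased field name.
  def stableFieldNames(typeNames: Seq[String]): Seq[String] = {
    val seen = mutable.HashSet.empty[String]
    typeNames.map { typeName =>
      val fieldName = s"member_${typeName.toLowerCase(Locale.ROOT)}"
      // `add` returns false if the name was already taken by an earlier member.
      if (!seen.add(fieldName)) {
        throw new IllegalArgumentException(
          s"Cannot generate a stable identifier for union member '$typeName': " +
            s"field name '$fieldName' is already in use. " +
            "Use sequential numeric ids instead.")
      }
      fieldName
    }
  }
}
```

   Under this sketch, `myfield2` and `MYFIELD2` in the same union are rejected rather than silently given position-dependent suffixes.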



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -98,6 +98,44 @@ abstract class AvroSuite
     }, new GenericDatumReader[Any]()).getSchema.toString(false)
   }
 
+  def checkUnionStableId(
+    types: List[Schema],

Review Comment:
   nit: 4-space indentation for method parameters. Could you also make it private?



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -144,11 +147,28 @@ object SchemaConverters {
           case _ =>
             // Convert complex unions to struct types where field names are member0, member1, etc.
             // This is consistent with the behavior when converting between Avro and Parquet.
+            val use_stable_id = SQLConf.get.getConf(SQLConf.AVRO_STABLE_ID_FOR_UNION_TYPE)
+
+            var fieldNameSet : Set[String] = Set()
             val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
               case (s, i) =>
                 val schemaType = toSqlTypeHelper(s, existingRecordNames)
                 // All fields are nullable because only one of them is set at a time

Review Comment:
   Could you move this comment to L171 - it was referring to the `nullable` flag.
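
   For readers unfamiliar with why the flag is always `true`: each union value populates exactly one field of the resulting struct, so every other field must be able to hold null. A minimal sketch in plain Scala, where `toRow` is a hypothetical helper and `Option` stands in for a nullable column:

```scala
object UnionRowSketch {
  // Hypothetical helper: places `value` in the slot of the matching union branch
  // and leaves every other slot empty (the analogue of a null struct field).
  def toRow(branchCount: Int, branchIndex: Int, value: Any): Seq[Option[Any]] =
    Seq.tabulate(branchCount)(i => if (i == branchIndex) Some(value) else None)
}
```

   This mirrors the expectations in the tests, where a string value in an `[int, string]` union becomes `Row(null, "Alice")`.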



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -271,29 +309,139 @@ abstract class AvroSuite
     }
   }
 
-  test("SPARK-27858 Union type: More than one non-null type") {
-    withTempDir { dir =>
-      val complexNullUnionType = Schema.createUnion(
-        List(Schema.create(Type.INT), Schema.create(Type.NULL), Schema.create(Type.STRING)).asJava)
-      val fields = Seq(
-        new Field("field1", complexNullUnionType, "doc", null.asInstanceOf[AnyVal])).asJava
-      val schema = Schema.createRecord("name", "docs", "namespace", false)
-      schema.setFields(fields)
-      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
-      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
-      dataFileWriter.create(schema, new File(s"$dir.avro"))
-      val avroRec = new GenericData.Record(schema)
-      avroRec.put("field1", 42)
-      dataFileWriter.append(avroRec)
-      val avroRec2 = new GenericData.Record(schema)
-      avroRec2.put("field1", "Alice")
-      dataFileWriter.append(avroRec2)
-      dataFileWriter.flush()
-      dataFileWriter.close()
+  test("union stable id") {
+    checkUnionStableId(
+      List(Type.INT, Type.NULL, Type.STRING).map(Schema.create(_)),
+      "struct<member_int: int, member_string: string>",
+      Seq(
+        (42, Row(42, null)),
+        ("Alice", Row(null, "Alice"))))
 
-      val df = spark.read.format("avro").load(s"$dir.avro")
-      assert(df.schema === StructType.fromDDL("field1 struct<member0: int, member1: string>"))
-      assert(df.collect().toSet == Set(Row(Row(42, null)), Row(Row(null, "Alice"))))
+    checkUnionStableId(
+      List( Type.FLOAT, Type.BOOLEAN, Type.BYTES, Type.DOUBLE, Type.LONG).map(Schema.create(_)),
+      "struct<member_float: float, member_boolean: boolean, " +
+        "member_bytes: binary, member_double: double, member_long: long>",
+      Seq(
+        (true, Row(null, true, null, null, null)),
+        (42L, Row(null, null, null, null, 42L)),
+        (42F, Row(42.0, null, null, null, null)),
+       (42D, Row(null, null, null, 42D, null))))
+
+    checkUnionStableId(
+      List(
+        Schema.createArray(Schema.create(Type.FLOAT)),
+        Schema.createMap(Schema.create(Schema.Type.INT))),
+      "struct<member_array: array<float>, member_map: map<string, int>>",
+      Seq())
+
+    checkUnionStableId(
+      List(
+        Schema.createEnum("myenum", "", null, List[String]("e1", "e2").asJava),
+        Schema.createRecord("myrecord", "", null, false,
+          List[Schema.Field](new Schema.Field("field", Schema.createFixed("myfield", "", null, 6)))
+            .asJava),
+        Schema.createRecord("myrecord2", "", null, false,
+          List[Schema.Field](new Schema.Field("field", Schema.create(Type.FLOAT)))
+            .asJava)),
+      "struct<member_myenum: string, member_myrecord: struct<field: binary>, " +
+        "member_myrecord2: struct<field: float>>",
+      Seq())
+
+    // Two array or map is not allowed in union.

Review Comment:
   nit: Could we change the comment to this: `Two array types or two map types are not allowed in union.`



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -98,6 +98,44 @@ abstract class AvroSuite
     }, new GenericDatumReader[Any]()).getSchema.toString(false)
   }
 
+  def checkUnionStableId(
+    types: List[Schema],
+    expectedSchema: String,
+    fieldsAndRow : Seq[(Any, Row)]): Unit = {

Review Comment:
   nit: no space before `:`.
   
   Also, could you explain the type? I think you can just pass the expected DataFrame that would contain the schema and the data.





[GitHub] [spark] dongjoon-hyun commented on pull request #41263: [SPARK-43333][CORE] Allow Avro to convert union type to SQL with field name stable with type

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #41263:
URL: https://github.com/apache/spark/pull/41263#issuecomment-1558131316

   Thank you for pinging me, @sadikovi .




[GitHub] [spark] rangadi commented on a diff in pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1204490385


##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -271,29 +316,152 @@ abstract class AvroSuite
     }
   }
 
-  test("SPARK-27858 Union type: More than one non-null type") {
-    withTempDir { dir =>
-      val complexNullUnionType = Schema.createUnion(
-        List(Schema.create(Type.INT), Schema.create(Type.NULL), Schema.create(Type.STRING)).asJava)
-      val fields = Seq(
-        new Field("field1", complexNullUnionType, "doc", null.asInstanceOf[AnyVal])).asJava
-      val schema = Schema.createRecord("name", "docs", "namespace", false)
-      schema.setFields(fields)
-      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
-      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
-      dataFileWriter.create(schema, new File(s"$dir.avro"))
-      val avroRec = new GenericData.Record(schema)
-      avroRec.put("field1", 42)
-      dataFileWriter.append(avroRec)
-      val avroRec2 = new GenericData.Record(schema)
-      avroRec2.put("field1", "Alice")
-      dataFileWriter.append(avroRec2)
-      dataFileWriter.flush()
-      dataFileWriter.close()
+  test("SPARK-43333: union stable id") {

Review Comment:
   We can remove the SPARK Jira id here.
   Can we also include a user-defined Avro struct in addition to the primitive types? Say, 'CustomerInfo'.





[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #41263: [SPARK-43333][CORE] Allow Avro to convert union type to SQL with field name stable with type

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1201260303


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -3413,6 +3413,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val AVRO_STABLE_ID_FOR_UNION_TYPE = buildConf(
+    "spark.sql.avro.enableStableIdentifiersForUnionType")
+    .doc("When Avro is desrialized to SQL schema, the union type is converted to structure in a " +
+      "way that field names of the structure are stable with the type, in most cases.")

Review Comment:
   In particular, please mention the `case-sensitivity` cases.





[GitHub] [spark] siying commented on pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "siying (via GitHub)" <gi...@apache.org>.
siying commented on PR #41263:
URL: https://github.com/apache/spark/pull/41263#issuecomment-1564666771

   @dongjoon-hyun the tests all pass now. Could you take a look?




[GitHub] [spark] sadikovi commented on a diff in pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "sadikovi (via GitHub)" <gi...@apache.org>.
sadikovi commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1204808076


##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -271,29 +316,152 @@ abstract class AvroSuite
     }
   }
 
-  test("SPARK-27858 Union type: More than one non-null type") {
-    withTempDir { dir =>
-      val complexNullUnionType = Schema.createUnion(
-        List(Schema.create(Type.INT), Schema.create(Type.NULL), Schema.create(Type.STRING)).asJava)
-      val fields = Seq(
-        new Field("field1", complexNullUnionType, "doc", null.asInstanceOf[AnyVal])).asJava
-      val schema = Schema.createRecord("name", "docs", "namespace", false)
-      schema.setFields(fields)
-      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
-      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
-      dataFileWriter.create(schema, new File(s"$dir.avro"))
-      val avroRec = new GenericData.Record(schema)
-      avroRec.put("field1", 42)
-      dataFileWriter.append(avroRec)
-      val avroRec2 = new GenericData.Record(schema)
-      avroRec2.put("field1", "Alice")
-      dataFileWriter.append(avroRec2)
-      dataFileWriter.flush()
-      dataFileWriter.close()
+  test("SPARK-43333: union stable id") {

Review Comment:
   You can find a note on this in https://spark.apache.org/contributing.html (Pull request section).





[GitHub] [spark] sadikovi commented on a diff in pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "sadikovi (via GitHub)" <gi...@apache.org>.
sadikovi commented on code in PR #41263:
URL: https://github.com/apache/spark/pull/41263#discussion_r1204806519


##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -271,29 +316,152 @@ abstract class AvroSuite
     }
   }
 
-  test("SPARK-27858 Union type: More than one non-null type") {
-    withTempDir { dir =>
-      val complexNullUnionType = Schema.createUnion(
-        List(Schema.create(Type.INT), Schema.create(Type.NULL), Schema.create(Type.STRING)).asJava)
-      val fields = Seq(
-        new Field("field1", complexNullUnionType, "doc", null.asInstanceOf[AnyVal])).asJava
-      val schema = Schema.createRecord("name", "docs", "namespace", false)
-      schema.setFields(fields)
-      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
-      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
-      dataFileWriter.create(schema, new File(s"$dir.avro"))
-      val avroRec = new GenericData.Record(schema)
-      avroRec.put("field1", 42)
-      dataFileWriter.append(avroRec)
-      val avroRec2 = new GenericData.Record(schema)
-      avroRec2.put("field1", "Alice")
-      dataFileWriter.append(avroRec2)
-      dataFileWriter.flush()
-      dataFileWriter.close()
+  test("SPARK-43333: union stable id") {

Review Comment:
   Yes, the Jira number needs to be included; however, the test name should also be descriptive enough to show what the test does. The Jira number is there for reference: if the test breaks, it is much easier to track down the original change and understand the motivation behind it.





[GitHub] [spark] siying commented on pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "siying (via GitHub)" <gi...@apache.org>.
siying commented on PR #41263:
URL: https://github.com/apache/spark/pull/41263#issuecomment-1562219052

   Moved the knob from SQLConf to AvroOptions after the discussion with @sadikovi and @rangadi.




[GitHub] [spark] siying commented on pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "siying (via GitHub)" <gi...@apache.org>.
siying commented on PR #41263:
URL: https://github.com/apache/spark/pull/41263#issuecomment-1563349116

   @dongjoon-hyun I addressed the comments and the CI appears to pass now. Can you help take a look? 




[GitHub] [spark] sadikovi commented on pull request #41263: [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type

Posted by "sadikovi (via GitHub)" <gi...@apache.org>.
sadikovi commented on PR #41263:
URL: https://github.com/apache/spark/pull/41263#issuecomment-1560335616

   @dongjoon-hyun Could you take another look when you have time? Thank you.

