You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "rangadi (via GitHub)" <gi...@apache.org> on 2024/01/17 19:14:20 UTC

Re: [PR] [SPARK-46736][PROTOBUF] retain empty message field in protobuf connector [spark]

rangadi commented on code in PR #44643:
URL: https://github.com/apache/spark/pull/44643#discussion_r1456327396


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -207,6 +207,12 @@ private[sql] class ProtobufOptions(
   //    nil => nil, Int32Value(0) => 0, Int32Value(100) => 100.
   val unwrapWellKnownTypes: Boolean =
     parameters.getOrElse("unwrap.primitive.wrapper.types", false.toString).toBoolean
+
+  // Since Spark doesn't allow empty StructType, empty proto message type as field will be

Review Comment:
   Treat this as full official documentation. It should be aimed at end users. Could you expand it and add an example? Please take a look at the documentation for the flags above. 



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1124,7 +1124,55 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
     }
   }
 
-  test("Corner case: empty recursive proto fields should be dropped") {
+  test("retain empty proto fields") {
+    val options = Map("recursive.fields.max.depth" -> "4", "retain.empty.message" -> "true")
+
+    // EmptyRecursiveProto at the top level. It will be an empty struct.

Review Comment:
   The comment needs to be updated; we are using a non-recursive proto below.



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1150,6 +1198,34 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
     }
   }
 
+  test("Corner case: retain empty recursive proto fields") {
+    // This verifies that a empty proto like 'message A { A a = 1}' can be retained by
+    // inserting a dumy field.
+
+    val emptyProtoSchema =
+      StructType(StructField("_dummy_field_to_retain_empty_message", StringType) :: Nil)
+    val expectedSchema = StructType(
+      // DDL: "proto STRUCT<name: string, groups: map<
+      //    struct<name: string, group: map<struct<name: string>>>>>"
+      StructField("empty_proto",
+        StructType(
+          StructField("recursive_field", emptyProtoSchema) ::
+            StructField("recursive_array", ArrayType(emptyProtoSchema, containsNull = false)

Review Comment:
   Where is the dummy field here?



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1124,7 +1124,55 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
     }
   }
 
-  test("Corner case: empty recursive proto fields should be dropped") {
+  test("retain empty proto fields") {
+    val options = Map("recursive.fields.max.depth" -> "4", "retain.empty.message" -> "true")
+
+    // EmptyRecursiveProto at the top level. It will be an empty struct.
+    checkWithFileAndClassName("EmptyProto") {
+      case (name, descFilePathOpt) =>
+        val df = emptyBinaryDF.select(
+          from_protobuf_wrapper($"binary", name, descFilePathOpt, options).as("empty_proto")
+        )
+        // Top level empty message is retained without adding dummy column to the schema.
+        assert(df.schema == structFromDDL("empty_proto struct<>"))

Review Comment:
   Why do we have an empty struct here even though we have set the option?



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1124,7 +1124,55 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
     }
   }
 
-  test("Corner case: empty recursive proto fields should be dropped") {
+  test("retain empty proto fields") {
+    val options = Map("recursive.fields.max.depth" -> "4", "retain.empty.message" -> "true")
+
+    // EmptyRecursiveProto at the top level. It will be an empty struct.
+    checkWithFileAndClassName("EmptyProto") {
+      case (name, descFilePathOpt) =>
+        val df = emptyBinaryDF.select(
+          from_protobuf_wrapper($"binary", name, descFilePathOpt, options).as("empty_proto")
+        )
+        // Top level empty message is retained without adding dummy column to the schema.
+        assert(df.schema == structFromDDL("empty_proto struct<>"))
+    }
+
+    // EmptyRecursiveProto at inner level, because empty struct type is not allowed in Spark.,

Review Comment:
   Fix the comment. Even better, could you move the non-recursive proto test into a separate test?



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1150,6 +1198,34 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
     }
   }
 
+  test("Corner case: retain empty recursive proto fields") {

Review Comment:
   Update the test name to say '... when <option_name> is set'.



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1124,7 +1124,55 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
     }
   }
 
-  test("Corner case: empty recursive proto fields should be dropped") {
+  test("retain empty proto fields") {

Review Comment:
   Add a description for this test.



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -212,11 +212,19 @@ object SchemaConverters extends Logging {
           ).toSeq
           fields match {
             case Nil =>
-              log.info(
-                s"Dropping ${fd.getFullName} as it does not have any fields left " +
-                "likely due to recursive depth limit."
-              )
-              None
+              if (protobufOptions.retainEmptyMessage) {
+                // Insert a dummy column to retain the empty message because
+                // spark doesn't allow empty struct type.
+                Some(
+                  StructType(StructField("_dummy_field_to_retain_empty_message", StringType) :: Nil)

Review Comment:
   It's better to use double underscores `__`; also, the name is quite long. How about `__dummy_field_in_empty_struct`? Even my suggestion is a bit long :). 



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -212,11 +212,19 @@ object SchemaConverters extends Logging {
           ).toSeq
           fields match {
             case Nil =>
-              log.info(
-                s"Dropping ${fd.getFullName} as it does not have any fields left " +
-                "likely due to recursive depth limit."
-              )
-              None
+              if (protobufOptions.retainEmptyMessage) {
+                // Insert a dummy column to retain the empty message because
+                // spark doesn't allow empty struct type.
+                Some(
+                  StructType(StructField("_dummy_field_to_retain_empty_message", StringType) :: Nil)

Review Comment:
   Add a log here as well.



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -207,6 +207,12 @@ private[sql] class ProtobufOptions(
   //    nil => nil, Int32Value(0) => 0, Int32Value(100) => 100.
   val unwrapWellKnownTypes: Boolean =
     parameters.getOrElse("unwrap.primitive.wrapper.types", false.toString).toBoolean
+
+  // Since Spark doesn't allow empty StructType, empty proto message type as field will be
+  // dropped by default. Setting this option to true will insert a dummy column to empty proto
+  // message so that the empty message will be retained.
+  val retainEmptyMessage: Boolean =
+    parameters.getOrElse("retain.empty.message", false.toString).toBoolean

Review Comment:
   It's better to add 'type' to the name. How about `retain.empty.message.types`? 
   Normally a 'message' refers to an actual runtime message rather than a type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org