You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "rangadi (via GitHub)" <gi...@apache.org> on 2023/12/11 18:46:40 UTC

Re: [PR] [SPARK-44001][PROTOBUF] Add option to allow unwrapping protobuf well known wrapper types [spark]

rangadi commented on code in PR #43767:
URL: https://github.com/apache/spark/pull/43767#discussion_r1401347343


##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1600,6 +1600,195 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
     }
   }
 
+  test("well known types deserialization and round trip") {
+    val message = spark.range(1).select(
+      lit(WellKnownWrapperTypes
+        .newBuilder()
+        .setBoolVal(BoolValue.of(true))
+        .setInt32Val(Int32Value.of(100))
+        .setUint32Val(UInt32Value.of(200))
+        .setInt64Val(Int64Value.of(300))
+        .setUint64Val(UInt64Value.of(400))
+        .setStringVal(StringValue.of("string"))
+        .setBytesVal(BytesValue.of(ByteString.copyFromUtf8("bytes")))
+        .setFloatVal(FloatValue.of(1.23f))
+        .setDoubleVal(DoubleValue.of(4.56))
+        .addInt32List(Int32Value.of(1))
+        .addInt32List(Int32Value.of(2))
+        .putWktMap(1, StringValue.of("mapval"))
+        .build().toByteArray
+      ).as("raw_proto"))
+
+    // By default, well known wrapper types should come out as structs.
+    val expectedWithoutFlag = spark.range(1).select(
+      struct(
+        struct(lit(true) as ("value")).as("bool_val"),
+        struct(lit(100).as("value")).as("int32_val"),
+        struct(lit(200).as("value")).as("uint32_val"),
+        struct(lit(300).as("value")).as("int64_val"),
+        struct(lit(400).as("value")).as("uint64_val"),
+        struct(lit("string").as("value")).as("string_val"),
+        struct(lit("bytes".getBytes).as("value")).as("bytes_val"),
+        struct(lit(1.23f).as("value")).as("float_val"),
+        struct(lit(4.56).as("value")).as("double_val"),
+        array(struct(lit(1).as("value")), struct(lit(2).as("value"))).as("int32_list"),
+        map(lit(1), struct(lit("mapval").as("value"))).as("wkt_map")
+      ).as("proto")
+    )
+
+    // With the flag set, ensure that well known wrapper types get deserialized as primitives.
+    val expectedWithFlag = spark.range(1).select(
+      struct(
+        lit(true).as("bool_val"),
+        lit(100).as("int32_val"),
+        lit(200).as("uint32_val"),
+        lit(300).as("int64_val"),
+        lit(400).as("uint64_val"),
+        lit("string").as("string_val"),
+        lit("bytes".getBytes).as("bytes_val"),
+        lit(1.23f).as("float_val"),
+        lit(4.56).as("double_val"),
+        typedLit(List(1, 2)).as("int32_list"),
+        typedLit(Map(1 -> "mapval")).as("wkt_map")
+      ).as("proto")
+    )
+
+    checkWithFileAndClassName("WellKnownWrapperTypes") { case (name, descFilePathOpt) =>
+      // With the option not set or set as false, ensure that the deserialization is as
+      // expected and that round-tripping works.
+      List(Map.empty[String, String], Map("unwrap.primitive.wrapper.types" -> "false"))
+        .foreach(opts => {
+        val parsed = message.select(from_protobuf_wrapper(
+          $"raw_proto",
+          name,
+          descFilePathOpt,
+          opts).as("parsed"))
+        checkAnswer(parsed, expectedWithoutFlag)
+
+        // Verify that round-tripping gives us the same parsed representation.
+        val reserialized = parsed.select(
+          to_protobuf_wrapper($"parsed", name, descFilePathOpt).as("reserialized"))
+        val reparsed = reserialized.select(
+          from_protobuf_wrapper($"reserialized", name, descFilePathOpt, opts).as("reparsed"))
+        checkAnswer(parsed, reparsed)
+      })
+
+      // Without the option not set or set as false, ensure that the deserialization is as
+      // expected and that round-tripping works.

Review Comment:
   Fix the comment.



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -168,6 +168,28 @@ private[sql] class ProtobufOptions(
   // instead of string, so use caution if changing existing parsing logic.
   val enumsAsInts: Boolean =
     parameters.getOrElse("enums.as.ints", false.toString).toBoolean
+
+  // Whether to unwrap the struct representation for well known primitve wrapper types when
+  // deserializing. By default, the wrapper types for primitives (i.e. google.protobuf.Int32Value,
+  // google.protobuf.Int64Value, etc.) will get deserialized as structs. We allow the option to
+  // deserialize them as their respective primitives.
+  // https://protobuf.dev/reference/protobuf/google.protobuf/
+  //
+  // For example, given a message like:
+  // ```
+  // syntax = "proto3";
+  // message = Example {

Review Comment:
   Remove `=` 



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1600,6 +1600,195 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
     }
   }
 
+  test("well known types deserialization and round trip") {
+    val message = spark.range(1).select(
+      lit(WellKnownWrapperTypes
+        .newBuilder()
+        .setBoolVal(BoolValue.of(true))
+        .setInt32Val(Int32Value.of(100))
+        .setUint32Val(UInt32Value.of(200))
+        .setInt64Val(Int64Value.of(300))
+        .setUint64Val(UInt64Value.of(400))
+        .setStringVal(StringValue.of("string"))
+        .setBytesVal(BytesValue.of(ByteString.copyFromUtf8("bytes")))
+        .setFloatVal(FloatValue.of(1.23f))
+        .setDoubleVal(DoubleValue.of(4.56))
+        .addInt32List(Int32Value.of(1))
+        .addInt32List(Int32Value.of(2))
+        .putWktMap(1, StringValue.of("mapval"))
+        .build().toByteArray
+      ).as("raw_proto"))
+
+    // By default, well known wrapper types should come out as structs.
+    val expectedWithoutFlag = spark.range(1).select(
+      struct(
+        struct(lit(true) as ("value")).as("bool_val"),
+        struct(lit(100).as("value")).as("int32_val"),
+        struct(lit(200).as("value")).as("uint32_val"),
+        struct(lit(300).as("value")).as("int64_val"),
+        struct(lit(400).as("value")).as("uint64_val"),
+        struct(lit("string").as("value")).as("string_val"),
+        struct(lit("bytes".getBytes).as("value")).as("bytes_val"),
+        struct(lit(1.23f).as("value")).as("float_val"),
+        struct(lit(4.56).as("value")).as("double_val"),
+        array(struct(lit(1).as("value")), struct(lit(2).as("value"))).as("int32_list"),
+        map(lit(1), struct(lit("mapval").as("value"))).as("wkt_map")
+      ).as("proto")
+    )
+
+    // With the flag set, ensure that well known wrapper types get deserialized as primitives.
+    val expectedWithFlag = spark.range(1).select(
+      struct(
+        lit(true).as("bool_val"),
+        lit(100).as("int32_val"),
+        lit(200).as("uint32_val"),
+        lit(300).as("int64_val"),
+        lit(400).as("uint64_val"),
+        lit("string").as("string_val"),
+        lit("bytes".getBytes).as("bytes_val"),
+        lit(1.23f).as("float_val"),
+        lit(4.56).as("double_val"),
+        typedLit(List(1, 2)).as("int32_list"),
+        typedLit(Map(1 -> "mapval")).as("wkt_map")
+      ).as("proto")
+    )
+
+    checkWithFileAndClassName("WellKnownWrapperTypes") { case (name, descFilePathOpt) =>
+      // With the option not set or set as false, ensure that the deserialization is as
+      // expected and that round-tripping works.
+      List(Map.empty[String, String], Map("unwrap.primitive.wrapper.types" -> "false"))
+        .foreach(opts => {
+        val parsed = message.select(from_protobuf_wrapper(
+          $"raw_proto",
+          name,
+          descFilePathOpt,
+          opts).as("parsed"))
+        checkAnswer(parsed, expectedWithoutFlag)
+
+        // Verify that round-tripping gives us the same parsed representation.
+        val reserialized = parsed.select(
+          to_protobuf_wrapper($"parsed", name, descFilePathOpt).as("reserialized"))
+        val reparsed = reserialized.select(
+          from_protobuf_wrapper($"reserialized", name, descFilePathOpt, opts).as("reparsed"))
+        checkAnswer(parsed, reparsed)
+      })
+
+      // Without the option not set or set as false, ensure that the deserialization is as
+      // expected and that round-tripping works.
+      val opt = Map("unwrap.primitive.wrapper.types" -> "true")
+      val parsed = message.select(from_protobuf_wrapper(
+        $"raw_proto",
+        name,
+        descFilePathOpt,
+        opt).as("parsed"))
+      checkAnswer(parsed, expectedWithFlag)
+
+      val reserialized = parsed.select(
+        to_protobuf_wrapper($"parsed", name, descFilePathOpt).as("reserialized"))
+      val reparsed = reserialized.select(
+        from_protobuf_wrapper($"reserialized", name, descFilePathOpt, opt).as("reparsed"))
+      checkAnswer(parsed, reparsed)
+    }
+  }
+
+  // Test that the emit defaults behavior and unwrap primitives behavior work correctly together.
+  test("test well known wrappers with emit defaults") {
+
+    val unset = spark.range(1).select(
+      lit(
+        WellKnownWrapperTypes.newBuilder().build().toByteArray
+      ).as("raw_proto"))
+
+    val explicitZero = spark.range(1).select(
+      lit(
+        WellKnownWrapperTypes.newBuilder().setInt32Val(Int32Value.of(0)).build().toByteArray
+      ).as("raw_proto"))
+
+    val explicitNonzero = spark.range(1).select(
+      lit(
+        WellKnownWrapperTypes.newBuilder().setInt32Val(Int32Value.of(100)).build().toByteArray
+      ).as("raw_proto"))
+
+    val expectedEmpty = spark.range(1).select(lit(null).as("int32_val"))
+
+    checkWithFileAndClassName("WellKnownWrapperTypes") { case (name, descFilePathOpt) =>
+      for {
+        unwrap <- Seq("true", "false")
+        defaults <- Seq("true", "false")
+      } {
+        // For unset values, we'll always get back null

Review Comment:
   This is fine. Just to make sure, this is different from primitive int, right? 
   In the case of primitive int, value would be 0, not null when `emit.default.values` is true. 



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1600,6 +1600,195 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
     }
   }
 
+  test("well known types deserialization and round trip") {
+    val message = spark.range(1).select(
+      lit(WellKnownWrapperTypes
+        .newBuilder()
+        .setBoolVal(BoolValue.of(true))
+        .setInt32Val(Int32Value.of(100))
+        .setUint32Val(UInt32Value.of(200))
+        .setInt64Val(Int64Value.of(300))
+        .setUint64Val(UInt64Value.of(400))
+        .setStringVal(StringValue.of("string"))
+        .setBytesVal(BytesValue.of(ByteString.copyFromUtf8("bytes")))
+        .setFloatVal(FloatValue.of(1.23f))
+        .setDoubleVal(DoubleValue.of(4.56))
+        .addInt32List(Int32Value.of(1))
+        .addInt32List(Int32Value.of(2))
+        .putWktMap(1, StringValue.of("mapval"))
+        .build().toByteArray
+      ).as("raw_proto"))
+
+    // By default, well known wrapper types should come out as structs.
+    val expectedWithoutFlag = spark.range(1).select(
+      struct(
+        struct(lit(true) as ("value")).as("bool_val"),
+        struct(lit(100).as("value")).as("int32_val"),
+        struct(lit(200).as("value")).as("uint32_val"),
+        struct(lit(300).as("value")).as("int64_val"),
+        struct(lit(400).as("value")).as("uint64_val"),
+        struct(lit("string").as("value")).as("string_val"),
+        struct(lit("bytes".getBytes).as("value")).as("bytes_val"),
+        struct(lit(1.23f).as("value")).as("float_val"),
+        struct(lit(4.56).as("value")).as("double_val"),
+        array(struct(lit(1).as("value")), struct(lit(2).as("value"))).as("int32_list"),
+        map(lit(1), struct(lit("mapval").as("value"))).as("wkt_map")
+      ).as("proto")
+    )
+
+    // With the flag set, ensure that well known wrapper types get deserialized as primitives.
+    val expectedWithFlag = spark.range(1).select(
+      struct(
+        lit(true).as("bool_val"),
+        lit(100).as("int32_val"),
+        lit(200).as("uint32_val"),
+        lit(300).as("int64_val"),
+        lit(400).as("uint64_val"),
+        lit("string").as("string_val"),
+        lit("bytes".getBytes).as("bytes_val"),
+        lit(1.23f).as("float_val"),
+        lit(4.56).as("double_val"),
+        typedLit(List(1, 2)).as("int32_list"),
+        typedLit(Map(1 -> "mapval")).as("wkt_map")
+      ).as("proto")
+    )
+
+    checkWithFileAndClassName("WellKnownWrapperTypes") { case (name, descFilePathOpt) =>
+      // With the option not set or set as false, ensure that the deserialization is as
+      // expected and that round-tripping works.
+      List(Map.empty[String, String], Map("unwrap.primitive.wrapper.types" -> "false"))
+        .foreach(opts => {
+        val parsed = message.select(from_protobuf_wrapper(
+          $"raw_proto",
+          name,
+          descFilePathOpt,
+          opts).as("parsed"))
+        checkAnswer(parsed, expectedWithoutFlag)
+
+        // Verify that round-tripping gives us the same parsed representation.
+        val reserialized = parsed.select(
+          to_protobuf_wrapper($"parsed", name, descFilePathOpt).as("reserialized"))
+        val reparsed = reserialized.select(
+          from_protobuf_wrapper($"reserialized", name, descFilePathOpt, opts).as("reparsed"))
+        checkAnswer(parsed, reparsed)
+      })
+
+      // Without the option not set or set as false, ensure that the deserialization is as
+      // expected and that round-tripping works.
+      val opt = Map("unwrap.primitive.wrapper.types" -> "true")
+      val parsed = message.select(from_protobuf_wrapper(
+        $"raw_proto",
+        name,
+        descFilePathOpt,
+        opt).as("parsed"))
+      checkAnswer(parsed, expectedWithFlag)
+
+      val reserialized = parsed.select(
+        to_protobuf_wrapper($"parsed", name, descFilePathOpt).as("reserialized"))
+      val reparsed = reserialized.select(
+        from_protobuf_wrapper($"reserialized", name, descFilePathOpt, opt).as("reparsed"))
+      checkAnswer(parsed, reparsed)
+    }
+  }
+
+  // Test that the emit defaults behavior and unwrap primitives behavior work correctly together.
+  test("test well known wrappers with emit defaults") {
+

Review Comment:
   Could you add a short description of what the following is doing? It will save time for future readers. The comment above gives the goal of the test; a description here would explain how it tests that.



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -161,7 +192,7 @@ object SchemaConverters extends Logging {
             case Nil =>
               log.info(
                 s"Dropping ${fd.getFullName} as it does not have any fields left " +
-                "likely due to recursive depth limit."

Review Comment:
   Remove?



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -168,6 +168,28 @@ private[sql] class ProtobufOptions(
   // instead of string, so use caution if changing existing parsing logic.
   val enumsAsInts: Boolean =
     parameters.getOrElse("enums.as.ints", false.toString).toBoolean
+
+  // Whether to unwrap the struct representation for well known primitve wrapper types when
+  // deserializing. By default, the wrapper types for primitives (i.e. google.protobuf.Int32Value,
+  // google.protobuf.Int64Value, etc.) will get deserialized as structs. We allow the option to
+  // deserialize them as their respective primitives.
+  // https://protobuf.dev/reference/protobuf/google.protobuf/
+  //
+  // For example, given a message like:
+  // ```
+  // syntax = "proto3";
+  // message = Example {
+  //   google.protobuf.Int32Value val = 1;

Review Comment:
   nit: rename this to `int_val` ('val' is confusing in Scala code).
   Also use a value of 5 or 10 in the example below (just to make it different from the field number in the message definition).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org