Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/21 01:12:03 UTC

[GitHub] [spark] mposdev21 commented on a diff in pull request #38324: [Temp] Protobuf generate V2 and V3 protos and extend tests.

mposdev21 commented on code in PR #38324:
URL: https://github.com/apache/spark/pull/38324#discussion_r1001249602


##########
connector/protobuf/src/test/protobuf/v2/catalyst_types.proto:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// protoc --java_out=connector/protobuf/src/test/resources/protobuf/ connector/protobuf/src/test/resources/protobuf/catalyst_types.proto
+// protoc --descriptor_set_out=connector/protobuf/src/test/resources/protobuf/catalyst_types.desc --java_out=connector/protobuf/src/test/resources/protobuf/org/apache/spark/sql/protobuf/ connector/protobuf/src/test/resources/protobuf/catalyst_types.proto
+

Review Comment:
   Do we still want to keep these protoc comments?



##########
connector/protobuf/src/test/protobuf/v2/functions_suite.proto:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// To compile and create test class:
+// protoc --java_out=connector/protobuf/src/test/resources/protobuf/ connector/protobuf/src/test/resources/protobuf/functions_suite.proto
+// protoc --descriptor_set_out=connector/protobuf/src/test/resources/protobuf/functions_suite.desc --java_out=connector/protobuf/src/test/resources/protobuf/org/apache/spark/sql/protobuf/ connector/protobuf/src/test/resources/protobuf/functions_suite.proto
+
+syntax = "proto2";
+
+package org.apache.spark.sql.protobuf.protos.v2;
+
+option java_outer_classname = "SimpleMessageProtos";
+
+message SimpleMessageJavaTypes {
+  optional int64 id = 1;
+  optional string string_value = 2;
+  optional int32 int32_value = 3;
+  optional int64 int64_value = 4;
+  optional double double_value = 5;
+  optional float float_value = 6;
+  optional bool bool_value = 7;
+  optional bytes bytes_value = 8;
+}
+
+message SimpleMessage {
+  optional int64 id = 1;
+  optional string string_value = 2;
+  optional int32 int32_value = 3;
+  optional uint32 uint32_value = 4;
+  optional sint32 sint32_value = 5;
+  optional fixed32 fixed32_value = 6;
+  optional sfixed32 sfixed32_value = 7;
+  optional int64 int64_value = 8;
+  optional uint64 uint64_value = 9;
+  optional sint64 sint64_value = 10;
+  optional fixed64 fixed64_value = 11;
+  optional sfixed64 sfixed64_value = 12;
+  optional double double_value = 13;
+  optional float float_value = 14;
+  optional bool bool_value = 15;
+  optional bytes bytes_value = 16;
+}
+
+message SimpleMessageRepeated {
+  optional string key = 1;
+  optional string value = 2;
+  enum NestedEnum {
+    ESTED_NOTHING = 0;
+    NESTED_FIRST = 1;
+    NESTED_SECOND = 2;
+  }
+  repeated string rstring_value = 3;
+  repeated int32 rint32_value = 4;
+  repeated bool rbool_value = 5;
+  repeated int64 rint64_value = 6;
+  repeated float rfloat_value = 7;
+  repeated double rdouble_value = 8;
+  repeated bytes rbytes_value = 9;
+  repeated NestedEnum rnested_enum = 10;
+}
+
+message BasicMessage {
+  optional int64 id = 1;
+  optional string string_value = 2;
+  optional int32 int32_value = 3;
+  optional int64 int64_value = 4;
+  optional double double_value = 5;
+  optional float float_value = 6;
+  optional bool bool_value = 7;
+  optional bytes bytes_value = 8;
+}
+
+message RepeatedMessage {
+  repeated BasicMessage basic_message = 1;
+}
+
+message SimpleMessageMap {
+  optional string key = 1;
+  optional string value = 2;
+  map<string, string> string_mapdata = 3;
+  map<int32, int32> int32_mapdata = 4;
+  map<uint32, uint32> uint32_mapdata = 5;
+  map<sint32, sint32> sint32_mapdata = 6;
+  map<fixed32, fixed32> float32_mapdata = 7;
+  map<sfixed32, sfixed32> sfixed32_mapdata = 8;
+  map<int64, int64> int64_mapdata = 9;
+  map<uint64, uint64> uint64_mapdata = 10;
+  map<sint64, sint64> sint64_mapdata = 11;
+  map<fixed64, fixed64> fixed64_mapdata = 12;
+  map<sfixed64, sfixed64> sfixed64_mapdata = 13;
+  map<string, double> double_mapdata = 14;
+  map<string, float> float_mapdata = 15;
+  map<bool, bool> bool_mapdata = 16;
+  map<string, bytes> bytes_mapdata = 17;
+}
+
+message BasicEnumMessage {
+  enum BasicEnum {
+    NOTHING = 0;
+    FIRST = 1;
+    SECOND = 2;
+  }
+}
+
+message SimpleMessageEnum {
+  optional string key = 1;
+  optional string value = 2;
+  enum NestedEnum {
+    NESTED_NOTHING = 0;
+    NESTED_FIRST = 1;
+    NESTED_SECOND = 2;
+  }
+  optional BasicEnumMessage.BasicEnum basic_enum = 3;
+  optional NestedEnum nested_enum = 4;
+}
+
+
+message OtherExample {
+  optional string other = 1;
+}
+
+message IncludedExample {
+  optional string included = 1;
+  optional OtherExample other = 2;
+}
+
+message MultipleExample {
+  optional IncludedExample included_example = 1;
+}
+
+message recursiveA {
+  optional string keyA = 1;
+  optional recursiveB messageB = 2;
+}
+
+message recursiveB {
+  optional string keyB = 1;
+  optional recursiveA messageA = 2;
+}
+
+message recursiveC {
+  optional string keyC = 1;
+  optional recursiveD messageD = 2;
+}
+
+message recursiveD {
+  optional string keyD = 1;
+  repeated recursiveC messageC = 2;
+}
+
+message requiredMsg {
+  optional string key = 1;
+  optional int32 col_1 = 2;
+  optional string col_2 = 3;
+  optional int32 col_3 = 4;
+}
+
+// https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/timestamp.proto
+message Timestamp {
+  optional int64 seconds = 1;
+  optional int32 nanos = 2;
+}
+
+message timeStampMsg {
+  optional string key = 1;
+  optional Timestamp stmp = 2;
+}
+// https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/duration.proto
+message Duration {
+  optional int64 seconds = 1;
+  optional int32 nanos = 2;
+}
+
+message durationMsg {
+  optional string key = 1;
+  optional Duration duration = 2;
+}
+
+message ProtoWithDefaults {
+  optional string user_name = 1;
+  required int32 id = 2;
+  optional int32 api_quota = 3 [default = 100]; // Default 100 qps.
+  optional string location = 4 [default = "Unknown"];
+}

Review Comment:
   Can we also add an enum field? I tested one with an explicit default value, and that seemed to work. However, the spec also says:
   
   "For enums, the default value is the first value listed in the enum's type definition."
   
   I read this as: if no default is specified and the field is left unset, it should take the first enum value. That interpretation does not hold in practice, because hasDefaultValue returns false, so a test asserting that the field has the first value would fail.
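   To make the distinction concrete, here is a small library-free model of the two behaviors. It assumes they match protobuf-java, where FieldDescriptor.hasDefaultValue() is true only for an explicitly declared [default = ...], while getDefaultValue() still falls back to the first enum value. EnumType, EnumField and Proto2EnumDefaults are hypothetical names, not Spark or protobuf code.

```scala
// Library-free model of how proto2 resolves an unset enum field. Assumption:
// it mirrors protobuf-java's FieldDescriptor.hasDefaultValue()/getDefaultValue().
final case class EnumType(values: Seq[String]) {
  require(values.nonEmpty, "an enum needs at least one value")
}

final case class EnumField(enumType: EnumType, explicitDefault: Option[String]) {
  // Like hasDefaultValue(): true only when the .proto declares [default = ...].
  def hasDefaultValue: Boolean = explicitDefault.isDefined

  // Like getDefaultValue(): the declared default, otherwise the first value
  // listed in the enum's type definition.
  def defaultValue: String = explicitDefault.getOrElse(enumType.values.head)
}

object Proto2EnumDefaults {
  val basicEnum: EnumType = EnumType(Seq("NOTHING", "FIRST", "SECOND"))

  // Like `optional BasicEnumMessage.BasicEnum basic_enum = 3;`
  val noDefault: EnumField = EnumField(basicEnum, None)

  // Like `optional BasicEnumMessage.BasicEnum basic_enum = 3 [default = SECOND];`
  val withDefault: EnumField = EnumField(basicEnum, Some("SECOND"))

  // A reader that consults only hasDefaultValue never sees the implicit default.
  def defaultIfDeclared(field: EnumField): Option[String] =
    if (field.hasDefaultValue) Some(field.defaultValue) else None
}
```

   If this model is right, a test expecting the first enum value for an unset field would need the fallback default, not a hasDefaultValue check.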



##########
connector/protobuf/pom.xml:
##########
@@ -110,6 +109,60 @@
           </relocations>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>com.github.os72</groupId>
+        <artifactId>protoc-jar-maven-plugin</artifactId>
+        <version>3.11.4</version>
+        <!-- Generates Java classes and descriptor files for the tests -->
+        <executions>
+          <execution>
+            <phase>generate-test-sources</phase>
+            <id>v2-protos</id>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <inputDirectories>src/test/protobuf/v2</inputDirectories>
+              <outputTargets>
+                <outputTarget>
+                  <type>java</type>
+                  <addSources>test</addSources>
+                </outputTarget>
+                <outputTarget>
+                  <type>descriptor</type>
+                  <addSources>test</addSources>
+                  <outputDirectorySuffix>descriptor-set-v2</outputDirectorySuffix>
+                </outputTarget>
+              </outputTargets>
+            </configuration>
+          </execution>
+          <execution>
+            <phase>generate-test-sources</phase>
+            <id>v3-protos</id>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <inputDirectories>src/test/protobuf/v3</inputDirectories>
+              <outputTargets>
+                <outputTarget>
+                  <type>java</type>
+                  <addSources>test</addSources>
+                </outputTarget>
+                <outputTarget>
+                  <type>descriptor</type>
+                  <addSources>test</addSources>
+                  <outputDirectorySuffix>descriptor-set-v3</outputDirectorySuffix>
+                </outputTarget>
+              </outputTargets>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>

Review Comment:
   It looks like the .desc files are still under resources/protobuf. Should they be removed, now that this plugin generates the descriptors?



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -56,49 +103,50 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
         lit(1202.00).cast(org.apache.spark.sql.types.FloatType).as("float_value"),
         lit(true).as("bool_value"),
         lit("0".getBytes).as("bytes_value")).as("SimpleMessage"))
-    val protoStructDF = df.select(
-      functions.to_protobuf($"SimpleMessage", testFileDesc, "SimpleMessage").as("proto"))
-    val actualDf = protoStructDF.select(
-      functions.from_protobuf($"proto", testFileDesc, "SimpleMessage").as("proto.*"))
-    checkAnswer(actualDf, df)
+
+    checkWithFileAndClassName("SimpleMessage") {
+      case (name, descFilePathOpt) =>
+        val protoStructDF = df.select(
+          to_protobuf_wrapper($"SimpleMessage", name, descFilePathOpt).as("proto"))
+        val actualDf = protoStructDF.select(
+          from_protobuf_wrapper($"proto", name, descFilePathOpt).as("proto.*"))
+        checkAnswer(actualDf, df)
+    }
   }
 
   test("roundtrip in from_protobuf and to_protobuf - Repeated") {
-    val descriptor = ProtobufUtils.buildDescriptor(testFileDesc, "SimpleMessageRepeated")
 
-    val dynamicMessage = DynamicMessage
-      .newBuilder(descriptor)
-      .setField(descriptor.findFieldByName("key"), "key")
-      .setField(descriptor.findFieldByName("value"), "value")
-      .addRepeatedField(descriptor.findFieldByName("rbool_value"), false)
-      .addRepeatedField(descriptor.findFieldByName("rbool_value"), true)
-      .addRepeatedField(descriptor.findFieldByName("rdouble_value"), 1092092.654d)
-      .addRepeatedField(descriptor.findFieldByName("rdouble_value"), 1092093.654d)
-      .addRepeatedField(descriptor.findFieldByName("rfloat_value"), 10903.0f)
-      .addRepeatedField(descriptor.findFieldByName("rfloat_value"), 10902.0f)
-      .addRepeatedField(
-        descriptor.findFieldByName("rnested_enum"),
-        descriptor.findEnumTypeByName("NestedEnum").findValueByName("ESTED_NOTHING"))
-      .addRepeatedField(
-        descriptor.findFieldByName("rnested_enum"),
-        descriptor.findEnumTypeByName("NestedEnum").findValueByName("NESTED_FIRST"))
+    val protoMessage = SimpleMessageRepeated
+      .newBuilder()
+      .setKey("key")
+      .setValue("value")
+      .addRboolValue(false)
+      .addRboolValue(true)
+      .addRdoubleValue(1092092.654d)
+      .addRdoubleValue(1092093.654d)
+      .addRfloatValue(10903.0f)
+      .addRfloatValue(10902.0f)
+      .addRnestedEnum(NestedEnum.ESTED_NOTHING)
+      .addRnestedEnum(NestedEnum.NESTED_FIRST)
       .build()
 
-    val df = Seq(dynamicMessage.toByteArray).toDF("value")
-    val fromProtoDF = df.select(
-      functions.from_protobuf($"value", testFileDesc, "SimpleMessageRepeated").as("value_from"))
-    val toProtoDF = fromProtoDF.select(
-      functions.to_protobuf($"value_from", testFileDesc, "SimpleMessageRepeated").as("value_to"))
-    val toFromProtoDF = toProtoDF.select(
-      functions
-        .from_protobuf($"value_to", testFileDesc, "SimpleMessageRepeated")
-        .as("value_to_from"))
-    checkAnswer(fromProtoDF.select($"value_from.*"), toFromProtoDF.select($"value_to_from.*"))
+    val df = Seq(protoMessage.toByteArray).toDF("value")
+
+    checkWithFileAndClassName("SimpleMessageRepeated") {
+      case (name, descFilePathOpt) =>
+        val fromProtoDF = df.select(
+          from_protobuf_wrapper($"value", name, descFilePathOpt).as("value_from"))
+        val toProtoDF = fromProtoDF.select(
+          to_protobuf_wrapper($"value_from", name, descFilePathOpt).as("value_to"))
+        val toFromProtoDF = toProtoDF.select(
+          from_protobuf_wrapper($"value_to", name, descFilePathOpt).as("value_to_from"))
+        checkAnswer(fromProtoDF.select($"value_from.*"), toFromProtoDF.select($"value_to_from.*"))
+    }
   }
 
   test("roundtrip in from_protobuf and to_protobuf - Repeated Message Once") {
-    val repeatedMessageDesc = ProtobufUtils.buildDescriptor(testFileDesc, "RepeatedMessage")
-    val basicMessageDesc = ProtobufUtils.buildDescriptor(testFileDesc, "BasicMessage")
+    val repeatedMessageDesc = ProtobufUtils.buildDescriptor(descPathV3, "RepeatedMessage")
+    val basicMessageDesc = ProtobufUtils.buildDescriptor(descPathV3, "BasicMessage")

Review Comment:
   I guess you are picking the descriptors (v2 or v3) for building the message arbitrarily here (v3 in this case). Is that correct?
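   If the goal is to exercise both descriptor sets, one option is to loop over them explicitly rather than fixing descPathV3. A minimal sketch: DescriptorVersions, forEachVersion and the paths are hypothetical placeholders, and in the suite the body would build the descriptors with ProtobufUtils.buildDescriptor and the supplied path.

```scala
// Hypothetical sketch: run a test body once per descriptor set, in a fixed
// order, instead of building the input from one arbitrarily chosen set.
// The paths are placeholders, not the locations the pom.xml produces.
object DescriptorVersions {
  val descPaths: Seq[(String, String)] = Seq(
    "v2" -> "/path/to/descriptor-set-v2/functions_suite.desc",
    "v3" -> "/path/to/descriptor-set-v3/functions_suite.desc")

  // Calls `body` with (version, messageName, descPath) for every version and
  // returns one result per version so a test can assert on all of them.
  def forEachVersion[A](messageName: String)(
      body: (String, String, String) => A): Seq[(String, A)] =
    descPaths.map { case (version, path) =>
      version -> body(version, messageName, path)
    }
}
```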



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -56,49 +103,50 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
         lit(1202.00).cast(org.apache.spark.sql.types.FloatType).as("float_value"),
         lit(true).as("bool_value"),
         lit("0".getBytes).as("bytes_value")).as("SimpleMessage"))
-    val protoStructDF = df.select(
-      functions.to_protobuf($"SimpleMessage", testFileDesc, "SimpleMessage").as("proto"))
-    val actualDf = protoStructDF.select(
-      functions.from_protobuf($"proto", testFileDesc, "SimpleMessage").as("proto.*"))
-    checkAnswer(actualDf, df)
+
+    checkWithFileAndClassName("SimpleMessage") {
+      case (name, descFilePathOpt) =>
+        val protoStructDF = df.select(
+          to_protobuf_wrapper($"SimpleMessage", name, descFilePathOpt).as("proto"))
+        val actualDf = protoStructDF.select(
+          from_protobuf_wrapper($"proto", name, descFilePathOpt).as("proto.*"))
+        checkAnswer(actualDf, df)
+    }
   }
 
   test("roundtrip in from_protobuf and to_protobuf - Repeated") {
-    val descriptor = ProtobufUtils.buildDescriptor(testFileDesc, "SimpleMessageRepeated")
 
-    val dynamicMessage = DynamicMessage
-      .newBuilder(descriptor)
-      .setField(descriptor.findFieldByName("key"), "key")
-      .setField(descriptor.findFieldByName("value"), "value")
-      .addRepeatedField(descriptor.findFieldByName("rbool_value"), false)
-      .addRepeatedField(descriptor.findFieldByName("rbool_value"), true)
-      .addRepeatedField(descriptor.findFieldByName("rdouble_value"), 1092092.654d)
-      .addRepeatedField(descriptor.findFieldByName("rdouble_value"), 1092093.654d)
-      .addRepeatedField(descriptor.findFieldByName("rfloat_value"), 10903.0f)
-      .addRepeatedField(descriptor.findFieldByName("rfloat_value"), 10902.0f)
-      .addRepeatedField(
-        descriptor.findFieldByName("rnested_enum"),
-        descriptor.findEnumTypeByName("NestedEnum").findValueByName("ESTED_NOTHING"))
-      .addRepeatedField(
-        descriptor.findFieldByName("rnested_enum"),
-        descriptor.findEnumTypeByName("NestedEnum").findValueByName("NESTED_FIRST"))
+    val protoMessage = SimpleMessageRepeated
+      .newBuilder()
+      .setKey("key")
+      .setValue("value")
+      .addRboolValue(false)
+      .addRboolValue(true)
+      .addRdoubleValue(1092092.654d)
+      .addRdoubleValue(1092093.654d)
+      .addRfloatValue(10903.0f)
+      .addRfloatValue(10902.0f)
+      .addRnestedEnum(NestedEnum.ESTED_NOTHING)
+      .addRnestedEnum(NestedEnum.NESTED_FIRST)
       .build()
 

Review Comment:
   Here SimpleMessageRepeated is the v3 class used to build the protobuf message. It might make sense to add a test case that builds the message with v2.SimpleMessageRepeated (or any other v2 message) and runs the same test.
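   A sketch of the coverage this would add, assuming checkWithFileAndClassName runs its body against both the v2 and v3 descriptors (its implementation is not part of this diff). RoundtripMatrix is a hypothetical name; the point is only that building with the v3 class alone leaves the v2-built cases untested.

```scala
// Hypothetical sketch: which class version builds the input message, and
// which descriptor version reads it back. Building only with the v3 class
// covers the ("v3", *) cases; a v2-built message adds the ("v2", *) cases.
object RoundtripMatrix {
  val versions: Seq[String] = Seq("v2", "v3")

  final case class Case(builtWith: String, readWith: String)

  val allCases: Seq[Case] =
    for {
      builder <- versions
      reader <- versions
    } yield Case(builder, reader)

  // Cases the current test covers, given that it builds with v3 only.
  val coveredToday: Seq[Case] = allCases.filter(_.builtWith == "v3")

  // Cases a v2-built message would add.
  val missing: Seq[Case] = allCases.diff(coveredToday)
}
```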



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala:
##########
@@ -55,10 +55,14 @@ private[protobuf] case class ProtobufDataToCatalyst(
   private lazy val protobufOptions = ProtobufOptions(options)
 
   @transient private lazy val messageDescriptor =
-    ProtobufUtils.buildDescriptor(descFilePath, messageName)
+    ProtobufUtils.buildDescriptor(messageName, descFilePath)
+    // TODO: Avoid carrying the file name. Read the contents of descriptor file only once
+    //       at the start. Rest of the runs should reuse the buffer. Otherwise, it could
+    //       cause inconsistencies if the file contents are changed by the user after a few days.
+    //       Same for the write side in [[CatalystDataToProtobuf]].

Review Comment:
   I am not sure I fully understand the comment. When buildDescriptor is called, the file contents are read and a descriptor is created, so the file is read the first time messageDescriptor is evaluated. Could you clarify?
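   One possible reading of the TODO: because messageDescriptor is a @transient lazy val, it is not serialized with the expression and is recomputed after deserialization (for example, on each executor), so the file can be read several times, at different moments. A minimal sketch of the "read once, reuse the buffer" alternative, assuming the bytes are captured when the expression is created; DescriptorBytes is hypothetical, and the real code would parse the descriptor from these bytes instead of from the path.

```scala
import java.nio.file.{Files, Path}

// Hypothetical sketch of the TODO's "read once, reuse the buffer" idea: the
// descriptor file is read when the object is created, and the bytes travel
// with it when it is serialized, so later edits to the file are not observed.
final class DescriptorBytes(path: Path) extends Serializable {
  // Plain (non-transient) val: read exactly once, at construction. `path` is
  // only used here, so it is not kept as a field.
  private val bytes: Array[Byte] = Files.readAllBytes(path)

  // Defensive copy so callers cannot mutate the cached buffer.
  def contents: Array[Byte] = bytes.clone()
}
```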



##########
connector/protobuf/src/test/protobuf/v3/functions_suite.proto:
##########
@@ -119,7 +116,7 @@ message SimpleMessageEnum {
   string key = 1;
   string value = 2;
   enum NestedEnum {
-    ESTED_NOTHING = 0;
+    ESTED_NOTHING = 0; // TODO: Fix the name.

Review Comment:
   Should we fix this in both v2 and v3? It seems like a small change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

