Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/05 21:51:27 UTC

[GitHub] [spark] SandishKumarHN opened a new pull request, #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

SandishKumarHN opened a new pull request, #38922:
URL: https://github.com/apache/spark/pull/38922

   Oneof fields allow a message to contain one and only one of a defined set of field types, while recursive fields provide a way to define messages that can refer to themselves, allowing for the creation of complex and nested data structures. With this change, users will be able to use protobuf OneOf fields with spark-protobuf, making it a more complete and useful tool for processing protobuf data.
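   
   For illustration, a minimal usage sketch (hedged: the descriptor path and input DataFrame are placeholders, and the option key is the one this change introduces):
   
   ```scala
   import scala.collection.JavaConverters._
   import org.apache.spark.sql.functions.col
   import org.apache.spark.sql.protobuf.functions.from_protobuf
   
   // Hypothetical input: `df` holds serialized protobuf messages in a binary
   // `value` column; the descriptor file path is a placeholder.
   val options = Map("recursive.fields.max.depth" -> "2")
   val parsed = df.select(
     from_protobuf(col("value"), "OneOfEvent", "/tmp/functions_suite.desc", options.asJava)
       .as("event"))
   ```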
   
   
   ### What changes were proposed in this pull request?
   - Add support for protobuf oneof field
   - Stop recursion at the first level when a recursive field is encountered. (instead of throwing an error) 
   
   ### Why are the changes needed?
   Stop recursion at the first level and handle NullType in deserialization.
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   NA
   ### How was this patch tested?
   Added unit tests for OneOf field support and recursion checks.
   Tested full support for nested OneOf fields and message types using real data from Kafka on a real cluster.
   
   cc: @rangadi @mposdev21 




[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1051328356


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,14 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  // Setting the `recursive.fields.max.depth` to 0 allows the field to be recurse once,

Review Comment:
   @rangadi Thank you for your suggestion. I have implemented it by adding a comment and a unit test to make the example more clear to users.





[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1043846427


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   > in the case of field_name recursive check it is A.B.C no recursion.
   
   That example is clearly recursion. What is 'C' here?
   
   > but it will also throw an error for the below case with the field_type check. since it will be MESSAGE.MESSAGE.MESSAGE.MESSAGE
   
   Why is this recursion? 





[GitHub] [spark] HeartSaVioR commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

HeartSaVioR commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1343559904

   cc. @cloud-fan I guess there have been some demands for recursive schema support already. Could you please help look into the proposal and see whether it makes sense to you? Please add more ppl in the loop if you know others who would be interested.




[GitHub] [spark] rangadi commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

rangadi commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1340225428

   > The source dataframe struct field should match the protobuf recursion message for "to protobuf." It will convert until the recursion level is matched. like struct within a struct to recursive message. This is true even for existing code.
   
   Interesting. So we would make that null after some depth. Could you add a test for this?




[GitHub] [spark] rangadi commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

rangadi commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1340232193

   >  file that corresponds to the source dataframe.
   
   They might have used from_protobuf() to get that schema, which supports recursive fields. They should be able to do to_protobuf() with the same protobuf definition.
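   
   For illustration, a hedged sketch of the round trip being discussed (paths and column names are placeholders, not from the PR):
   
   ```scala
   import scala.collection.JavaConverters._
   import org.apache.spark.sql.functions.col
   import org.apache.spark.sql.protobuf.functions.{from_protobuf, to_protobuf}
   
   // `input` is assumed to hold serialized EventRecursiveA messages in `value`.
   val decoded = input.select(
     from_protobuf(col("value"), "EventRecursiveA", "/tmp/functions_suite.desc",
       Map("recursive.fields.max.depth" -> "1").asJava).as("event"))
   
   // Re-encode with the same protobuf definition (the behavior under discussion).
   val reencoded = decoded.select(
     to_protobuf(col("event"), "EventRecursiveA", "/tmp/functions_suite.desc").as("value"))
   ```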




[GitHub] [spark] AmplabJenkins commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

AmplabJenkins commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1340887973

   Can one of the admins verify this patch?



[GitHub] [spark] HeartSaVioR commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

HeartSaVioR commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1343588788

   I guess the demand for supporting recursive schemas is not specific to protobuf; it also applies to Avro. If we construct a way to project a recursive schema into Spark SQL's schema, we may want to apply it consistently across components.
   
   The visibility of this PR is too limited; only ppl interested in protobuf would look into this. Instead of deciding such a thing within this PR, going through a discussion thread in dev@ seems like a good idea. What do you all think?
   
   If you have a proposal, please write it down in doc format, e.g. a Google doc, with several examples. The description of the PR does not seem to be enough to understand what this PR (or some other) is proposing.




[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1043842836


##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -26,11 +26,11 @@ import com.google.protobuf.{ByteString, DynamicMessage}
 import org.apache.spark.sql.{Column, QueryTest, Row}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.functions.{lit, struct}
-import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.SimpleMessageRepeated
+import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.{EventRecursiveA, EventRecursiveB, OneOfEvent, OneOfEventWithRecursion, SimpleMessageRepeated}

Review Comment:
   Could we move that to different tests? 





[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1043846985


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   Are our unit tests showing these cases?





[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1043915051


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   @baganokodo2022 Could you translate that to actual protobufs to illustrate the problem? I still don't understand how that is related to recursion. 
   If the redundant data in the warehouse is a concern, customers can process with a smaller protobuf (say, with unneeded fields removed), or just drop them in Spark SQL. 





[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1053683491


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -40,19 +40,27 @@ object SchemaConverters {
    *
    * @since 3.4.0
    */
-  def toSqlType(descriptor: Descriptor): SchemaType = {
-    toSqlTypeHelper(descriptor)
+  def toSqlType(
+      descriptor: Descriptor,
+      protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)): SchemaType = {
+    toSqlTypeHelper(descriptor, protobufOptions)
   }
 
-  def toSqlTypeHelper(descriptor: Descriptor): SchemaType = ScalaReflectionLock.synchronized {
+  def toSqlTypeHelper(
+      descriptor: Descriptor,
+      protobufOptions: ProtobufOptions): SchemaType = ScalaReflectionLock.synchronized {
     SchemaType(
-      StructType(descriptor.getFields.asScala.flatMap(structFieldFor(_, Set.empty)).toArray),
+      StructType(descriptor.getFields.asScala.flatMap(
+        structFieldFor(_,
+          Map(descriptor.getFullName -> 1),
+          protobufOptions: ProtobufOptions)).toArray),
       nullable = true)
   }
 
   def structFieldFor(
       fd: FieldDescriptor,
-      existingRecordNames: Set[String]): Option[StructField] = {
+      existingRecordNames: Map[String, Int],

Review Comment:
   +1



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,17 +108,35 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
+        // If the `recursive.fields.max.depth` value is not specified, it will default to -1;
+        // recursive fields are not permitted. Setting it to 0 drops all recursive fields,
+        // 1 allows it to be recursed once, and 2 allows it to be recursed twice and so on.
+        // A value greater than 10 is not allowed, and if a protobuf record has more depth for
+        // recursive fields than the allowed value, it will be truncated and some fields may be
+        // discarded.
+        // SQL Schema for the protobuf message `message Person { string name = 1; Person bff = 2}`
+        // will vary based on the value of "recursive.fields.max.depth".
+        // 0: struct<name: string, bff: null>
+        // 1: struct<name string, bff: <name: string, bff: null>>
+        // 2: struct<name string, bff: <name: string, bff: struct<name: string, bff: null>>> ...
+        val recordName = fd.getMessageType.getFullName

Review Comment:
   Good catch. I think the previous code was incorrect. We need to verify whether the same Protobuf type was seen before in this DFS traversal. 
   @SandishKumarHN what was the unit test that verified recursion?
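   
   A standalone sketch of the kind of check being discussed (an interpretation of the diff above, not the exact merged code):
   
   ```scala
   import com.google.protobuf.Descriptors.FieldDescriptor
   
   // Count how many times each message type has been visited on the current
   // DFS path; once a type exceeds the configured depth, the branch is cut
   // off and emitted as NullType instead of recursing further.
   def exceedsAllowedDepth(
       fd: FieldDescriptor,
       existingRecordNames: Map[String, Int],
       recursiveFieldMaxDepth: Int): Boolean = {
     val recordName = fd.getMessageType.getFullName
     existingRecordNames.getOrElse(recordName, 0) > recursiveFieldMaxDepth
   }
   
   // On descent into a message field, the caller records the visit:
   //   existingRecordNames.updated(recordName,
   //     existingRecordNames.getOrElse(recordName, 0) + 1)
   ```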



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -40,19 +40,27 @@ object SchemaConverters {
    *
    * @since 3.4.0
    */
-  def toSqlType(descriptor: Descriptor): SchemaType = {
-    toSqlTypeHelper(descriptor)
+  def toSqlType(
+      descriptor: Descriptor,
+      protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)): SchemaType = {
+    toSqlTypeHelper(descriptor, protobufOptions)
   }
 
-  def toSqlTypeHelper(descriptor: Descriptor): SchemaType = ScalaReflectionLock.synchronized {
+  def toSqlTypeHelper(
+      descriptor: Descriptor,
+      protobufOptions: ProtobufOptions): SchemaType = ScalaReflectionLock.synchronized {

Review Comment:
   Yeah, I just noticed. Not sure if we need it. 
   @SandishKumarHN could we remove this in a follow-up?





[GitHub] [spark] mposdev21 commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

mposdev21 commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1048002705


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala:
##########
@@ -235,7 +237,7 @@ private[sql] class ProtobufDeserializer(
           writeRecord(new RowUpdater(row), value.asInstanceOf[DynamicMessage])
           updater.set(ordinal, row)
 
-      case (MESSAGE, ArrayType(st: StructType, containsNull)) =>
+      case (MESSAGE, ArrayType(st: DataType, containsNull)) =>

Review Comment:
   It's not clear why we need to make this change; it does not look OneOf-related. Could you clarify why this would be needed? Does it cover any specific case?





[GitHub] [spark] SandishKumarHN commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

SandishKumarHN commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1340188322

   https://github.com/apache/spark/pull/38922#discussion_r1041470191
   
   @rangadi made the below changes.
   - Added a selectable recursion depth option to from_protobuf.
   - Added two unit tests for the Oneof type: a simple one for a Oneof field, and a complex one for a Oneof field with recursionDepth=2.
   - Existing unit tests should cover `foundRecursionInProtobufSchema` when recursionDepth is not set and a recursive field is discovered.
   




[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1043856365


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   I would have @baganokodo2022 give more details on the field-type case. 
   
   We have not yet added unit tests for the field-type case; would like to discuss this before adding them. 



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -26,11 +26,11 @@ import com.google.protobuf.{ByteString, DynamicMessage}
 import org.apache.spark.sql.{Column, QueryTest, Row}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.functions.{lit, struct}
-import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.SimpleMessageRepeated
+import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.{EventRecursiveA, EventRecursiveB, OneOfEvent, OneOfEventWithRecursion, SimpleMessageRepeated}

Review Comment:
   @rangadi I didn't understand; these are already two different tests.





[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1044857157


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   @rangadi @baganokodo2022 thanks for the quick meeting. Conclusion from the meeting: use the descriptor type name; added unit tests with some complex schemas. 
   ```
   val recordName = fd.getMessageType.getFullName
   ```





[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1041467502


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,9 +92,13 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
+        // Stop recursion at the first level when a recursive field is encountered.
+        // TODO: The user should be given the option to set the recursion level to 1, 2, or 3

Review Comment:
   Yeah, I think it is useful. Users may not be able to remove recursive references, but might be willing to limit recursion.
   I think the default should be an error with a clear message about how users can set the configuration. 
   Also, I don't think it should be a Spark config, but rather an `option` passed in.
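   
   A minimal sketch of what such an `option` could look like in ProtobufOptions (the key matches the one the PR eventually settles on; treating -1 as "error out" follows the discussion here):
   
   ```scala
   // Sketch: -1 (default) keeps the error-on-recursion behavior; non-negative
   // values permit bounded recursion, truncating deeper occurrences.
   val recursiveFieldMaxDepth: Int =
     parameters.getOrElse("recursive.fields.max.depth", "-1").toInt
   ```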





[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1041469181


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala:
##########
@@ -157,6 +157,8 @@ private[sql] class ProtobufDeserializer(
 
       case (null, NullType) => (updater, ordinal, _) => updater.setNullAt(ordinal)
 
+      case (MESSAGE, NullType) => (updater, ordinal, _) => updater.setNullAt(ordinal)

Review Comment:
   What is this for? For handling limited recursion?



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -693,4 +721,45 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
       errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
       parameters = Map("descFilePath" -> testFileDescriptor))
   }
+
+  test("Unit tests for OneOf field support and recursion checks") {

Review Comment:
   Let's separate these two into separate tests with separate protobuf messages. 



##########
connector/protobuf/src/test/resources/protobuf/functions_suite.proto:
##########
@@ -170,4 +170,41 @@ message timeStampMsg {
 message durationMsg {
   string key = 1;
   Duration duration = 2;
-}
\ No newline at end of file
+}
+
+message OneOfEvent {

Review Comment:
   Are you testing both OneOf and recursion in the same message? Could you split them into separate messages?



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,9 +92,13 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
+        // Stop recursion at the first level when a recursive field is encountered.
+        // TODO: The user should be given the option to set the recursion level to 1, 2, or 3

Review Comment:
   Are you planning to add selectable recursion depth here or in a follow up?



##########
connector/protobuf/src/test/resources/protobuf/functions_suite.proto:
##########
@@ -170,4 +170,41 @@ message timeStampMsg {
 message durationMsg {
   string key = 1;
   Duration duration = 2;
-}
\ No newline at end of file
+}
+
+message OneOfEvent {
+  string key = 1;
+  oneof payload {

Review Comment:
   What do one-of fields look like in the Spark schema? Could you give an example? I could not see the schema in the unit tests.
   





[GitHub] [spark] rangadi commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

rangadi commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1343698238

   @SandishKumarHN could you keep the discussion in the code [review thread](https://github.com/apache/spark/pull/38922#discussion_r1043725083)? It is hard to piece together multiple messages. I think it is fairly straightforward what recursion means; there seems to be some confusion about that. Let's discuss in the thread.




[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1044001778


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   ```
   message A {
   B b = 1;
   }
   message B {
   A aa = 1;
   D d = 2;
   }
   
   message D {
   A aaa = 1;
   E a =2;
   }
   
   message E {
   int32 key = 1;
   }
   ```
   @rangadi just correcting the second example. In the case of the field_name-based recursive check, we would fail to detect the recursion above, because the traversal would be ```A.B.A.aa.D.d.A.aaa.E``` and the set would only contain unique values [A, B.b, A.aa, A.aaa, E.a]. But with @baganokodo2022's proposal it would be ```A.B.A.D.A.E```, which takes the field name out, so the recursion is detected earlier. 





[GitHub] [spark] SandishKumarHN commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

SandishKumarHN commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1346009285

   > Can we give an example mapping between recursive proto type and spark data type? There is no native recursion support in Spark's type system, so this is a bit counterintuitive.
   
   @cloud-fan sure. Since Spark does not support circular references, we are providing an option called circularReferenceDepth that allows users to specify at what point the recursion should stop. By default, this option is disabled and an error is thrown if a circular reference is detected.
   
   ```
   message A {
   B b = 1;
   C c = 2;
   int32 key = 3;
   }
   
   message B {
   A a = 1;
   int32 key = 2;
   }
   
   message C {
   A a = 1;
   int32 key = 2; 
   }
   ```
   
   The above protobuf schema will be converted to a Spark struct whose fields contain structs nested within structs. This allows the schema to support nested data structures.
   
   Default value -1: an error is thrown if any recursive field is found.
   
   level 0:
   ```
   A = Struct(b = Struct(a = Struct(), key = 123), c = Struct(a = Struct(), key = 123), key = 123)
   ```
   level 1:
   ```
   A = Struct(b = Struct(a = Struct(b = Struct(), c = Struct(), key = 123), key = 123), c = Struct(a = Struct(b = Struct(), c = Struct(), key = 123), key = 123), key = 123)
   ```
   level 2: 
   ```
   A = Struct(b = Struct(a = Struct(b = Struct(a = Struct(), key = 123), c = Struct(a = Struct(), key = 123), key = 123), key = 123), c = Struct(a = Struct(b = Struct(a = Struct(), key = 123), c = Struct(a = Struct(), key = 123), key = 123), key = 123), key = 123)
   ```
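   
   A hedged usage sketch for inspecting the resulting schema (the descriptor path is a placeholder, and the option key shown is the final `recursive.fields.max.depth` rather than the working name `circularReferenceDepth`):
   
   ```scala
   import scala.collection.JavaConverters._
   import org.apache.spark.sql.functions.col
   import org.apache.spark.sql.protobuf.functions.from_protobuf
   
   // `df` is assumed to hold serialized `A` messages in a binary `value` column.
   df.select(
       from_protobuf(col("value"), "A", "/tmp/recursive.desc",
         Map("recursive.fields.max.depth" -> "1").asJava).as("a"))
     .printSchema()
   // The printed struct should nest the recursive b/c references one extra
   // level before truncating, matching "level 1" above.
   ```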
   




[GitHub] [spark] rangadi commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

rangadi commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1340212474

   > Added selectable recursion depth option to from_protobuf.
   
   Do we need to do this for 'to_protobuf()' too? What would happen in that case?




[GitHub] [spark] SandishKumarHN commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

SandishKumarHN commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1340234549

   > > file that corresponds to the source dataframe.
   >
   > They might have used from_protobuf() to get that schema, which supports recursive fields. They should be able to do to_protobuf() with the same protobuf definition.
   
   This case is already covered in the unit tests. Will add a unit test for direct struct-to-protobuf conversion.




[GitHub] [spark] HeartSaVioR commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

HeartSaVioR commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1343708786

   OK the recursion support isn't something the PR originally proposes. Got it. Then as long as it fits with what we support with Avro, it looks OK to me. cc. @gengliangwang 




[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1044064987


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   > we would fail to detect the recursion for above because the thread would be
   
   Why would we fail? Let's say this is doing `from_protobuf(col, 'message_A')`:
   `A aa = 1` at line 5 would be treated as recursion.
   Why are field names relevant at all?





[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1051292604


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,14 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  // Setting the `recursive.fields.max.depth` to 0 allows the field to be recurse once,

Review Comment:
   '0' disables recursion, right? Why once? This might be a difference in terminology; that's why giving a quick example is better. Could you add this example?
   
    Consider a simple recursive proto `message Person { string name = 1; Person bff = 2 }`.
   
   > What would the Spark schema be with recursion 0, 1, and 2? I think: 
   
     - 0: struct<name: string, bff: null>
     - 1: struct<name: string, bff: struct<name: string, bff: null>>
     - 2: struct<name: string, bff: struct<name: string, bff: struct<name: string, bff: null>>>
     
   





[GitHub] [spark] cloud-fan closed pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

cloud-fan closed pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
URL: https://github.com/apache/spark/pull/38922




[GitHub] [spark] rangadi commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

rangadi commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1358763884

   Asking @HeartSaVioR to take a quick look to approve. 
   @cloud-fan take a look at the updated PR description for an example of how the Spark schema would look with the different settings for the config. 




[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1053865743


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -40,19 +40,27 @@ object SchemaConverters {
    *
    * @since 3.4.0
    */
-  def toSqlType(descriptor: Descriptor): SchemaType = {
-    toSqlTypeHelper(descriptor)
+  def toSqlType(
+      descriptor: Descriptor,
+      protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)): SchemaType = {
+    toSqlTypeHelper(descriptor, protobufOptions)
   }
 
-  def toSqlTypeHelper(descriptor: Descriptor): SchemaType = ScalaReflectionLock.synchronized {
+  def toSqlTypeHelper(
+      descriptor: Descriptor,
+      protobufOptions: ProtobufOptions): SchemaType = ScalaReflectionLock.synchronized {
     SchemaType(
-      StructType(descriptor.getFields.asScala.flatMap(structFieldFor(_, Set.empty)).toArray),
+      StructType(descriptor.getFields.asScala.flatMap(
+        structFieldFor(_,
+          Map(descriptor.getFullName -> 1),
+          protobufOptions: ProtobufOptions)).toArray),
       nullable = true)
   }
 
   def structFieldFor(
       fd: FieldDescriptor,
-      existingRecordNames: Set[String]): Option[StructField] = {
+      existingRecordNames: Map[String, Int],

Review Comment:
   @cloud-fan added a comment. 





[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1044079922


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   @rangadi fd.fullName is able to detect the recursive field with different field names; added a unit test (```Fail for recursion field with different field names```). Now I'm confused.





[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1041487180


##########
connector/protobuf/src/test/resources/protobuf/functions_suite.proto:
##########
@@ -170,4 +170,41 @@ message timeStampMsg {
 message durationMsg {
   string key = 1;
   Duration duration = 2;
-}
\ No newline at end of file
+}
+
+message OneOfEvent {
+  string key = 1;
+  oneof payload {

Review Comment:
   @rangadi the "Oneof" field is of message type, Oneof will be converted to a struct type. 



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -693,4 +721,45 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
       errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
       parameters = Map("descFilePath" -> testFileDescriptor))
   }
+
+  test("Unit tests for OneOf field support and recursion checks") {

Review Comment:
   will do that. 





[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1041541422


##########
connector/protobuf/src/test/resources/protobuf/functions_suite.proto:
##########
@@ -170,4 +170,41 @@ message timeStampMsg {
 message durationMsg {
   string key = 1;
   Duration duration = 2;
-}
\ No newline at end of file
+}
+
+message OneOfEvent {

Review Comment:
   Combined one is fine, we could keep it. Better to have simpler separate tests as well.





[GitHub] [spark] SandishKumarHN commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

SandishKumarHN commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1340230717

   > > The source dataframe struct field should match the protobuf recursion message for "to protobuf." It will convert until the recursion level is matched. like struct within a struct to recursive message. This is true even for existing code.
   > 
   > Interesting. So we would make that null after some depth. Could you add test for this?
   
   @rangadi will add a test for the above case. 
   A Spark dataframe with complex nested structures should typically be converted to a protobuf message. It is the user's responsibility to specify the right .proto (.desc) file that corresponds to the source dataframe. 




[GitHub] [spark] baganokodo2022 commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
baganokodo2022 commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1341534552

   Hi @SandishKumarHN,
   
   For the `recursionDepth` option, could we consider naming it `CircularReferenceTolerance` or `CircularReferenceDepth` for clarity?
   For instance, -1 (the default value) will error out on any circular reference, 0 drops any circular-reference field, 1 allows the same field to be entered twice, and so on.
    
   Besides, can we also support a "CircularReferenceType" option with an enum value of `[FIELD_NAME, FIELD_TYPE]`? The reason is that navigation can go very deep before the same **fully-qualified** `FIELD_NAME` is encountered again, while `FIELD_TYPE` stops recursive navigation much faster. We could make `FIELD_NAME` the default option. In my test cases, with `FIELD_TYPE`, a circular reference can repeat 3 times before the executor hits OOM, while `FIELD_NAME` hits OOM when `CircularReferenceTolerance` is set to 1.
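   
   To make the distinction concrete, a minimal sketch of the lookup key each mode would use (the helper name `recursionKey` is hypothetical, not part of this PR):
   ```scala
   import com.google.protobuf.Descriptors.FieldDescriptor
   
   // FIELD_NAME tracks revisits of the same fully-qualified field, while FIELD_TYPE
   // tracks revisits of the containing message type, so it trips much earlier on
   // long chains of distinct field names that share one message type.
   def recursionKey(fd: FieldDescriptor, byType: Boolean): String =
     if (byType) fd.getMessageType.getFullName else fd.getFullName
   ```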
   
   Please let me know your thoughts.
   
   cc @rangadi 
   
   Thank you
   
   Xinyu Liu
   




[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1043846427


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   > in the case of field_name recursive check it is A.B.C no recursion.
   
   The first example is clearly recursion. What is 'C' here?
   
   > but it will also throw an error for the below case with the field_type check. since it will be MESSAGE.MESSAGE.MESSAGE.MESSAGE
   
   Why is this recursion? 





[GitHub] [spark] baganokodo2022 commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
baganokodo2022 commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1043901516


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -40,19 +40,26 @@ object SchemaConverters {
    *
    * @since 3.4.0
    */
-  def toSqlType(descriptor: Descriptor): SchemaType = {
-    toSqlTypeHelper(descriptor)
+  def toSqlType(
+      descriptor: Descriptor,
+      protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)): SchemaType = {
+    toSqlTypeHelper(descriptor, protobufOptions)
   }
 
-  def toSqlTypeHelper(descriptor: Descriptor): SchemaType = ScalaReflectionLock.synchronized {
+  def toSqlTypeHelper(
+      descriptor: Descriptor,
+      protobufOptions: ProtobufOptions): SchemaType = ScalaReflectionLock.synchronized {
     SchemaType(
-      StructType(descriptor.getFields.asScala.flatMap(structFieldFor(_, Set.empty)).toArray),
+      StructType(descriptor.getFields.asScala.flatMap(
+        structFieldFor(_, Map.empty, Map.empty, protobufOptions: ProtobufOptions)).toArray),
       nullable = true)
   }
 
   def structFieldFor(
       fd: FieldDescriptor,
-      existingRecordNames: Set[String]): Option[StructField] = {
+      existingRecordNames: Map[String, Int],
+      existingRecordTypes: Map[String, Int],

Review Comment:
   @SandishKumarHN since it is going to be either `FIELD_NAME` or `FIELD_TYPE`, do we need to keep both maps?



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   > 
   
   Yes @SandishKumarHN, you are right. That was discovered from a very complex proto schema shared across many microservices.



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,14 +109,38 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
-          throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+        // User can set circularReferenceDepth of 0 or 1 or 2.
+        // Going beyond 3 levels of recursion is not allowed.
+        if (protobufOptions.circularReferenceType.equals("FIELD_TYPE")) {
+          if (existingRecordTypes.contains(fd.getType.name()) &&
+            (protobufOptions.circularReferenceDepth < 0 ||
+              protobufOptions.circularReferenceDepth >= 3)) {
+            throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+          } else if (existingRecordTypes.contains(fd.getType.name()) &&

Review Comment:
   @SandishKumarHN and @rangadi, should we error out on `-1`, the default value, unless users specifically override it?
   0 -> drop all recursive fields once encountered
   1 -> allow the same field name (type) to be entered twice.
   2 -> allow the same field name (type) to be entered 3 times.
   
   Thoughts?



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,14 +109,38 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
-          throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+        // User can set circularReferenceDepth of 0 or 1 or 2.
+        // Going beyond 3 levels of recursion is not allowed.
+        if (protobufOptions.circularReferenceType.equals("FIELD_TYPE")) {
+          if (existingRecordTypes.contains(fd.getType.name()) &&
+            (protobufOptions.circularReferenceDepth < 0 ||
+              protobufOptions.circularReferenceDepth >= 3)) {
+            throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+          } else if (existingRecordTypes.contains(fd.getType.name()) &&

Review Comment:
   In my back-ported branch,
   ```
           val recordName = circularReferenceType match {
             case CircularReferenceTypes.FIELD_NAME =>
               fd.getFullName
             case CircularReferenceTypes.FIELD_TYPE =>
               fd.getFullName().substring(0, fd.getFullName().lastIndexOf(".")) 
           }
           
           if (circularReferenceTolerance < 0 && existingRecordNames(recordName) > 0) {
             // no tolerance on circular reference
             logError(s"circular reference in protobuf schema detected [no tolerance] - ${recordName}")
             throw new IllegalStateException(s"circular reference in protobuf schema detected [no tolerance] - ${recordName}")
           }
   
           if (existingRecordNames(recordName) > (circularReferenceTolerance max 0) ) {
             // stop navigation and drop the repetitive field
             logInfo(s"circular reference in protobuf schema detected [max tolerance breached] field dropped - ${recordName} = ${existingRecordNames(recordName)}")
             Some(NullType)
           } else {
             val newRecordNames: Map[String, Int] = existingRecordNames +  
               (recordName -> (1 + existingRecordNames(recordName)))
             Option(
               fd.getMessageType.getFields.asScala
                 .flatMap(structFieldFor(_, newRecordNames, protobufOptions))
                 .toSeq)
               .filter(_.nonEmpty)
               .map(StructType.apply)
           }
   ```



##########
connector/protobuf/src/test/resources/protobuf/functions_suite.proto:
##########
@@ -170,4 +170,41 @@ message timeStampMsg {
 message durationMsg {
   string key = 1;
   Duration duration = 2;
-}
\ No newline at end of file
+}
+
+message OneOfEvent {

Review Comment:
   nice 



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   Hi @rangadi, under certain circumstances dropping fields with data seems inevitable when dealing with circular references; we can't tell which fields are intended to be kept. One example is the parent-child relationship in an RDB data model: consider IC -> EM -> EM2 -> Director -> Senior Director -> VP -> CTO -> CEO, which are all of `Employee` type, assuming the relationship is bi-directional. The longest path for a level-1 circular reference on `FIELD_NAME` is IC -> EM -> EM2 -> Director -> Senior Director -> VP -> CTO -> CEO -> CTO -> VP -> Senior Director -> Director -> EM2 -> EM -> IC. In reality, data scientists may just want to keep 2 levels of circular reference on `FIELD_TYPE`: IC -> EM -> EM2, or EM2 -> Director -> Senior Director. This greatly reduces redundant data in the warehouse.
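   
   For example, a sketch of reading such a chain with the option name this PR later settled on (`recursive.fields.max.depth`; `df` and `descFilePath` are assumed):
   ```scala
   import org.apache.spark.sql.protobuf.functions
   
   val options = new java.util.HashMap[String, String]()
   options.put("recursive.fields.max.depth", "2") // keep IC -> EM -> EM2, drop deeper repeats
   
   val employees = df.select(
     functions.from_protobuf($"protoEvent", "Employee", descFilePath, options) as "employee")
   ```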
   
   Hope it makes sense.
   
   Thanks
   Xinyu





[GitHub] [spark] baganokodo2022 commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
baganokodo2022 commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1043904214


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,14 +109,38 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
-          throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+        // User can set circularReferenceDepth of 0 or 1 or 2.
+        // Going beyond 3 levels of recursion is not allowed.
+        if (protobufOptions.circularReferenceType.equals("FIELD_TYPE")) {
+          if (existingRecordTypes.contains(fd.getType.name()) &&
+            (protobufOptions.circularReferenceDepth < 0 ||
+              protobufOptions.circularReferenceDepth >= 3)) {
+            throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+          } else if (existingRecordTypes.contains(fd.getType.name()) &&

Review Comment:
   @SandishKumarHN and @rangadi, should we error out on `-1`, the default value, unless users specifically override it?
   0 (tolerance) -> drop all recursive fields once encountered
   1 (tolerance) -> allow the same field name (type) to be entered twice.
   2 (tolerance) -> allow the same field name (type) to be entered 3 times.
   
   Thoughts?





[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1051293916


##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -693,4 +693,435 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
       errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
       parameters = Map("descFilePath" -> testFileDescriptor))
   }
+
+  test("Verify OneOf field between from_protobuf -> to_protobuf and struct -> from_protobuf") {
+    val descriptor = ProtobufUtils.buildDescriptor(testFileDesc, "OneOfEvent")
+    val oneOfEvent = OneOfEvent.newBuilder()
+      .setKey("key")
+      .setCol1(123)
+      .setCol3(109202L)
+      .setCol2("col2value")
+      .addCol4("col4value").build()
+
+    val df = Seq(oneOfEvent.toByteArray).toDF("value")
+
+    checkWithFileAndClassName("OneOfEvent") {
+      case (name, descFilePathOpt) =>
+        val fromProtoDf = df.select(
+          from_protobuf_wrapper($"value", name, descFilePathOpt) as 'sample)
+        val toDf = fromProtoDf.select(
+          to_protobuf_wrapper($"sample", name, descFilePathOpt) as 'toProto)
+        val toFromDf = toDf.select(
+          from_protobuf_wrapper($"toProto", name, descFilePathOpt) as 'fromToProto)
+        checkAnswer(fromProtoDf, toFromDf)
+        val actualFieldNames = fromProtoDf.select("sample.*").schema.fields.toSeq.map(f => f.name)
+        descriptor.getFields.asScala.map(f => {
+          assert(actualFieldNames.contains(f.getName))
+        })
+
+        val eventFromSpark = OneOfEvent.parseFrom(
+          toDf.select("toProto").take(1).toSeq(0).getAs[Array[Byte]](0))
+        // OneOf field: the last set value(by order) will overwrite all previous ones.
+        assert(eventFromSpark.getCol2.equals("col2value"))
+        assert(eventFromSpark.getCol3 == 0)
+        val expectedFields = descriptor.getFields.asScala.map(f => f.getName)
+        eventFromSpark.getDescriptorForType.getFields.asScala.map(f => {
+          assert(expectedFields.contains(f.getName))
+        })
+
+        val jsonSchema =
+          s"""
+             |{
+             |  "type" : "struct",
+             |  "fields" : [ {
+             |    "name" : "sample",
+             |    "type" : {
+             |      "type" : "struct",
+             |      "fields" : [ {
+             |        "name" : "key",
+             |        "type" : "string",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_1",
+             |        "type" : "integer",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_2",
+             |        "type" : "string",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_3",
+             |        "type" : "long",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_4",
+             |        "type" : {
+             |          "type" : "array",
+             |          "elementType" : "string",
+             |          "containsNull" : false
+             |        },
+             |        "nullable" : false
+             |      } ]
+             |    },
+             |    "nullable" : true
+             |  } ]
+             |}
+             |{
+             |  "type" : "struct",
+             |  "fields" : [ {
+             |    "name" : "sample",
+             |    "type" : {
+             |      "type" : "struct",
+             |      "fields" : [ {
+             |        "name" : "key",
+             |        "type" : "string",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_1",
+             |        "type" : "integer",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_2",
+             |        "type" : "string",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_3",
+             |        "type" : "long",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_4",
+             |        "type" : {
+             |          "type" : "array",
+             |          "elementType" : "string",
+             |          "containsNull" : false
+             |        },
+             |        "nullable" : false
+             |      } ]
+             |    },
+             |    "nullable" : true
+             |  } ]
+             |}
+             |""".stripMargin
+        val schema = DataType.fromJson(jsonSchema).asInstanceOf[StructType]
+        val data = Seq(Row(Row("key", 123, "col2value", 109202L, Seq("col4value"))))
+        val dataDf = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+        val dataDfToProto = dataDf.select(
+          to_protobuf_wrapper($"sample", name, descFilePathOpt) as 'toProto)
+
+        val eventFromSparkSchema = OneOfEvent.parseFrom(
+          dataDfToProto.select("toProto").take(1).toSeq(0).getAs[Array[Byte]](0))
+        assert(eventFromSparkSchema.getCol2.isEmpty)
+        assert(eventFromSparkSchema.getCol3 == 109202L)
+        eventFromSparkSchema.getDescriptorForType.getFields.asScala.map(f => {
+          assert(expectedFields.contains(f.getName))
+        })
+    }
+  }
+
+  test("Fail for recursion field with different field names without circularReferenceDepth") {
+    val aEventWithRecursion = EventWithRecursion.newBuilder().setKey(2).build()
+    val aaEventWithRecursion = EventWithRecursion.newBuilder().setKey(3).build()
+    val aaaEventWithRecursion = EventWithRecursion.newBuilder().setKey(4).build()
+    val c = messageC.newBuilder().setAaa(aaaEventWithRecursion).setKey(12092)
+    val b = messageB.newBuilder().setAa(aaEventWithRecursion).setC(c)
+    val a = messageA.newBuilder().setA(aEventWithRecursion).setB(b).build()
+    val eventWithRecursion = EventWithRecursion.newBuilder().setKey(1).setA(a).build()
+
+    val df = Seq(eventWithRecursion.toByteArray).toDF("protoEvent")
+
+    checkWithFileAndClassName("EventWithRecursion") {
+      case (name, descFilePathOpt) =>
+        val e = intercept[AnalysisException] {
+          df.select(
+            from_protobuf_wrapper($"protoEvent", name, descFilePathOpt).as("messageFromProto"))
+            .show()
+        }
+        assert(e.getMessage.contains(
+          "Found recursive reference in Protobuf schema, which can not be processed by Spark"
+        ))
+    }
+  }
+
+  test("recursion field with different field names with circularReferenceDepth") {

Review Comment:
   Fix the name.



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -1016,7 +1016,7 @@
   },
   "RECURSIVE_PROTOBUF_SCHEMA" : {
     "message" : [
-      "Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>"
+      "Found recursive reference in Protobuf schema, which can not be processed by Spark by default: <fieldDescriptor>. try setting the option `recursive.fields.max.depth` as 0 or 1 or 2. Going beyond 3 levels of recursion is not allowed."

Review Comment:
   Why is 3 or above not allowed? Seems pretty low. If a customer wants to set the level, they will be making a conscious choice. I think it should be at least high single digits to cover most cases. How about 10? 



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -693,4 +693,435 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
       errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
       parameters = Map("descFilePath" -> testFileDescriptor))
   }
+
+  test("Verify OneOf field between from_protobuf -> to_protobuf and struct -> from_protobuf") {
+    val descriptor = ProtobufUtils.buildDescriptor(testFileDesc, "OneOfEvent")
+    val oneOfEvent = OneOfEvent.newBuilder()
+      .setKey("key")
+      .setCol1(123)
+      .setCol3(109202L)
+      .setCol2("col2value")
+      .addCol4("col4value").build()
+
+    val df = Seq(oneOfEvent.toByteArray).toDF("value")
+
+    checkWithFileAndClassName("OneOfEvent") {
+      case (name, descFilePathOpt) =>
+        val fromProtoDf = df.select(
+          from_protobuf_wrapper($"value", name, descFilePathOpt) as 'sample)
+        val toDf = fromProtoDf.select(
+          to_protobuf_wrapper($"sample", name, descFilePathOpt) as 'toProto)
+        val toFromDf = toDf.select(
+          from_protobuf_wrapper($"toProto", name, descFilePathOpt) as 'fromToProto)
+        checkAnswer(fromProtoDf, toFromDf)
+        val actualFieldNames = fromProtoDf.select("sample.*").schema.fields.toSeq.map(f => f.name)
+        descriptor.getFields.asScala.map(f => {
+          assert(actualFieldNames.contains(f.getName))
+        })
+
+        val eventFromSpark = OneOfEvent.parseFrom(
+          toDf.select("toProto").take(1).toSeq(0).getAs[Array[Byte]](0))
+        // OneOf field: the last set value(by order) will overwrite all previous ones.
+        assert(eventFromSpark.getCol2.equals("col2value"))
+        assert(eventFromSpark.getCol3 == 0)
+        val expectedFields = descriptor.getFields.asScala.map(f => f.getName)
+        eventFromSpark.getDescriptorForType.getFields.asScala.map(f => {
+          assert(expectedFields.contains(f.getName))
+        })
+
+        val jsonSchema =
+          s"""
+             |{
+             |  "type" : "struct",
+             |  "fields" : [ {
+             |    "name" : "sample",
+             |    "type" : {
+             |      "type" : "struct",
+             |      "fields" : [ {
+             |        "name" : "key",
+             |        "type" : "string",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_1",
+             |        "type" : "integer",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_2",
+             |        "type" : "string",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_3",
+             |        "type" : "long",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_4",
+             |        "type" : {
+             |          "type" : "array",
+             |          "elementType" : "string",
+             |          "containsNull" : false
+             |        },
+             |        "nullable" : false
+             |      } ]
+             |    },
+             |    "nullable" : true
+             |  } ]
+             |}
+             |{
+             |  "type" : "struct",
+             |  "fields" : [ {
+             |    "name" : "sample",
+             |    "type" : {
+             |      "type" : "struct",
+             |      "fields" : [ {
+             |        "name" : "key",
+             |        "type" : "string",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_1",
+             |        "type" : "integer",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_2",
+             |        "type" : "string",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_3",
+             |        "type" : "long",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_4",
+             |        "type" : {
+             |          "type" : "array",
+             |          "elementType" : "string",
+             |          "containsNull" : false
+             |        },
+             |        "nullable" : false
+             |      } ]
+             |    },
+             |    "nullable" : true
+             |  } ]
+             |}
+             |""".stripMargin
+        val schema = DataType.fromJson(jsonSchema).asInstanceOf[StructType]
+        val data = Seq(Row(Row("key", 123, "col2value", 109202L, Seq("col4value"))))
+        val dataDf = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+        val dataDfToProto = dataDf.select(
+          to_protobuf_wrapper($"sample", name, descFilePathOpt) as 'toProto)
+
+        val eventFromSparkSchema = OneOfEvent.parseFrom(
+          dataDfToProto.select("toProto").take(1).toSeq(0).getAs[Array[Byte]](0))
+        assert(eventFromSparkSchema.getCol2.isEmpty)
+        assert(eventFromSparkSchema.getCol3 == 109202L)
+        eventFromSparkSchema.getDescriptorForType.getFields.asScala.map(f => {
+          assert(expectedFields.contains(f.getName))
+        })
+    }
+  }
+
+  test("Fail for recursion field with different field names without circularReferenceDepth") {

Review Comment:
   Fix `circularReferenceDepth` in the name.



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -693,4 +693,435 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
       errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
       parameters = Map("descFilePath" -> testFileDescriptor))
   }
+
+  test("Verify OneOf field between from_protobuf -> to_protobuf and struct -> from_protobuf") {
+    val descriptor = ProtobufUtils.buildDescriptor(testFileDesc, "OneOfEvent")
+    val oneOfEvent = OneOfEvent.newBuilder()
+      .setKey("key")
+      .setCol1(123)
+      .setCol3(109202L)
+      .setCol2("col2value")
+      .addCol4("col4value").build()
+
+    val df = Seq(oneOfEvent.toByteArray).toDF("value")
+
+    checkWithFileAndClassName("OneOfEvent") {
+      case (name, descFilePathOpt) =>
+        val fromProtoDf = df.select(
+          from_protobuf_wrapper($"value", name, descFilePathOpt) as 'sample)
+        val toDf = fromProtoDf.select(
+          to_protobuf_wrapper($"sample", name, descFilePathOpt) as 'toProto)
+        val toFromDf = toDf.select(
+          from_protobuf_wrapper($"toProto", name, descFilePathOpt) as 'fromToProto)
+        checkAnswer(fromProtoDf, toFromDf)
+        val actualFieldNames = fromProtoDf.select("sample.*").schema.fields.toSeq.map(f => f.name)
+        descriptor.getFields.asScala.map(f => {
+          assert(actualFieldNames.contains(f.getName))
+        })
+
+        val eventFromSpark = OneOfEvent.parseFrom(
+          toDf.select("toProto").take(1).toSeq(0).getAs[Array[Byte]](0))
+        // OneOf field: the last set value(by order) will overwrite all previous ones.
+        assert(eventFromSpark.getCol2.equals("col2value"))
+        assert(eventFromSpark.getCol3 == 0)
+        val expectedFields = descriptor.getFields.asScala.map(f => f.getName)
+        eventFromSpark.getDescriptorForType.getFields.asScala.map(f => {
+          assert(expectedFields.contains(f.getName))
+        })
+
+        val jsonSchema =
+          s"""
+             |{
+             |  "type" : "struct",
+             |  "fields" : [ {
+             |    "name" : "sample",
+             |    "type" : {
+             |      "type" : "struct",
+             |      "fields" : [ {
+             |        "name" : "key",
+             |        "type" : "string",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_1",
+             |        "type" : "integer",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_2",
+             |        "type" : "string",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_3",
+             |        "type" : "long",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_4",
+             |        "type" : {
+             |          "type" : "array",
+             |          "elementType" : "string",
+             |          "containsNull" : false
+             |        },
+             |        "nullable" : false
+             |      } ]
+             |    },
+             |    "nullable" : true
+             |  } ]
+             |}
+             |{
+             |  "type" : "struct",
+             |  "fields" : [ {
+             |    "name" : "sample",
+             |    "type" : {
+             |      "type" : "struct",
+             |      "fields" : [ {
+             |        "name" : "key",
+             |        "type" : "string",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_1",
+             |        "type" : "integer",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_2",
+             |        "type" : "string",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_3",
+             |        "type" : "long",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_4",
+             |        "type" : {
+             |          "type" : "array",
+             |          "elementType" : "string",
+             |          "containsNull" : false
+             |        },
+             |        "nullable" : false
+             |      } ]
+             |    },
+             |    "nullable" : true
+             |  } ]
+             |}
+             |""".stripMargin
+        val schema = DataType.fromJson(jsonSchema).asInstanceOf[StructType]
+        val data = Seq(Row(Row("key", 123, "col2value", 109202L, Seq("col4value"))))
+        val dataDf = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+        val dataDfToProto = dataDf.select(
+          to_protobuf_wrapper($"sample", name, descFilePathOpt) as 'toProto)
+
+        val eventFromSparkSchema = OneOfEvent.parseFrom(
+          dataDfToProto.select("toProto").take(1).toSeq(0).getAs[Array[Byte]](0))
+        assert(eventFromSparkSchema.getCol2.isEmpty)
+        assert(eventFromSparkSchema.getCol3 == 109202L)
+        eventFromSparkSchema.getDescriptorForType.getFields.asScala.map(f => {
+          assert(expectedFields.contains(f.getName))
+        })
+    }
+  }
+
+  test("Fail for recursion field with different field names without circularReferenceDepth") {
+    val aEventWithRecursion = EventWithRecursion.newBuilder().setKey(2).build()
+    val aaEventWithRecursion = EventWithRecursion.newBuilder().setKey(3).build()
+    val aaaEventWithRecursion = EventWithRecursion.newBuilder().setKey(4).build()
+    val c = messageC.newBuilder().setAaa(aaaEventWithRecursion).setKey(12092)
+    val b = messageB.newBuilder().setAa(aaEventWithRecursion).setC(c)
+    val a = messageA.newBuilder().setA(aEventWithRecursion).setB(b).build()
+    val eventWithRecursion = EventWithRecursion.newBuilder().setKey(1).setA(a).build()
+
+    val df = Seq(eventWithRecursion.toByteArray).toDF("protoEvent")
+
+    checkWithFileAndClassName("EventWithRecursion") {
+      case (name, descFilePathOpt) =>
+        val e = intercept[AnalysisException] {
+          df.select(
+            from_protobuf_wrapper($"protoEvent", name, descFilePathOpt).as("messageFromProto"))
+            .show()
+        }
+        assert(e.getMessage.contains(
+          "Found recursive reference in Protobuf schema, which can not be processed by Spark"
+        ))
+    }
+  }
+
+  test("recursion field with different field names with circularReferenceDepth") {
+    val descriptor = ProtobufUtils.buildDescriptor(testFileDesc, "Employee")
+
+    val manager = Employee.newBuilder().setFirstName("firstName").setLastName("lastName").build()
+    val em2 = EM2.newBuilder().setTeamsize(100).setEm2Manager(manager).build()
+    val em = EM.newBuilder().setTeamsize(100).setEmManager(manager).build()
+    val ic = IC.newBuilder().addSkills("java").setIcManager(manager).build()
+    val employee = Employee.newBuilder().setFirstName("firstName")
+      .setLastName("lastName").setEm2(em2).setEm(em).setIc(ic).build()
+
+    val df = Seq(employee.toByteArray).toDF("protoEvent")
+    val options = new java.util.HashMap[String, String]()
+    options.put("recursive.fields.max.depth", "1")
+
+    val fromProtoDf = df.select(
+      functions.from_protobuf($"protoEvent", "Employee", testFileDesc, options) as 'sample)
+
+    val toDf = fromProtoDf.select(
+      functions.to_protobuf($"sample", "Employee", testFileDesc) as 'toProto)
+    val toFromDf = toDf.select(
+      functions.from_protobuf($"toProto",
+        "Employee",
+        testFileDesc,
+        options) as 'fromToProto)
+
+    checkAnswer(fromProtoDf, toFromDf)
+
+    val actualFieldNames = fromProtoDf.select("sample.*").schema.fields.toSeq.map(f => f.name)
+    descriptor.getFields.asScala.map(f => {
+      assert(actualFieldNames.contains(f.getName))
+    })
+
+    val eventFromSpark = Employee.parseFrom(
+      toDf.select("toProto").take(1).toSeq(0).getAs[Array[Byte]](0))
+
+    assert(eventFromSpark.getIc.getIcManager.getFirstName.equals("firstName"))
+    assert(eventFromSpark.getIc.getIcManager.getLastName.equals("lastName"))
+    assert(eventFromSpark.getEm2.getEm2Manager.getFirstName.isEmpty)
+  }
+
+  test("Verify OneOf field with recursive fields between from_protobuf -> to_protobuf " +
+    "and struct -> from_protobuf") {
+    val descriptor = ProtobufUtils.buildDescriptor(testFileDesc, "OneOfEventWithRecursion")
+
+    val recursiveANested = EventRecursiveA.newBuilder()
+      .setKey("keyNested3").build()
+    val oneOfEventNested = OneOfEventWithRecursion.newBuilder()
+      .setKey("keyNested2")
+      .setValue("valueNested2")
+      .setRecursiveA(recursiveANested).build()
+    val recursiveA = EventRecursiveA.newBuilder().setKey("recursiveAKey")
+      .setRecursiveA(oneOfEventNested).build()
+    val recursiveB = EventRecursiveB.newBuilder()
+      .setKey("recursiveBKey")
+      .setValue("recursiveBvalue").build()
+    val oneOfEventWithRecursion = OneOfEventWithRecursion.newBuilder()
+      .setKey("key1")
+      .setValue("value1")
+      .setRecursiveB(recursiveB)
+      .setRecursiveA(recursiveA).build()
+
+    val df = Seq(oneOfEventWithRecursion.toByteArray).toDF("value")
+
+    val options = new java.util.HashMap[String, String]()
+    options.put("recursive.fields.max.depth", "0")
+
+    val fromProtoDf = df.select(
+      functions.from_protobuf($"value",
+        "OneOfEventWithRecursion",
+        testFileDesc, options) as 'sample)
+    val toDf = fromProtoDf.select(
+      functions.to_protobuf($"sample", "OneOfEventWithRecursion", testFileDesc) as 'toProto)
+    val toFromDf = toDf.select(
+      functions.from_protobuf($"toProto",
+        "OneOfEventWithRecursion",
+        testFileDesc,
+        options) as 'fromToProto)
+
+    checkAnswer(fromProtoDf, toFromDf)
+
+    val actualFieldNames = fromProtoDf.select("sample.*").schema.fields.toSeq.map(f => f.name)
+    descriptor.getFields.asScala.map(f => {
+      assert(actualFieldNames.contains(f.getName))
+    })
+
+    val eventFromSpark = OneOfEventWithRecursion.parseFrom(
+      toDf.select("toProto").take(1).toSeq(0).getAs[Array[Byte]](0))
+
+    assert(eventFromSpark.getRecursiveA.getRecursiveA.getKey.equals("keyNested2"))
+    assert(eventFromSpark.getRecursiveA.getRecursiveA.getValue.equals("valueNested2"))
+    assert(eventFromSpark.getRecursiveA.getRecursiveA.getRecursiveA.getKey.isEmpty)
+
+    val expectedFields = descriptor.getFields.asScala.map(f => f.getName)
+    eventFromSpark.getDescriptorForType.getFields.asScala.map(f => {
+      assert(expectedFields.contains(f.getName))
+    })
+
+    val jsonSchema =
+      s"""
+         |{
+         |  "type" : "struct",
+         |  "fields" : [ {
+         |    "name" : "sample",
+         |    "type" : {
+         |      "type" : "struct",
+         |      "fields" : [ {
+         |        "name" : "key",
+         |        "type" : "string",
+         |        "nullable" : true
+         |      }, {
+         |        "name" : "recursiveA",
+         |        "type" : {
+         |          "type" : "struct",
+         |          "fields" : [ {
+         |            "name" : "recursiveA",
+         |            "type" : {
+         |              "type" : "struct",
+         |              "fields" : [ {
+         |                "name" : "key",
+         |                "type" : "string",
+         |                "nullable" : true
+         |              }, {
+         |                "name" : "recursiveA",
+         |                "type" : "void",
+         |                "nullable" : true
+         |              }, {
+         |                "name" : "recursiveB",
+         |                "type" : {
+         |                  "type" : "struct",
+         |                  "fields" : [ {
+         |                    "name" : "key",
+         |                    "type" : "string",
+         |                    "nullable" : true
+         |                  }, {
+         |                    "name" : "value",
+         |                    "type" : "string",
+         |                    "nullable" : true
+         |                  }, {
+         |                    "name" : "recursiveA",
+         |                    "type" : {
+         |                      "type" : "struct",
+         |                      "fields" : [ {
+         |                        "name" : "key",
+         |                        "type" : "string",
+         |                        "nullable" : true
+         |                      }, {
+         |                        "name" : "recursiveA",
+         |                        "type" : "void",
+         |                        "nullable" : true
+         |                      }, {
+         |                        "name" : "recursiveB",
+         |                        "type" : "void",
+         |                        "nullable" : true
+         |                      }, {
+         |                        "name" : "value",
+         |                        "type" : "string",
+         |                        "nullable" : true
+         |                      } ]
+         |                    },
+         |                    "nullable" : true
+         |                  } ]
+         |                },
+         |                "nullable" : true
+         |              }, {
+         |                "name" : "value",
+         |                "type" : "string",
+         |                "nullable" : true
+         |              } ]
+         |            },
+         |            "nullable" : true
+         |          }, {
+         |            "name" : "key",
+         |            "type" : "string",
+         |            "nullable" : true
+         |          } ]
+         |        },
+         |        "nullable" : true
+         |      }, {
+         |        "name" : "recursiveB",
+         |        "type" : {
+         |          "type" : "struct",
+         |          "fields" : [ {
+         |            "name" : "key",
+         |            "type" : "string",
+         |            "nullable" : true
+         |          }, {
+         |            "name" : "value",
+         |            "type" : "string",
+         |            "nullable" : true
+         |          }, {
+         |            "name" : "recursiveA",
+         |            "type" : {
+         |              "type" : "struct",
+         |              "fields" : [ {
+         |                "name" : "key",
+         |                "type" : "string",
+         |                "nullable" : true
+         |              }, {
+         |                "name" : "recursiveA",
+         |                "type" : {
+         |                  "type" : "struct",
+         |                  "fields" : [ {
+         |                    "name" : "recursiveA",
+         |                    "type" : {
+         |                      "type" : "struct",
+         |                      "fields" : [ {
+         |                        "name" : "key",
+         |                        "type" : "string",
+         |                        "nullable" : true
+         |                      }, {
+         |                        "name" : "recursiveA",
+         |                        "type" : "void",
+         |                        "nullable" : true
+         |                      }, {
+         |                        "name" : "recursiveB",
+         |                        "type" : "void",
+         |                        "nullable" : true
+         |                      }, {
+         |                        "name" : "value",
+         |                        "type" : "string",
+         |                        "nullable" : true
+         |                      } ]
+         |                    },
+         |                    "nullable" : true
+         |                  }, {
+         |                    "name" : "key",
+         |                    "type" : "string",
+         |                    "nullable" : true
+         |                  } ]
+         |                },
+         |                "nullable" : true
+         |              }, {
+         |                "name" : "recursiveB",
+         |                "type" : "void",
+         |                "nullable" : true
+         |              }, {
+         |                "name" : "value",
+         |                "type" : "string",
+         |                "nullable" : true
+         |              } ]
+         |            },
+         |            "nullable" : true
+         |          } ]
+         |        },
+         |        "nullable" : true
+         |      }, {
+         |        "name" : "value",
+         |        "type" : "string",
+         |        "nullable" : true
+         |      } ]
+         |    },
+         |    "nullable" : true
+         |  } ]
+         |}
+         |""".stripMargin
+
+    val schema = DataType.fromJson(jsonSchema).asInstanceOf[StructType]
+    val data = Seq(
+      Row(
+        Row("key1",
+          Row(
+            Row("keyNested2", null, null, "valueNested2"),
+            "recursiveAKey"),
+          null,
+          "value1")
+      )
+    )
+    val dataDf = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+    val dataDfToProto = dataDf.select(
+      functions.to_protobuf($"sample", "OneOfEventWithRecursion", testFileDesc) as 'toProto)
+
+    val eventFromSparkSchema = OneOfEventWithRecursion.parseFrom(
+      dataDfToProto.select("toProto").take(1).toSeq(0).getAs[Array[Byte]](0))
+    assert(eventFromSpark.getRecursiveA.getRecursiveA.getKey.equals("keyNested2"))
+    assert(eventFromSpark.getRecursiveA.getRecursiveA.getValue.equals("valueNested2"))
+    assert(eventFromSpark.getRecursiveA.getRecursiveA.getRecursiveA.getKey.isEmpty)
+    eventFromSparkSchema.getDescriptorForType.getFields.asScala.map(f => {
+      assert(expectedFields.contains(f.getName))
+    })
+  }

Review Comment:
   Could you add a test that clearly shows the expected schema, similar to my comment here: https://github.com/apache/spark/pull/38922/files#r1051292604
   
   It is not easy to see from these tests what schema a depth of 0 or 2 results in. 
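   
   For instance, something along these lines, asserting the schema directly instead of round-tripping (the expected DDL below is illustrative only, not the actual schema a given depth produces):
   ```scala
   val options = new java.util.HashMap[String, String]()
   options.put("recursive.fields.max.depth", "0")
   
   val parsed = df.select(
     functions.from_protobuf($"value", "OneOfEventWithRecursion", testFileDesc, options) as "sample")
   
   // Recursive branches beyond the allowed depth collapse to void/null columns.
   val expected = StructType.fromDDL(
     "sample STRUCT<key: STRING, recursiveA: VOID, recursiveB: VOID, value: STRING>")
   assert(parsed.schema == expected)
   ```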



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -693,4 +693,435 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
       errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
       parameters = Map("descFilePath" -> testFileDescriptor))
   }
+
+  test("Verify OneOf field between from_protobuf -> to_protobuf and struct -> from_protobuf") {
+    val descriptor = ProtobufUtils.buildDescriptor(testFileDesc, "OneOfEvent")
+    val oneOfEvent = OneOfEvent.newBuilder()
+      .setKey("key")
+      .setCol1(123)
+      .setCol3(109202L)
+      .setCol2("col2value")
+      .addCol4("col4value").build()
+
+    val df = Seq(oneOfEvent.toByteArray).toDF("value")
+
+    checkWithFileAndClassName("OneOfEvent") {
+      case (name, descFilePathOpt) =>
+        val fromProtoDf = df.select(
+          from_protobuf_wrapper($"value", name, descFilePathOpt) as 'sample)
+        val toDf = fromProtoDf.select(
+          to_protobuf_wrapper($"sample", name, descFilePathOpt) as 'toProto)
+        val toFromDf = toDf.select(
+          from_protobuf_wrapper($"toProto", name, descFilePathOpt) as 'fromToProto)
+        checkAnswer(fromProtoDf, toFromDf)
+        val actualFieldNames = fromProtoDf.select("sample.*").schema.fields.toSeq.map(f => f.name)
+        descriptor.getFields.asScala.map(f => {
+          assert(actualFieldNames.contains(f.getName))
+        })
+
+        val eventFromSpark = OneOfEvent.parseFrom(
+          toDf.select("toProto").take(1).toSeq(0).getAs[Array[Byte]](0))
+        // OneOf field: the last set value(by order) will overwrite all previous ones.
+        assert(eventFromSpark.getCol2.equals("col2value"))
+        assert(eventFromSpark.getCol3 == 0)
+        val expectedFields = descriptor.getFields.asScala.map(f => f.getName)
+        eventFromSpark.getDescriptorForType.getFields.asScala.map(f => {
+          assert(expectedFields.contains(f.getName))
+        })
+
+        val jsonSchema =
+          s"""
+             |{
+             |  "type" : "struct",
+             |  "fields" : [ {
+             |    "name" : "sample",
+             |    "type" : {
+             |      "type" : "struct",
+             |      "fields" : [ {
+             |        "name" : "key",
+             |        "type" : "string",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_1",
+             |        "type" : "integer",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_2",
+             |        "type" : "string",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_3",
+             |        "type" : "long",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_4",
+             |        "type" : {
+             |          "type" : "array",
+             |          "elementType" : "string",
+             |          "containsNull" : false
+             |        },
+             |        "nullable" : false
+             |      } ]
+             |    },
+             |    "nullable" : true
+             |  } ]
+             |}
+             |{
+             |  "type" : "struct",
+             |  "fields" : [ {
+             |    "name" : "sample",
+             |    "type" : {
+             |      "type" : "struct",
+             |      "fields" : [ {
+             |        "name" : "key",
+             |        "type" : "string",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_1",
+             |        "type" : "integer",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_2",
+             |        "type" : "string",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_3",
+             |        "type" : "long",
+             |        "nullable" : true
+             |      }, {
+             |        "name" : "col_4",
+             |        "type" : {
+             |          "type" : "array",
+             |          "elementType" : "string",
+             |          "containsNull" : false
+             |        },
+             |        "nullable" : false
+             |      } ]
+             |    },
+             |    "nullable" : true
+             |  } ]
+             |}
+             |""".stripMargin
+        val schema = DataType.fromJson(jsonSchema).asInstanceOf[StructType]
+        val data = Seq(Row(Row("key", 123, "col2value", 109202L, Seq("col4value"))))
+        val dataDf = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+        val dataDfToProto = dataDf.select(
+          to_protobuf_wrapper($"sample", name, descFilePathOpt) as 'toProto)
+
+        val eventFromSparkSchema = OneOfEvent.parseFrom(
+          dataDfToProto.select("toProto").take(1).toSeq(0).getAs[Array[Byte]](0))
+        assert(eventFromSparkSchema.getCol2.isEmpty)
+        assert(eventFromSparkSchema.getCol3 == 109202L)
+        eventFromSparkSchema.getDescriptorForType.getFields.asScala.map(f => {
+          assert(expectedFields.contains(f.getName))
+        })
+    }
+  }
+
+  test("Fail for recursion field with different field names without circularReferenceDepth") {
+    val aEventWithRecursion = EventWithRecursion.newBuilder().setKey(2).build()
+    val aaEventWithRecursion = EventWithRecursion.newBuilder().setKey(3).build()
+    val aaaEventWithRecursion = EventWithRecursion.newBuilder().setKey(4).build()
+    val c = messageC.newBuilder().setAaa(aaaEventWithRecursion).setKey(12092)
+    val b = messageB.newBuilder().setAa(aaEventWithRecursion).setC(c)
+    val a = messageA.newBuilder().setA(aEventWithRecursion).setB(b).build()
+    val eventWithRecursion = EventWithRecursion.newBuilder().setKey(1).setA(a).build()
+
+    val df = Seq(eventWithRecursion.toByteArray).toDF("protoEvent")
+
+    checkWithFileAndClassName("EventWithRecursion") {
+      case (name, descFilePathOpt) =>
+        val e = intercept[AnalysisException] {
+          df.select(
+            from_protobuf_wrapper($"protoEvent", name, descFilePathOpt).as("messageFromProto"))
+            .show()
+        }
+        assert(e.getMessage.contains(
+          "Found recursive reference in Protobuf schema, which can not be processed by Spark"
+        ))
+    }
+  }
+
+  test("recursion field with different field names with circularReferenceDepth") {

Review Comment:
   What is this testing? As we discussed, field name does not matter.
   





[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1053681591


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,14 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  // Setting the `recursive.fields.max.depth` to 0 drops all recursive fields,
+  // 1 allows it to be recurse once, and 2 allows it to be recursed twice and so on.
+  // A value of `recursive.fields.max.depth` greater than 10 is not permitted. If it is not
+  // specified, the default value is -1; recursive fields are not permitted. If a protobuf
+  // record has more depth than the allowed value for recursive fields, it will be truncated
+  // and some fields may be discarded.
+  val recursiveFieldMaxDepth: Int = parameters.getOrElse("recursive.fields.max.depth", "-1").toInt

Review Comment:
   @cloud-fan this is in line with the options for the Kafka source, e.g. the 'kafka.' prefix allows setting Kafka client configs.
   
   In addition, we will be passing more options, e.g. for schema registry auth configs. They will have a prefix like 'confluent.schemaregistry.[actual registry client conf]'.
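   
   A sketch of the pattern being referenced (`df` and `descFilePath` are assumed; the schema-registry key below is hypothetical, only `recursive.fields.max.depth` exists in this PR):
   ```scala
   import scala.collection.JavaConverters._
   
   val options = Map(
     "recursive.fields.max.depth" -> "2",
     // Hypothetical future prefix, analogous to the 'kafka.' prefix on the Kafka source:
     "confluent.schemaregistry.basic.auth.user.info" -> "user:secret"
   ).asJava
   
   val parsed = df.select(
     functions.from_protobuf($"value", "Employee", descFilePath, options) as "sample")
   ```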
   





[GitHub] [spark] rangadi commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
rangadi commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1360061346

   jenkins merge




[GitHub] [spark] SandishKumarHN commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1340222834

   > > Added selectable recursion depth option to from_protobuf.
   > 
   > Do we need to do this for 'to_protobuf()' too? What would happen in that case?
   
   @rangadi 
   The source dataframe struct field should match the protobuf recursive message for "to_protobuf()". It will convert until the recursion level is matched, like a struct within a struct mapping to a recursive message. This is true even for the existing code.
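   
   A minimal sketch of that round trip (assuming the recursive `Employee` message from the test protos; `employees` and `descFilePath` are hypothetical):
   ```scala
   // from_protobuf() truncates at the configured depth, so the struct column already has
   // bounded nesting; to_protobuf() then serializes exactly the nesting that is present.
   val bytes = employees.select(
     functions.to_protobuf($"employee", "Employee", descFilePath) as "bytes")
   ```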




[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1043726930


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,14 +109,38 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
-          throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+        // User can set circularReferenceDepth of 0 or 1 or 2.
+        // Going beyond 3 levels of recursion is not allowed.
+        if (protobufOptions.circularReferenceType.equals("FIELD_TYPE")) {
+          if (existingRecordTypes.contains(fd.getType.name()) &&
+            (protobufOptions.circularReferenceDepth < 0 ||
+              protobufOptions.circularReferenceDepth >= 3)) {
+            throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+          } else if (existingRecordTypes.contains(fd.getType.name()) &&

Review Comment:
   name or full name? 
   also what keeps track of the recursion depth? 



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   @SandishKumarHN @baganokodo2022 moving the discussion here (for threading).
   
   > Besides, can we also support a "CircularReferenceType" option with a enum value of [FIELD_NAME, FIELD_TYPE]. The reason is because navigation can go very deep before the same fully-qualified FIELD_NAME is encountered again. While FIELD_TYPE stops recursive navigation much faster.  ...
   
   I didn't quite follow the motivation here. Could you give concrete examples for the two different cases?
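   
   To make the two readings concrete, here is a hypothetical pair of schemas (a sketch only, not from the PR):
   
   ```scala
   // Recursion by type: message A reaches itself again, directly or via nesting.
   //   message A { B b = 1; }
   //   message B { A a = 1; }
   //
   // No recursion: distinct message types, just deeply nested.
   //   message A { B b = 1; }
   //   message B { C c = 1; }
   //   message C { int32 key = 1; }
   ```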
   



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,14 +109,38 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
-          throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+        // User can set circularReferenceDepth of 0 or 1 or 2.
+        // Going beyond 3 levels of recursion is not allowed.

Review Comment:
   Could you add a justification for this?



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -26,11 +26,11 @@ import com.google.protobuf.{ByteString, DynamicMessage}
 import org.apache.spark.sql.{Column, QueryTest, Row}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.functions.{lit, struct}
-import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.SimpleMessageRepeated
+import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.{EventRecursiveA, EventRecursiveB, OneOfEvent, OneOfEventWithRecursion, SimpleMessageRepeated}

Review Comment:
   Are there tests for recursive fields? 



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -693,4 +693,178 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
       errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
       parameters = Map("descFilePath" -> testFileDescriptor))
   }
+
+  test("Unit test for Protobuf OneOf field") {

Review Comment:
   Add a short description of the test at the top. It improves readability. What is this verifying? 
   
   Remove "Unit test for", this is already a unit test :). 



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -693,4 +693,178 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
       errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
       parameters = Map("descFilePath" -> testFileDescriptor))
   }
+
+  test("Unit test for Protobuf OneOf field") {
+    val descriptor = ProtobufUtils.buildDescriptor(testFileDesc, "OneOfEvent")
+    val oneOfEvent = OneOfEvent.newBuilder()
+      .setKey("key")
+      .setCol1(123)
+      .setCol3(109202L)
+      .setCol2("col2value")
+      .addCol4("col4value").build()
+
+    val df = Seq(oneOfEvent.toByteArray).toDF("value")
+
+    val fromProtoDf = df.select(
+      functions.from_protobuf($"value", "OneOfEvent", testFileDesc) as 'sample)
+    val toDf = fromProtoDf.select(
+      functions.to_protobuf($"sample", "OneOfEvent", testFileDesc) as 'toProto)
+    val toFromDf = toDf.select(
+      functions.from_protobuf($"toProto", "OneOfEvent", testFileDesc) as 'fromToProto)
+
+    checkAnswer(fromProtoDf, toFromDf)
+
+    val actualFieldNames = fromProtoDf.select("sample.*").schema.fields.toSeq.map(f => f.name)
+    descriptor.getFields.asScala.map(f => {
+      assert(actualFieldNames.contains(f.getName))
+    })
+
+    val eventFromSpark = OneOfEvent.parseFrom(
+      toDf.select("toProto").take(1).toSeq(0).getAs[Array[Byte]](0))
+
+    // OneOf field: the last set value(by order) will overwrite all previous ones.
+    assert(eventFromSpark.getCol2.equals("col2value"))
+    assert(eventFromSpark.getCol3 == 0)
+
+    val expectedFields = descriptor.getFields.asScala.map(f => f.getName)
+    eventFromSpark.getDescriptorForType.getFields.asScala.map(f => {
+      assert(expectedFields.contains(f.getName))
+    })
+
+    val schema = new StructType()
+      .add("sample",
+        new StructType()
+          .add("key", StringType)
+          .add("col_1", IntegerType)
+          .add("col_2", StringType)
+          .add("col_3", LongType)
+          .add("col_4", ArrayType(StringType))
+      )
+
+    val data = Seq(Row(Row("key", 123, "col2value", 109202L, Seq("col4value"))))
+    val dataDf = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+    val dataDfToProto = dataDf.select(
+      functions.to_protobuf($"sample", "OneOfEvent", testFileDesc) as 'toProto)
+    val eventFromSparkSchema = OneOfEvent.parseFrom(
+      dataDfToProto.select("toProto").take(1).toSeq(0).getAs[Array[Byte]](0))
+    assert(eventFromSparkSchema.getCol2.isEmpty)
+    assert(eventFromSparkSchema.getCol3 == 109202L)
+    eventFromSparkSchema.getDescriptorForType.getFields.asScala.map(f => {
+      assert(expectedFields.contains(f.getName))
+    })
+  }
+
+  test("Unit tests for Protobuf OneOf field with circularReferenceDepth option") {
+    val descriptor = ProtobufUtils.buildDescriptor(testFileDesc, "OneOfEventWithRecursion")
+
+    val recursiveANested = EventRecursiveA.newBuilder()
+      .setKey("keyNested3").build()
+    val oneOfEventNested = OneOfEventWithRecursion.newBuilder()
+      .setKey("keyNested2")
+      .setValue("valueNested2")
+      .setRecursiveA(recursiveANested).build()
+    val recursiveA = EventRecursiveA.newBuilder().setKey("recursiveAKey")
+      .setRecursiveA(oneOfEventNested).build()
+    val recursiveB = EventRecursiveB.newBuilder()
+      .setKey("recursiveBKey")
+      .setValue("recursiveBvalue").build()
+    val oneOfEventWithRecursion = OneOfEventWithRecursion.newBuilder()
+      .setKey("key1")
+      .setValue("value1")
+      .setRecursiveB(recursiveB)
+      .setRecursiveA(recursiveA).build()
+
+    val df = Seq(oneOfEventWithRecursion.toByteArray).toDF("value")
+
+    val options = new java.util.HashMap[String, String]()
+    options.put("circularReferenceDepth", "1")
+
+    val fromProtoDf = df.select(
+      functions.from_protobuf($"value",
+        "OneOfEventWithRecursion",
+        testFileDesc, options) as 'sample)
+
+    val toDf = fromProtoDf.select(
+      functions.to_protobuf($"sample", "OneOfEventWithRecursion", testFileDesc) as 'toProto)
+    val toFromDf = toDf.select(
+      functions.from_protobuf($"toProto",
+        "OneOfEventWithRecursion",
+        testFileDesc,
+        options) as 'fromToProto)
+
+    checkAnswer(fromProtoDf, toFromDf)
+
+    val actualFieldNames = fromProtoDf.select("sample.*").schema.fields.toSeq.map(f => f.name)
+    descriptor.getFields.asScala.map(f => {
+      assert(actualFieldNames.contains(f.getName))
+    })
+
+    val eventFromSpark = OneOfEventWithRecursion.parseFrom(
+      toDf.select("toProto").take(1).toSeq(0).getAs[Array[Byte]](0))
+
+    // check circularReferenceDepth=1 value are present, but not circularReferenceDepth=2
+    assert(eventFromSpark.getRecursiveA.getRecursiveA.getKey.equals("keyNested2"))
+    assert(eventFromSpark.getRecursiveA.getRecursiveA.getValue.equals("valueNested2"))
+    assert(eventFromSpark.getRecursiveA.getRecursiveA.getRecursiveA.getKey.isEmpty)
+
+    val expectedFields = descriptor.getFields.asScala.map(f => f.getName)
+    eventFromSpark.getDescriptorForType.getFields.asScala.map(f => {
+      assert(expectedFields.contains(f.getName))
+    })
+
+    val schema = StructType(Seq(StructField("sample",

Review Comment:
   Btw, using `val schema = DataType.fromJson("json string")` is a lot more readable. 
   Optionally, we could update many of these in follow-up PRs. 
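   
   A small sketch of the two styles (fields taken from the test above):
   
   ```scala
   import org.apache.spark.sql.types._
   
   // Builder style, as in the test:
   val built = new StructType()
     .add("key", StringType)
     .add("col_1", IntegerType)
   
   // JSON style; DataType.fromJson parses Spark's JSON schema format:
   val fromJson = DataType.fromJson(
     """{"type":"struct","fields":[
       |  {"name":"key","type":"string","nullable":true,"metadata":{}},
       |  {"name":"col_1","type":"integer","nullable":true,"metadata":{}}
       |]}""".stripMargin).asInstanceOf[StructType]
   ```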





[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1044065444


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   Does the code in master fail to detect this recursion? In that case it would lead to an unbounded schema and fail. 





[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1044056150


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   @rangadi just replacing `fd.getFullName` with the below would make sense; we don't need two different kinds of checks. 
   ```
   val recordName = fd.getFullName.substring(0, fd.getFullName().lastIndexOf("."))
   ```
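   
   (For a field whose full name is, say, `pkg.MessageA.fieldB`, this yields `pkg.MessageA`, i.e. the enclosing message's name; note that this is the containing message, not the field's own message type.)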





[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1041487582


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,9 +92,13 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
+        // Stop recursion at the first level when a recursive field is encountered.
+        // TODO: The user should be given the option to set the recursion level to 1, 2, or 3

Review Comment:
   @rangadi planning to add the selectable recursion depth in this PR. 





[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1043982706


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   I see only one type of recursion, and that is based on the protobuf message type, i.e. 
    message A ends up including itself either directly or indirectly through nesting. This is the 1st example in @SandishKumarHN's message above. The second one does not have recursion. 
   
   Looking for examples of other types.  





[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1048042504


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala:
##########
@@ -235,7 +237,7 @@ private[sql] class ProtobufDeserializer(
           writeRecord(new RowUpdater(row), value.asInstanceOf[DynamicMessage])
           updater.set(ordinal, row)
 
-      case (MESSAGE, ArrayType(st: StructType, containsNull)) =>
+      case (MESSAGE, ArrayType(st: DataType, containsNull)) =>

Review Comment:
   @mposdev21 Actually, we don't need this here at all; we can add it at the top along with the other ArrayType cases.





[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1049126074


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  // Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting
+  // it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed
+  // thrice. circularReferenceDepth value greater than 2 is not allowed. If the not
+  // specified, it will default to -1, which disables recursive fields.

Review Comment:
   '-1' implies recursive fields are not allowed. 
   ("disables" does not clearly imply that it will be an error.)



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  // Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting
+  // it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed
+  // thrice. circularReferenceDepth value greater than 2 is not allowed. If the not
+  // specified, it will default to -1, which disables recursive fields.
+  val circularReferenceDepth: Int = parameters.getOrElse("circularReferenceDepth", "-1").toInt

Review Comment:
   Suggestion for renaming this option:  _"recursive.fields.max.depth"_ 
   
   _circularReferenceDepth_ sounds very much like a code variable name. 
   



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,14 +106,26 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
+        // Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting
+        // it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed
+        // thrice. circularReferenceDepth value greater than 2 is not allowed. If the not
+        // specified, it will default to -1, which disables recursive fields.
+        val recordName = fd.getMessageType.getFullName
+        if (existingRecordNames.contains(recordName) &&
+          protobufOptions.circularReferenceDepth < 0 ) {
           throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+        } else if (existingRecordNames.contains(recordName) &&
+          existingRecordNames.getOrElse(recordName, 0)
+            > protobufOptions.circularReferenceDepth) {
+          return Some(StructField(fd.getName, NullType, nullable = false))

Review Comment:
   Why is nullable false? 



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  // Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting
+  // it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed
+  // thrice. circularReferenceDepth value greater than 2 is not allowed. If the not
+  // specified, it will default to -1, which disables recursive fields.

Review Comment:
   Also warn that if the protobuf record has more depth for recursive fields than allowed here, it will be truncated to the allowed depth. This implies some fields are discarded from the record. 
   
   Could you add a simple example in the comment showing the resulting Spark schema when this is set to '0' and '2'? 
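   
   For example, for `message Person { string name = 1; Person bff = 2 }`, the mapping eventually documented in the code comment is:
   
   ```scala
   // recursive.fields.max.depth = 0: struct<name: string, bff: null>
   // recursive.fields.max.depth = 1: struct<name: string, bff: struct<name: string, bff: null>>
   // recursive.fields.max.depth = 2: struct<name: string, bff: struct<name: string, bff: struct<name: string, bff: null>>>
   ```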



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,14 +106,26 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
+        // Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting
+        // it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed
+        // thrice. circularReferenceDepth value greater than 2 is not allowed. If the not
+        // specified, it will default to -1, which disables recursive fields.
+        val recordName = fd.getMessageType.getFullName
+        if (existingRecordNames.contains(recordName) &&

Review Comment:
   Better to remove the 'return' statement. 
   How about:
   
       val recursiveDepth = existingRecordNames.getOrElse(recordName, 0)
       if (recursiveDepth == 0 ||  // no recursion
           (protobufOptions.circularReferenceDepth >= 0 &&
            recursiveDepth <= (protobufOptions.circularReferenceDepth + 1))) {
         // recursion is within the allowed limit
         val newRecordNames = existingRecordNames + (recordName -> (recursiveDepth + 1))
         ...
       } else if (protobufOptions.circularReferenceDepth >= 0) {
         // recursion is allowed but we reached the limit; truncate
         return Some(StructField(fd.getName, NullType, ...))
       } else {
         // recursion is not allowed
         throw ...
       }
   
   
   



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala:
##########
@@ -157,6 +157,8 @@ private[sql] class ProtobufDeserializer(
 
       case (null, NullType) => (updater, ordinal, _) => updater.setNullAt(ordinal)
 
+      case (MESSAGE, NullType) => (updater, ordinal, _) => updater.setNullAt(ordinal)

Review Comment:
   Could you add a comment noting that we might be dropping data here? It will not be easy for a future reader to see. 
   We could have an option to error out if the actual data has more recursion than configured. 



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  // Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting
+  // it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed
+  // thrice. circularReferenceDepth value greater than 2 is not allowed. If the not
+  // specified, it will default to -1, which disables recursive fields.
+  val circularReferenceDepth: Int = parameters.getOrElse("circularReferenceDepth", "-1").toInt

Review Comment:
   If we go with that, we could rename the variable as well to 'recursiveFieldMaxDepth' (but this is your choice).



##########
connector/protobuf/src/test/resources/protobuf/functions_suite.proto:
##########
@@ -170,4 +170,118 @@ message timeStampMsg {
 message durationMsg {
   string key = 1;
   Duration duration = 2;
-}
\ No newline at end of file
+}
+
+message OneOfEvent {
+  string key = 1;
+  oneof payload {
+    int32 col_1 = 2;
+    string col_2 = 3;
+    int64 col_3 = 4;
+  }
+  repeated string col_4 = 5;
+}
+
+message EventWithRecursion {
+  int32 key = 1;
+  messageA a = 2;
+}
+message messageA {
+  EventWithRecursion a = 1;
+  messageB b = 2;
+}
+message messageB {
+  EventWithRecursion aa = 1;
+  messageC c = 2;
+}
+message messageC {
+  EventWithRecursion aaa = 1;
+  int32 key= 2;
+}
+
+message Employee {
+  string firstName = 1;
+  string lastName = 2;
+  oneof role {

Review Comment:
   Do we need so many fields for 'OneOf'? How about just 2 or 3? It will simplify testing.





[GitHub] [spark] cloud-fan commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1053474862


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -40,19 +40,27 @@ object SchemaConverters {
    *
    * @since 3.4.0
    */
-  def toSqlType(descriptor: Descriptor): SchemaType = {
-    toSqlTypeHelper(descriptor)
+  def toSqlType(
+      descriptor: Descriptor,
+      protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)): SchemaType = {
+    toSqlTypeHelper(descriptor, protobufOptions)
   }
 
-  def toSqlTypeHelper(descriptor: Descriptor): SchemaType = ScalaReflectionLock.synchronized {
+  def toSqlTypeHelper(
+      descriptor: Descriptor,
+      protobufOptions: ProtobufOptions): SchemaType = ScalaReflectionLock.synchronized {

Review Comment:
   not related to this PR, but why would we lock `ScalaReflectionLock` here?



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -40,19 +40,27 @@ object SchemaConverters {
    *
    * @since 3.4.0
    */
-  def toSqlType(descriptor: Descriptor): SchemaType = {
-    toSqlTypeHelper(descriptor)
+  def toSqlType(
+      descriptor: Descriptor,
+      protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)): SchemaType = {
+    toSqlTypeHelper(descriptor, protobufOptions)
   }
 
-  def toSqlTypeHelper(descriptor: Descriptor): SchemaType = ScalaReflectionLock.synchronized {
+  def toSqlTypeHelper(
+      descriptor: Descriptor,
+      protobufOptions: ProtobufOptions): SchemaType = ScalaReflectionLock.synchronized {

Review Comment:
   cc @gengliangwang 





[GitHub] [spark] cloud-fan commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1053479145


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,17 +108,35 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
+        // If the `recursive.fields.max.depth` value is not specified, it will default to -1;
+        // recursive fields are not permitted. Setting it to 0 drops all recursive fields,
+        // 1 allows it to be recursed once, and 2 allows it to be recursed twice and so on.
+        // A value greater than 10 is not allowed, and if a protobuf record has more depth for
+        // recursive fields than the allowed value, it will be truncated and some fields may be
+        // discarded.
+        // SQL Schema for the protobuf message `message Person { string name = 1; Person bff = 2}`
+        // will vary based on the value of "recursive.fields.max.depth".
+        // 0: struct<name: string, bff: null>
+        // 1: struct<name string, bff: <name: string, bff: null>>
+        // 2: struct<name string, bff: <name: string, bff: struct<name: string, bff: null>>> ...
+        val recordName = fd.getMessageType.getFullName

Review Comment:
   ```suggestion
           val recordName = fd.getFullName
   ```
   are they the same? The previous code uses `fd.getFullName`.
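   
   (They differ: for a field `B b = 1;` inside `message A` in package `pkg`, `fd.getFullName` is `pkg.A.b`, the field itself, while `fd.getMessageType.getFullName` is `pkg.B`, the field's message type, which is what the recursion check needs to track.)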





[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1044857157


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   @rangadi @baganokodo2022 thanks for the quick meeting. The conclusion was to use the descriptor type's full name, and unit tests were added with some complex schemas. 
   ```
   val recordName = fd.getMessageType.getFullName
   ```





[GitHub] [spark] baganokodo2022 commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
baganokodo2022 commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1043925095


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   @rangadi , the issue is that protobuf is designed for interactions between microservices. Protobuf schemas long predate our introduction of Delta Lake to the company. In many cases, we have to compromise for the needs of microservices, not the other way around. Service developers typically are not motivated or willing to take the risk of revising their data model for data-analytics purposes. I will compile a proto schema shortly. cc @SandishKumarHN 
   
   thanks





[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1043915051


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   @baganokodo2022 Could you translate that to actual protobufs to illustrate the problem? I still don't understand how that relates to 'type' vs 'name'. There is only one type of recursion.
   If redundant data in the warehouse is a concern, customers can process with a smaller protobuf (say, with unneeded fields removed), or just drop them in Spark SQL. 





[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1044065935


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   >  thread would be `A.B.A.aa.D.d.A.aaa.E`
   
   What is this thread? 
   
   





[GitHub] [spark] rangadi commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1430285756

   See #40011 for a follow-up tweak to this config: '0' is not supported, and it fixes how the limit is applied.




[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1040138657


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,9 +92,13 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
+        // Stop recursion at the first level when a recursive field is encountered.
+        // TODO: The user should be given the option to set the recursion level to 1, 2, or 3

Review Comment:
   @rangadi @mposdev21 Instead of limiting the recursion to only one level, the user should be able to choose a recursion level of 1, 2, or 3. Going beyond 3 levels of recursion should not be allowed. Any thoughts? 
   
   `spark.protobuf.recursion.level`





[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1041487447


##########
connector/protobuf/src/test/resources/protobuf/functions_suite.proto:
##########
@@ -170,4 +170,41 @@ message timeStampMsg {
 message durationMsg {
   string key = 1;
   Duration duration = 2;
-}
\ No newline at end of file
+}
+
+message OneOfEvent {

Review Comment:
   @rangadi I see a lot of use cases for a "payload" oneof field with recursive fields inside it, so I thought combining oneof with recursion would be a good test. Will separate them. 



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala:
##########
@@ -157,6 +157,8 @@ private[sql] class ProtobufDeserializer(
 
       case (null, NullType) => (updater, ordinal, _) => updater.setNullAt(ordinal)
 
+      case (MESSAGE, NullType) => (updater, ordinal, _) => updater.setNullAt(ordinal)

Review Comment:
   yes, correct. 





[GitHub] [spark] SandishKumarHN commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1341675259

   @baganokodo2022 Circular types (especially MESSAGE) occur frequently in a single message. The user won't be able to distinguish and fix them (imagine deciding which field the user should keep or remove), because each type will have a unique field name that is valid. The user can verify and fix the circular reference in the full-field-name scenario. 
   
   Anyway, I have made the initial change per your idea; have a look at it.
   
   cc: @rangadi 




[GitHub] [spark] cloud-fan commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1360651458

   thanks, merging to master!




[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1044083377


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   :) yeah, field names should not matter at all. 
   We can do a video chat to clarify all this. 





[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1043846427


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   > in the case of field_name recursive check it is A.B.C no recursion.
   
   That example is clearly recursion. What is 'C' here?
   
   > but it will also throw an error for the below case with the field_type check. since it will be MESSAGE.MESSAGE.MESSAGE.MESSAGE
   
   Why is this recursion? 





[GitHub] [spark] cloud-fan commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1053473031


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,14 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  // Setting the `recursive.fields.max.depth` to 0 drops all recursive fields,
+  // 1 allows it to be recurse once, and 2 allows it to be recursed twice and so on.
+  // A value of `recursive.fields.max.depth` greater than 10 is not permitted. If it is not
+  // specified, the default value is -1; recursive fields are not permitted. If a protobuf
+  // record has more depth than the allowed value for recursive fields, it will be truncated
+  // and some fields may be discarded.
+  val recursiveFieldMaxDepth: Int = parameters.getOrElse("recursive.fields.max.depth", "-1").toInt

Review Comment:
   The option name may need a bit more discussion. Usually data source options do not have long names and don't contain dots. See `JSONOptions`. How about `maxRecursiveFieldDepth`?





[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1051328177


##########
core/src/main/resources/error/error-classes.json:
##########
@@ -1016,7 +1016,7 @@
   },
   "RECURSIVE_PROTOBUF_SCHEMA" : {
     "message" : [
-      "Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>"
+      "Found recursive reference in Protobuf schema, which can not be processed by Spark by default: <fieldDescriptor>. try setting the option `recursive.fields.max.depth` as 0 or 1 or 2. Going beyond 3 levels of recursion is not allowed."

Review Comment:
   @rangadi agree. 





[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1049157233


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,14 +106,26 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
+        // Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting
+        // it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed
+        // thrice. circularReferenceDepth value greater than 2 is not allowed. If the not
+        // specified, it will default to -1, which disables recursive fields.
+        val recordName = fd.getMessageType.getFullName
+        if (existingRecordNames.contains(recordName) &&

Review Comment:
   Scratch the above suggestion. 
   Instead you could add 'else' to what you have and remove 'return'. That is simpler.





[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1049175487


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,14 +106,26 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
+        // Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting
+        // it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed
+        // thrice. A circularReferenceDepth value greater than 2 is not allowed. If not
+        // specified, it defaults to -1, which disables recursive fields.
+        val recordName = fd.getMessageType.getFullName
+        if (existingRecordNames.contains(recordName) &&

Review Comment:
   @rangadi thanks for the review, I have made all the changes you suggested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1343686313

   @baganokodo2022 instead of implementing a check for circular-reference types in this PR, can we discuss it further and write a proposal before adding it in a follow-up PR? We can share the proposal with the [dev@spark.org](mailto:dev@spark.org) mailing list for feedback and input.
   
   @rangadi @HeartSaVioR In this PR, we will only implement a check for circular references based on the full field name. Let me know if any further changes are needed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1345966383

   Can we give an example mapping between a recursive proto type and a Spark data type? There is no native recursion support in Spark's type system, so this is a bit counterintuitive.
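   
   For reference, a hand-written sketch of one possible mapping, assuming the recursion is unrolled up to a configured depth (the `Person` message and the depth value are illustrative):
   
   ```scala
   import org.apache.spark.sql.types._
   
   // For `message Person { string name = 1; Person bff = 2; }` with the
   // recursion depth capped at 1, the recursive `bff` field is unrolled once
   // and deeper occurrences are dropped:
   val personDepth1: StructType = StructType(Seq(
     StructField("name", StringType),
     StructField("bff", StructType(Seq(
       StructField("name", StringType)))))) // no further `bff` at the limit
   ```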


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1043915051


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   @baganokodo2022 Could you translate that to actual protobuf to illustrate the problem? I still don't understand. 
   If redundant data in the warehouse is a concern, customers can process with a smaller protobuf (say, with unneeded fields removed). 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1053476000


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -40,19 +40,27 @@ object SchemaConverters {
    *
    * @since 3.4.0
    */
-  def toSqlType(descriptor: Descriptor): SchemaType = {
-    toSqlTypeHelper(descriptor)
+  def toSqlType(
+      descriptor: Descriptor,
+      protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)): SchemaType = {
+    toSqlTypeHelper(descriptor, protobufOptions)
   }
 
-  def toSqlTypeHelper(descriptor: Descriptor): SchemaType = ScalaReflectionLock.synchronized {
+  def toSqlTypeHelper(
+      descriptor: Descriptor,
+      protobufOptions: ProtobufOptions): SchemaType = ScalaReflectionLock.synchronized {
     SchemaType(
-      StructType(descriptor.getFields.asScala.flatMap(structFieldFor(_, Set.empty)).toArray),
+      StructType(descriptor.getFields.asScala.flatMap(
+        structFieldFor(_,
+          Map(descriptor.getFullName -> 1),
+          protobufOptions: ProtobufOptions)).toArray),
       nullable = true)
   }
 
   def structFieldFor(
       fd: FieldDescriptor,
-      existingRecordNames: Set[String]): Option[StructField] = {
+      existingRecordNames: Map[String, Int],

Review Comment:
   can we add comments to explain what map key and value means here?
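   
   For example, a self-contained illustration of the intended semantics (names are hypothetical):
   
   ```scala
   // Key: fully qualified name of a message type. Value: how many times that
   // type has been entered on the current path from the root; the count is
   // compared against the configured recursion limit.
   def enter(seen: Map[String, Int], fullName: String): Map[String, Int] =
     seen.updated(fullName, seen.getOrElse(fullName, 0) + 1)
   
   val seen0 = Map("org.example.Person" -> 1)      // root message counted once
   val seen1 = enter(seen0, "org.example.Person")  // recursing into a field: 2
   ```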



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1053690877


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,17 +108,35 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
+        // If the `recursive.fields.max.depth` value is not specified, it will default to -1;
+        // recursive fields are not permitted. Setting it to 0 drops all recursive fields,
+        // 1 allows it to be recursed once, and 2 allows it to be recursed twice and so on.
+        // A value greater than 10 is not allowed, and if a protobuf record has more depth for
+        // recursive fields than the allowed value, it will be truncated and some fields may be
+        // discarded.
+        // SQL Schema for the protobuf message `message Person { string name = 1; Person bff = 2}`
+        // will vary based on the value of "recursive.fields.max.depth".
+        // 0: struct<name: string, bff: null>
+        // 1: struct<name: string, bff: struct<name: string, bff: null>>
+        // 2: struct<name: string, bff: struct<name: string, bff: struct<name: string, bff: null>>> ...
+        val recordName = fd.getMessageType.getFullName

Review Comment:
   @cloud-fan `fd.getFullName` gives the fully qualified name including the field name; we needed the fully qualified type name. We made this decision above. 
   
   here is the difference.
   ```
   println(s"${fd.getFullName} : ${fd.getMessageType.getFullName}")
   
   org.apache.spark.sql.protobuf.protos.Employee.ic : org.apache.spark.sql.protobuf.protos.IC
   org.apache.spark.sql.protobuf.protos.IC.icManager : org.apache.spark.sql.protobuf.protos.Employee
   org.apache.spark.sql.protobuf.protos.Employee.ic : org.apache.spark.sql.protobuf.protos.IC
   org.apache.spark.sql.protobuf.protos.IC.icManager : org.apache.spark.sql.protobuf.protos.Employee
   org.apache.spark.sql.protobuf.protos.Employee.em : org.apache.spark.sql.protobuf.protos.EM
   ```
   @rangadi the previous code used ```fd.getFullName``` (the fully qualified name including the field name) to detect recursion, so before this change we would simply throw an error on any recursive field. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1043814379


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,14 +109,38 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
-          throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+        // The user can set circularReferenceDepth to 0, 1, or 2.
+        // Going beyond 3 levels of recursion is not allowed.

Review Comment:
   @rangadi The user can specify the maximum allowed recursion depth for a field by setting the circularReferenceDepth property to 0, 1, or 2. Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed thrice. Attempting to set the circularReferenceDepth to a value greater than 2 is not allowed. If the circularReferenceDepth is not specified, it will default to -1, which disables recursive fields.
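   
   A usage sketch with this iteration's option name (`df` stands in for any DataFrame with a binary protobuf column; the descriptor path and message name are placeholders, and later revisions of this PR rename the option to `recursive.fields.max.depth`):
   
   ```scala
   import org.apache.spark.sql.functions.col
   import org.apache.spark.sql.protobuf.functions.from_protobuf
   
   // Allow one extra level of recursion for recursive message fields.
   val options = new java.util.HashMap[String, String]()
   options.put("circularReferenceDepth", "1")
   
   val parsed = df.select(
     from_protobuf(col("value"), "Person", "/tmp/descriptor.desc", options).as("person"))
   ```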



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   @rangadi we already have the field_name recursion check: using `fd.getFullName` we detect the recursion and throw an error. Another option is to detect recursion through the field type; example below.
   
   ```
   message A {
   B b = 1;
   }
   
   message B {
   A c = 1;
   }
   ```
   In the case of the field_name recursion check the path is ```A.B.C```, so no recursion is found. 
   In the case of the field_type recursion check it is ```MESSAGE.MESSAGE.MESSAGE```, so recursion will be found and we either throw an error or drop fields beyond the configured recursion depth. 
   But the field_type check will also throw an error for the case below, since the path will be ```MESSAGE.MESSAGE.MESSAGE.MESSAGE```:
   
   ```
   message A {
   B b = 1;
   }
   
   message B {
   D d = 1;
   }
   
   message D {
   E e = 1;
   }
   
   message E {
   int32 key = 1;
   }
   ```
   @baganokodo2022's argument is that a field_type-based check gives users an option to cut off recursion more quickly, because in a complex nested schema a recursive field_name may only be found very deep; before hitting it we might see an OOM. A field_type-based check finds the circular reference sooner. 
   
   @baganokodo2022 please correct me if I'm wrong. 
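   
   To make the difference concrete, a tiny sketch of the name-based check (a hypothetical helper, not the PR's code):
   
   ```scala
   // The name-based check keys on fully qualified names, so only a genuine
   // cycle repeats a key. A type-based check keys on the descriptor kind
   // (MESSAGE), so any sufficiently deep chain of nested messages is flagged.
   def seenByName(path: Seq[String], next: String): Boolean = path.contains(next)
   
   seenByName(Seq("A", "B"), "A")       // true: A -> B -> A is a real cycle
   seenByName(Seq("A", "B", "D"), "E")  // false: deep but acyclic
   ```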



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,14 +109,38 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
-          throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+        // User can set circularReferenceDepth of 0 or 1 or 2.
+        // Going beyond 3 levels of recursion is not allowed.
+        if (protobufOptions.circularReferenceType.equals("FIELD_TYPE")) {
+          if (existingRecordTypes.contains(fd.getType.name()) &&
+            (protobufOptions.circularReferenceDepth < 0 ||
+              protobufOptions.circularReferenceDepth >= 3)) {
+            throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+          } else if (existingRecordTypes.contains(fd.getType.name()) &&

Review Comment:
   @rangadi we have two maps with incremental counters, one for the field_name-based check and one for the field_type-based check. 
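   
   Roughly like this (a sketch with hypothetical names, not the PR's code):
   
   ```scala
   // Two parallel depth counters, incremented as the schema is descended:
   // one keyed by fully qualified field name, one keyed by field type.
   case class Seen(byName: Map[String, Int], byType: Map[String, Int])
   
   def visit(seen: Seen, fieldFullName: String, typeName: String): Seen = Seen(
     seen.byName.updated(fieldFullName, seen.byName.getOrElse(fieldFullName, 0) + 1),
     seen.byType.updated(typeName, seen.byType.getOrElse(typeName, 0) + 1))
   ```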



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -26,11 +26,11 @@ import com.google.protobuf.{ByteString, DynamicMessage}
 import org.apache.spark.sql.{Column, QueryTest, Row}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.functions.{lit, struct}
-import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.SimpleMessageRepeated
+import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.{EventRecursiveA, EventRecursiveB, OneOfEvent, OneOfEventWithRecursion, SimpleMessageRepeated}

Review Comment:
   @rangadi yes, 
   `Handle recursive fields in Protobuf schema, C->D->Array(C)` and
   `Handle recursive fields in Protobuf schema, A->B->A`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1044064987


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   > we would fail to detect the recursion for above because the thread would be
   
   Why would we fail? Let's say the user does `from_protobuf(col, 'message_A')`.
   `A aa = 1` at line 5 would be treated as recursion.
   Why are field names relevant at all?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1044066697


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   Given this discussion, let's write down functionality and examples, before we implement so that we are all on the same page. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org