Posted to reviews@spark.apache.org by "rangadi (via GitHub)" <gi...@apache.org> on 2023/04/27 21:37:06 UTC

[GitHub] [spark] rangadi opened a new pull request, #40983: [SPARK-43312] Option to convert Any fields into JSON

rangadi opened a new pull request, #40983:
URL: https://github.com/apache/spark/pull/40983

   ### What changes were proposed in this pull request?
   
   This adds an option to convert Protobuf 'Any' fields to JSON. At runtime, such 'Any' fields
   can contain an arbitrary Protobuf message serialized as binary data.

   By default, when this option is not enabled, such a field behaves like a normal Protobuf
   message with two fields (`STRUCT<type_url: STRING, value: BINARY>`). The binary `value`
   field is not interpreted, which might not be convenient in practice.

   One option is to deserialize the binary into the actual Protobuf message and convert it to a
   Spark STRUCT. But this is not feasible, since the schema for `from_protobuf()` is needed at
   query compile time and cannot change at runtime.

   Another option is to parse the binary and deserialize the Protobuf message into a JSON
   string. This is a lot more readable than the binary data. This configuration option enables
   converting Any fields to JSON. The example below clarifies further.

   Consider two Protobuf types defined as follows:
   ```
      message ProtoWithAny {
        string event_name = 1;
        google.protobuf.Any details = 2;
      }

      message Person {
        string name = 1;
        int32 id = 2;
      }
   ```                                                                                             
   With this option enabled, the schema for `from_protobuf("col", messageName = "ProtoWithAny")`
   would be: `STRUCT<event_name: STRING, details: STRING>`.
   At run time, if the `details` field contains a `Person` Protobuf message, the returned value
   looks like this, with a JSON string for `details` (see the usage sketch below):

       ('click', '{"@type":"type.googleapis.com/...Person","name":"Mario","id":100}')
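
   A minimal usage sketch in Scala. The input DataFrame `df`, its binary column `value`, and
   the descriptor path are hypothetical placeholders; `from_protobuf()` and the
   `convert.any.fields.to.json` option are the ones from this PR.
   ```scala
   import scala.jdk.CollectionConverters._
   import org.apache.spark.sql.functions.{col, get_json_object}
   import org.apache.spark.sql.protobuf.functions.from_protobuf

   // Opt in to JSON conversion for Any fields.
   val options = Map("convert.any.fields.to.json" -> "true").asJava

   val events = df.select(
     from_protobuf(col("value"), "ProtoWithAny", "/path/to/descriptor.desc", options)
       .alias("event"))
   // Schema of `event`: STRUCT<event_name: STRING, details: STRING>

   // The JSON string in `details` can then be queried with standard JSON functions:
   events.select(get_json_object(col("event.details"), "$.name"))
   ```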
                                                                                                
   Requirements:
    - The definitions for all the Protobuf types that can appear in Any fields should be
      available in the Protobuf descriptor file passed to `from_protobuf()`. If a Protobuf
      type is not found, it results in an error for that record.
    - This feature is supported with Java classes as well, but only the Protobuf types defined
      in the same `proto` file as the primary Java class might be visible.
      E.g. if `ProtoWithAny` and `Person` in the above example are in different proto files,
      the definition for `Person` may not be found.
   
   ### Why are the changes needed?
   
   Improves handling of Any fields.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No. The default behavior is not changed; the new conversion is opt-in.
   
   
   ### How was this patch tested?
   - Unit tests
   




[GitHub] [spark] gengliangwang commented on pull request #40983: [SPARK-43312][PROTOBUF] Option to convert Any fields into JSON

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on PR #40983:
URL: https://github.com/apache/spark/pull/40983#issuecomment-1535538870

   Thanks, merging to master.




[GitHub] [spark] gengliangwang closed pull request #40983: [SPARK-43312][PROTOBUF] Option to convert Any fields into JSON

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang closed pull request #40983: [SPARK-43312][PROTOBUF] Option to convert Any fields into JSON
URL: https://github.com/apache/spark/pull/40983




[GitHub] [spark] rangadi commented on pull request #40983: [SPARK-43312] Option to convert Any fields into JSON

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on PR #40983:
URL: https://github.com/apache/spark/pull/40983#issuecomment-1526534162

   cc: @gengliangwang, @SandishKumarHN 




[GitHub] [spark] rangadi commented on a diff in pull request #40983: [SPARK-43312][PROTOBUF] Option to convert Any fields into JSON

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40983:
URL: https://github.com/apache/spark/pull/40983#discussion_r1179717089


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -39,13 +39,67 @@ private[sql] class ProtobufOptions(
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
 
-  // Setting the `recursive.fields.max.depth` to 1 allows it to be recurse once,
-  // and 2 allows it to be recursed twice and so on. A value of `recursive.fields.max.depth`
-  // greater than 10 is not permitted. If it is not  specified, the default value is -1;
-  // A value of 0 or below disallows any recursive fields. If a protobuf
-  // record has more depth than the allowed value for recursive fields, it will be truncated
-  // and corresponding fields are ignored (dropped).
+  /**
+   * Adds support for recursive fields. If this option is not specified, recursive fields are

Review Comment:
   Though not related to this PR, I expanded the documentation for `recursive.fields.max.depth` with a clarifying example.
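
   For reference, a hedged sketch of setting this option; the DataFrame `df`, column, and
   descriptor path are assumed, and the schema in the comment is the one listed in the
   expanded docs.
   ```scala
   import scala.jdk.CollectionConverters._
   import org.apache.spark.sql.functions.col
   import org.apache.spark.sql.protobuf.functions.from_protobuf

   // With the recursive `Person` example and depth 2, the resulting schema is
   // struct<name: string, friend: struct<name: string>>.
   val persons = df.select(
     from_protobuf(col("value"), "Person", "/path/to/descriptor.desc",
       Map("recursive.fields.max.depth" -> "2").asJava)
       .alias("person"))
   ```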





[GitHub] [spark] rangadi commented on pull request #40983: [SPARK-43312][PROTOBUF] Option to convert Any fields into JSON

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on PR #40983:
URL: https://github.com/apache/spark/pull/40983#issuecomment-1535556642

   Thanks @gengliangwang! 




[GitHub] [spark] rangadi commented on a diff in pull request #40983: [SPARK-43312][PROTOBUF] Option to convert Any fields into JSON

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40983:
URL: https://github.com/apache/spark/pull/40983#discussion_r1184276121


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala:
##########
@@ -70,6 +69,15 @@ private[sql] class ProtobufDeserializer(
 
   def deserialize(data: Message): Option[InternalRow] = converter(data)
 
+  // JsonFormat printer used to convert Any fields to JSON (if the option is enabled).
+  // It keeps the original field names and does not add any extra whitespace to the JSON.
+  // If the runtime type for an Any field is not found in the registry, it throws an exception.
+  private val jsonPrinter = JsonFormat.printer
+    .omittingInsignificantWhitespace()
+    .preservingProtoFieldNames()

Review Comment:
   @SandishKumarHN please approve this PR if it looks good. Planning to get this merged.





[GitHub] [spark] rangadi commented on a diff in pull request #40983: [SPARK-43312][PROTOBUF] Option to convert Any fields into JSON

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40983:
URL: https://github.com/apache/spark/pull/40983#discussion_r1180582637


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala:
##########
@@ -63,7 +64,16 @@ private[protobuf] case class ProtobufDataToCatalyst(
   @transient private lazy val fieldsNumbers =
     messageDescriptor.getFields.asScala.map(f => f.getNumber).toSet
 
-  @transient private lazy val deserializer = new ProtobufDeserializer(messageDescriptor, dataType)
+  @transient private lazy val deserializer = {
+    val typeRegistry = descFilePath match {
+      case Some(path) if protobufOptions.convertAnyFieldsToJson =>
+        ProtobufUtils.buildTypeRegistry(path) // This loads all the messages in the file.
+      case None if protobufOptions.convertAnyFieldsToJson =>
+        ProtobufUtils.buildTypeRegistry(messageDescriptor) // Loads only connected messages.
+      case _ => TypeRegistry.getEmptyTypeRegistry // Default. Json conversion is not enabled.
+    }
+    new ProtobufDeserializer(messageDescriptor, dataType, typeRegistry = typeRegistry)
+  }

Review Comment:
   Both are similar, but not exactly the same. `buildDescriptor()` has a better error message, meant as a compile-time error for the user. In addition, it supports matching just the message name rather than the full name. We can keep it this way.
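
   A small sketch of the difference (the fully-qualified name is hypothetical; `TypeRegistry`
   comes from `com.google.protobuf`, and `buildTypeRegistry` is the helper used in this PR):
   ```scala
   import com.google.protobuf.TypeRegistry

   // buildTypeRegistry() loads all messages from the descriptor file, but
   // TypeRegistry.find() matches only fully-qualified names and returns
   // null on a miss, with no user-friendly error message:
   val registry: TypeRegistry = ProtobufUtils.buildTypeRegistry(descFilePath)
   val descriptor = registry.find("org.example.ProtoWithAny") // null if absent

   // buildDescriptor() also matches the bare name ("ProtoWithAny") and raises
   // a clearer error at query compile time when the message is missing.
   ```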





[GitHub] [spark] rangadi commented on a diff in pull request #40983: [SPARK-43312][PROTOBUF] Option to convert Any fields into JSON

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40983:
URL: https://github.com/apache/spark/pull/40983#discussion_r1185535496


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -39,15 +39,72 @@ private[sql] class ProtobufOptions(
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
 
-  // Setting the `recursive.fields.max.depth` to 1 allows it to be recurse once,
-  // and 2 allows it to be recursed twice and so on. A value of `recursive.fields.max.depth`
-  // greater than 10 is not permitted. If it is not  specified, the default value is -1;
-  // A value of 0 or below disallows any recursive fields. If a protobuf
-  // record has more depth than the allowed value for recursive fields, it will be truncated
-  // and corresponding fields are ignored (dropped).
+  /**
+   * Adds support for recursive fields. If this option is not specified, recursive fields are
+   * not permitted. Setting it to 0 drops the recursive fields, 1 allows it to be recursed once,
+   * and 2 allows it to be recursed twice and so on, up to 10. Values larger than 10 are not
+   * allowed in order to avoid inadvertently creating very large schemas. If a Protobuf message
+   * has depth beyond this limit, the Spark struct returned is truncated after the recursion limit.
+   *
+   * Examples. Consider a Protobuf with a recursive field:
+   *   `message Person { string name = 1; Person friend = 2; }`
+   * The following lists the schema with different values for this setting.
+   *  1:  `struct<name: string>`
+   *  2:  `struct<name: string, friend: struct<name: string>>`
+   *  3:  `struct<name: string, friend: struct<name: string, friend: struct<name: string>>>`
+   * and so on.
+   */
   val recursiveFieldMaxDepth: Int = parameters.getOrElse("recursive.fields.max.depth", "-1").toInt
 
-  // Whether to render fields with zero values when deserializing Protobufs to a Spark struct.
+  /**
+   * This option enables converting Protobuf 'Any' fields to JSON. At runtime, such 'Any' fields
+   * can contain arbitrary Protobuf messages as binary data.
+   *
+   * By default, when this option is not enabled, such a field behaves like a normal Protobuf
+   * message with two fields (`STRUCT<type_url: STRING, value: BINARY>`). The binary `value`
+   * field is not interpreted, which might not be convenient to work with in practice.
+   *
+   * One option is to deserialize the binary into the actual Protobuf message and convert it to
+   * a Spark STRUCT. But this is not feasible, since the schema for `from_protobuf()` is needed
+   * at query compile time and cannot change at runtime.
+   *
+   * Another option is to parse the binary and deserialize the Protobuf message into a JSON
+   * string. This is a lot more readable than the binary data. This configuration option enables
+   * converting Any fields to JSON. The example below clarifies further.
+   *
+   *  Consider two Protobuf types defined as follows:
+   *    message ProtoWithAny {
+   *      string event_name = 1;
+   *      google.protobuf.Any details = 2;
+   *    }
+   *
+   *    message Person {
+   *      string name = 1;
+   *      int32 id = 2;
+   *    }
+   *
+   * With this option enabled, the schema for `from_protobuf("col", messageName = "ProtoWithAny")`
+   * would be: `STRUCT<event_name: STRING, details: STRING>`.
+   * At run time, if the `details` field contains a `Person` Protobuf message, the returned value
+   * looks like this:
+   *   ('click', '{"@type":"type.googleapis.com/...Person","name":"Mario","id":100}')
+   *
+   * Requirements:
+   *  - The definitions for all the Protobuf types that can appear in Any fields should be
+   *    available in the Protobuf descriptor file passed to `from_protobuf()`. If a Protobuf
+   *    type is not found, it results in an error for that record.
+   *  - This feature is supported with Java classes as well, but only the Protobuf types defined
+   *    in the same `proto` file as the primary Java class might be visible.
+   *    E.g. if `ProtoWithAny` and `Person` in the above example are in different proto files,
+   *    the definition for `Person` may not be found.
+   *
+   * This feature should be enabled carefully: JSON conversion and processing are inefficient,
+   * and it reduces schema safety, making downstream processing error-prone.
+   */
+  val convertAnyFieldsToJson: Boolean =
+    parameters.getOrElse("convert.any.fields.to.json", "false").toBoolean

Review Comment:
   There is a large scaladoc comment





[GitHub] [spark] rangadi commented on a diff in pull request #40983: [SPARK-43312][PROTOBUF] Option to convert Any fields into JSON

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40983:
URL: https://github.com/apache/spark/pull/40983#discussion_r1180584516


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala:
##########
@@ -70,6 +69,15 @@ private[sql] class ProtobufDeserializer(
 
   def deserialize(data: Message): Option[InternalRow] = converter(data)
 
+  // JsonFormat printer used to convert Any fields to JSON (if the option is enabled).
+  // It keeps the original field names and does not add any extra whitespace to the JSON.
+  // If the runtime type for an Any field is not found in the registry, it throws an exception.
+  private val jsonPrinter = JsonFormat.printer
+    .omittingInsignificantWhitespace()
+    .preservingProtoFieldNames()

Review Comment:
   Yes, we do use field names as they appear in the Protobuf definition.
   The setting here only applies to the JSON conversion. Without it, the `event_name` field in the example would appear as `eventName` in the JSON.
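
   A hedged illustration of that behavior, assuming a generated `Person` class from the
   example in the PR description:
   ```scala
   import com.google.protobuf.{Any => ProtoAny, TypeRegistry}
   import com.google.protobuf.util.JsonFormat

   val registry = TypeRegistry.newBuilder().add(Person.getDescriptor).build()
   val printer = JsonFormat.printer()
     .usingTypeRegistry(registry)
     .omittingInsignificantWhitespace()
     .preservingProtoFieldNames()

   val any = ProtoAny.pack(Person.newBuilder().setName("Mario").setId(100).build())
   // preservingProtoFieldNames() keeps snake_case names like "event_name";
   // without it they would be rendered as camelCase ("eventName").
   // print() throws if the packed type is missing from the registry.
   val json: String = printer.print(any)
   // => {"@type":"type.googleapis.com/...Person","name":"Mario","id":100}
   ```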





[GitHub] [spark] SandishKumarHN commented on a diff in pull request #40983: [SPARK-43312][PROTOBUF] Option to convert Any fields into JSON

Posted by "SandishKumarHN (via GitHub)" <gi...@apache.org>.
SandishKumarHN commented on code in PR #40983:
URL: https://github.com/apache/spark/pull/40983#discussion_r1179904542


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala:
##########
@@ -70,6 +69,15 @@ private[sql] class ProtobufDeserializer(
 
   def deserialize(data: Message): Option[InternalRow] = converter(data)
 
+  // JsonFormat printer used to convert Any fields to JSON (if the option is enabled).
+  // It keeps the original field names and does not add any extra whitespace to the JSON.
+  // If the runtime type for an Any field is not found in the registry, it throws an exception.
+  private val jsonPrinter = JsonFormat.printer
+    .omittingInsignificantWhitespace()
+    .preservingProtoFieldNames()

Review Comment:
   @rangadi Are the names of Protobuf fields preserved for other types? I don't see any explicit use of this for other types.
   
   



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala:
##########
@@ -63,7 +64,16 @@ private[protobuf] case class ProtobufDataToCatalyst(
   @transient private lazy val fieldsNumbers =
     messageDescriptor.getFields.asScala.map(f => f.getNumber).toSet
 
-  @transient private lazy val deserializer = new ProtobufDeserializer(messageDescriptor, dataType)
+  @transient private lazy val deserializer = {
+    val typeRegistry = descFilePath match {
+      case Some(path) if protobufOptions.convertAnyFieldsToJson =>
+        ProtobufUtils.buildTypeRegistry(path) // This loads all the messages in the file.
+      case None if protobufOptions.convertAnyFieldsToJson =>
+        ProtobufUtils.buildTypeRegistry(messageDescriptor) // Loads only connected messages.
+      case _ => TypeRegistry.getEmptyTypeRegistry // Default. Json conversion is not enabled.
+    }
+    new ProtobufDeserializer(messageDescriptor, dataType, typeRegistry = typeRegistry)
+  }

Review Comment:
   @rangadi Is it possible to replace `buildDescriptor(descriptor, messageName)` with `ProtobufUtils.buildTypeRegistry(path)` and then use `typeRegistry.find(messageName)`?





[GitHub] [spark] rangadi commented on a diff in pull request #40983: [SPARK-43312][PROTOBUF] Option to convert Any fields into JSON

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40983:
URL: https://github.com/apache/spark/pull/40983#discussion_r1185517095


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -39,15 +39,72 @@ private[sql] class ProtobufOptions(
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
 
-  // Setting the `recursive.fields.max.depth` to 1 allows it to be recurse once,
-  // and 2 allows it to be recursed twice and so on. A value of `recursive.fields.max.depth`
-  // greater than 10 is not permitted. If it is not  specified, the default value is -1;
-  // A value of 0 or below disallows any recursive fields. If a protobuf
-  // record has more depth than the allowed value for recursive fields, it will be truncated
-  // and corresponding fields are ignored (dropped).
+  /**
+   * Adds support for recursive fields. If this option is not specified, recursive fields are
+   * not permitted. Setting it to 0 drops the recursive fields, 1 allows it to be recursed once,
+   * and 2 allows it to be recursed twice and so on, up to 10. Values larger than 10 are not
+   * allowed in order to avoid inadvertently creating very large schemas. If a Protobuf message
+   * has depth beyond this limit, the Spark struct returned is truncated after the recursion limit.
+   *
+   * Examples. Consider a Protobuf with a recursive field:
+   *   `message Person { string name = 1; Person friend = 2; }`
+   * The following lists the schema with different values for this setting.
+   *  1:  `struct<name: string>`
+   *  2:  `struct<name: string, friend: struct<name: string>>`
+   *  3:  `struct<name: string, friend: struct<name: string, friend: struct<name: string>>>`
+   * and so on.
+   */
   val recursiveFieldMaxDepth: Int = parameters.getOrElse("recursive.fields.max.depth", "-1").toInt
 
-  // Whether to render fields with zero values when deserializing Protobufs to a Spark struct.
+  /**
+   * This option enables converting Protobuf 'Any' fields to JSON. At runtime, such 'Any' fields
+   * can contain arbitrary Protobuf messages as binary data.
+   *
+   * By default, when this option is not enabled, such a field behaves like a normal Protobuf
+   * message with two fields (`STRUCT<type_url: STRING, value: BINARY>`). The binary `value`
+   * field is not interpreted, which might not be convenient to work with in practice.
+   *
+   * One option is to deserialize the binary into the actual Protobuf message and convert it to
+   * a Spark STRUCT. But this is not feasible, since the schema for `from_protobuf()` is needed
+   * at query compile time and cannot change at runtime.
+   *
+   * Another option is to parse the binary and deserialize the Protobuf message into a JSON
+   * string. This is a lot more readable than the binary data. This configuration option enables
+   * converting Any fields to JSON. The example below clarifies further.
+   *
+   *  Consider two Protobuf types defined as follows:
+   *    message ProtoWithAny {
+   *      string event_name = 1;
+   *      google.protobuf.Any details = 2;
+   *    }
+   *
+   *    message Person {
+   *      string name = 1;
+   *      int32 id = 2;
+   *    }
+   *
+   * With this option enabled, the schema for `from_protobuf("col", messageName = "ProtoWithAny")`
+   * would be: `STRUCT<event_name: STRING, details: STRING>`.
+   * At run time, if the `details` field contains a `Person` Protobuf message, the returned value
+   * looks like this:
+   *   ('click', '{"@type":"type.googleapis.com/...Person","name":"Mario","id":100}')
+   *
+   * Requirements:
+   *  - The definitions for all the Protobuf types that can appear in Any fields should be
+   *    available in the Protobuf descriptor file passed to `from_protobuf()`. If a Protobuf
+   *    type is not found, it results in an error for that record.
+   *  - This feature is supported with Java classes as well, but only the Protobuf types defined
+   *    in the same `proto` file as the primary Java class might be visible.
+   *    E.g. if `ProtoWithAny` and `Person` in the above example are in different proto files,
+   *    the definition for `Person` may not be found.
+   *
+   * This feature should be enabled carefully: JSON conversion and processing are inefficient,
+   * and it reduces schema safety, making downstream processing error-prone.
+   */
+  val convertAnyFieldsToJson: Boolean =
+    parameters.getOrElse("convert.any.fields.to.json", "false").toBoolean

Review Comment:
   This is not a source, btw. We already started using the Spark config naming style, and I would like to keep this convention if I can.
   I can move it to a constant in a follow-up PR in order to cut down CI/CD time.
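
   A sketch of that follow-up (the constant name is assumed):
   ```scala
   private[sql] object ProtobufOptions {
     val CONVERT_ANY_FIELDS_TO_JSON_CONFIG = "convert.any.fields.to.json"
   }

   // In ProtobufOptions, the hard-coded key is then replaced with the constant:
   val convertAnyFieldsToJson: Boolean =
     parameters
       .getOrElse(ProtobufOptions.CONVERT_ANY_FIELDS_TO_JSON_CONFIG, "false")
       .toBoolean
   ```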





[GitHub] [spark] gengliangwang commented on a diff in pull request #40983: [SPARK-43312][PROTOBUF] Option to convert Any fields into JSON

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #40983:
URL: https://github.com/apache/spark/pull/40983#discussion_r1185530970


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -39,15 +39,72 @@ private[sql] class ProtobufOptions(
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
 
-  // Setting the `recursive.fields.max.depth` to 1 allows it to be recurse once,
-  // and 2 allows it to be recursed twice and so on. A value of `recursive.fields.max.depth`
-  // greater than 10 is not permitted. If it is not  specified, the default value is -1;
-  // A value of 0 or below disallows any recursive fields. If a protobuf
-  // record has more depth than the allowed value for recursive fields, it will be truncated
-  // and corresponding fields are ignored (dropped).
+  /**
+   * Adds support for recursive fields. If this option is not specified, recursive fields are
+   * not permitted. Setting it to 0 drops the recursive fields, 1 allows it to be recursed once,
+   * and 2 allows it to be recursed twice and so on, up to 10. Values larger than 10 are not
+   * allowed in order to avoid inadvertently creating very large schemas. If a Protobuf message
+   * has depth beyond this limit, the Spark struct returned is truncated after the recursion limit.
+   *
+   * Examples. Consider a Protobuf with a recursive field:
+   *   `message Person { string name = 1; Person friend = 2; }`
+   * The following lists the schema with different values for this setting.
+   *  1:  `struct<name: string>`
+   *  2:  `struct<name: string, friend: struct<name: string>>`
+   *  3:  `struct<name: string, friend: struct<name: string, friend: struct<name: string>>>`
+   * and so on.
+   */
   val recursiveFieldMaxDepth: Int = parameters.getOrElse("recursive.fields.max.depth", "-1").toInt
 
-  // Whether to render fields with zero values when deserializing Protobufs to a Spark struct.
+  /**
+   * This option enables converting Protobuf 'Any' fields to JSON. At runtime, such 'Any' fields
+   * can contain arbitrary Protobuf messages as binary data.
+   *
+   * By default, when this option is not enabled, such a field behaves like a normal Protobuf
+   * message with two fields (`STRUCT<type_url: STRING, value: BINARY>`). The binary `value`
+   * field is not interpreted, which might not be convenient to work with in practice.
+   *
+   * One option is to deserialize the binary into the actual Protobuf message and convert it to
+   * a Spark STRUCT. But this is not feasible, since the schema for `from_protobuf()` is needed
+   * at query compile time and cannot change at runtime.
+   *
+   * Another option is to parse the binary and deserialize the Protobuf message into a JSON
+   * string. This is a lot more readable than the binary data. This configuration option enables
+   * converting Any fields to JSON. The example below clarifies further.
+   *
+   *  Consider two Protobuf types defined as follows:
+   *    message ProtoWithAny {
+   *      string event_name = 1;
+   *      google.protobuf.Any details = 2;
+   *    }
+   *
+   *    message Person {
+   *      string name = 1;
+   *      int32 id = 2;
+   *    }
+   *
+   * With this option enabled, the schema for `from_protobuf("col", messageName = "ProtoWithAny")`
+   * would be: `STRUCT<event_name: STRING, details: STRING>`.
+   * At run time, if the `details` field contains a `Person` Protobuf message, the returned value
+   * looks like this:
+   *   ('click', '{"@type":"type.googleapis.com/...Person","name":"Mario","id":100}')
+   *
+   * Requirements:
+   *  - The definitions for all the Protobuf types that can appear in Any fields should be
+   *    available in the Protobuf descriptor file passed to `from_protobuf()`. If a Protobuf
+   *    type is not found, it results in an error for that record.
+   *  - This feature is supported with Java classes as well, but only the Protobuf types defined
+   *    in the same `proto` file as the primary Java class might be visible.
+   *    E.g. if `ProtoWithAny` and `Person` in the above example are in different proto files,
+   *    the definition for `Person` may not be found.
+   *
+   * This feature should be enabled carefully: JSON conversion and processing are inefficient,
+   * and it reduces schema safety, making downstream processing error-prone.
+   */
+  val convertAnyFieldsToJson: Boolean =
+    parameters.getOrElse("convert.any.fields.to.json", "false").toBoolean

Review Comment:
   Also, it would be great if we could document this one.





[GitHub] [spark] gengliangwang commented on a diff in pull request #40983: [SPARK-43312][PROTOBUF] Option to convert Any fields into JSON

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #40983:
URL: https://github.com/apache/spark/pull/40983#discussion_r1185510255


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -39,15 +39,72 @@ private[sql] class ProtobufOptions(
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
 
-  // Setting the `recursive.fields.max.depth` to 1 allows it to be recurse once,
-  // and 2 allows it to be recursed twice and so on. A value of `recursive.fields.max.depth`
-  // greater than 10 is not permitted. If it is not  specified, the default value is -1;
-  // A value of 0 or below disallows any recursive fields. If a protobuf
-  // record has more depth than the allowed value for recursive fields, it will be truncated
-  // and corresponding fields are ignored (dropped).
+  /**
+   * Adds support for recursive fields. If this option is not specified, recursive fields are
+   * not permitted. Setting it to 0 drops the recursive fields, 1 allows it to be recursed once,
+   * and 2 allows it to be recursed twice and so on, up to 10. Values larger than 10 are not
+   * allowed in order to avoid inadvertently creating very large schemas. If a Protobuf message
+   * has depth beyond this limit, the Spark struct returned is truncated after the recursion limit.
+   *
+   * Examples. Consider a Protobuf with a recursive field:
+   *   `message Person { string name = 1; Person friend = 2; }`
+   * The following lists the schema with different values for this setting.
+   *  1:  `struct<name: string>`
+   *  2:  `struct<name: string, friend: struct<name: string>>`
+   *  3:  `struct<name: string, friend: struct<name: string, friend: struct<name: string>>>`
+   * and so on.
+   */
   val recursiveFieldMaxDepth: Int = parameters.getOrElse("recursive.fields.max.depth", "-1").toInt
 
-  // Whether to render fields with zero values when deserializing Protobufs to a Spark struct.
+  /**
+   * This option enables converting Protobuf 'Any' fields to JSON. At runtime, such 'Any' fields
+   * can contain arbitrary Protobuf messages as binary data.
+   *
+   * By default, when this option is not enabled, such a field behaves like a normal Protobuf
+   * message with two fields (`STRUCT<type_url: STRING, value: BINARY>`). The binary `value`
+   * field is not interpreted, which might not be convenient to work with in practice.
+   *
+   * One option is to deserialize the binary into the actual Protobuf message and convert it to
+   * a Spark STRUCT. But this is not feasible, since the schema for `from_protobuf()` is needed
+   * at query compile time and cannot change at runtime.
+   *
+   * Another option is to parse the binary and deserialize the Protobuf message into a JSON
+   * string. This is a lot more readable than the binary data. This configuration option enables
+   * converting Any fields to JSON. The example below clarifies further.
+   *
+   *  Consider two Protobuf types defined as follows:
+   *    message ProtoWithAny {
+   *      string event_name = 1;
+   *      google.protobuf.Any details = 2;
+   *    }
+   *
+   *    message Person {
+   *      string name = 1;
+   *      int32 id = 2;
+   *    }
+   *
+   * With this option enabled, the schema for `from_protobuf("col", messageName = "ProtoWithAny")`
+   * would be: `STRUCT<event_name: STRING, details: STRING>`.
+   * At run time, if the `details` field contains a `Person` Protobuf message, the returned value
+   * looks like this:
+   *   ('click', '{"@type":"type.googleapis.com/...Person","name":"Mario","id":100}')
+   *
+   * Requirements:
+   *  - The definitions for all the Protobuf types that can appear in Any fields should be
+   *    available in the Protobuf descriptor file passed to `from_protobuf()`. If a Protobuf
+   *    type is not found, it results in an error for that record.
+   *  - This feature is supported with Java classes as well, but only the Protobuf types defined
+   *    in the same `proto` file as the primary Java class might be visible.
+   *    E.g. if `ProtoWithAny` and `Person` in the above example are in different proto files,
+   *    the definition for `Person` may not be found.
+   *
+   * This feature should be enabled carefully: JSON conversion and processing are inefficient,
+   * and it reduces schema safety, making downstream processing error-prone.
+   */
+  val convertAnyFieldsToJson: Boolean =
+    parameters.getOrElse("convert.any.fields.to.json", "false").toBoolean

Review Comment:
   @rangadi let's name the option `convertAnyFieldToJson`. Also, let's put it in a constant instead of hard-coding it across the code.
   That way the naming style is consistent with other data sources like CSV/JSON:
   https://spark.apache.org/docs/latest/sql-data-sources-csv.html
   https://spark.apache.org/docs/latest/sql-data-sources-json.html


