Posted to reviews@spark.apache.org by "itholic (via GitHub)" <gi...@apache.org> on 2023/09/16 08:23:32 UTC

[GitHub] [spark] itholic commented on a diff in pull request #42938: [SPARK-44788][CONNECT][PYTHON][SQL] Add from_xml and schema_of_xml to pyspark, spark connect and sql function

itholic commented on code in PR #42938:
URL: https://github.com/apache/spark/pull/42938#discussion_r1327920967


##########
python/pyspark/errors/error_classes.py:
##########
@@ -477,6 +477,11 @@
       "Argument `<arg_name>` should be a Column or str, got <arg_type>."
     ]
   },
+  "NOT_COLUMN_OR_STR_OR_STRUCT" : {
+    "message" : [
+      "Argument `<arg_name>` should be a StructType or Column or str, got <arg_type>."

Review Comment:
   nit: 
   ```diff
   - "Argument `<arg_name>` should be a StructType or Column or str, got <arg_type>."
   + "Argument `<arg_name>` should be a StructType, Column or str, got <arg_type>."
   ```
   for consistency with other error messages.
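   
   (For context, a minimal sketch of how such an error class is typically raised during argument validation in PySpark — `_check_schema_arg` is a hypothetical helper for illustration, not code from this PR:)
   
   ```python
   from pyspark.errors import PySparkTypeError
   from pyspark.sql import Column
   from pyspark.sql.types import StructType
   
   def _check_schema_arg(schema):
       # Accept StructType, Column, or str; anything else raises the new error class.
       if not isinstance(schema, (StructType, Column, str)):
           raise PySparkTypeError(
               error_class="NOT_COLUMN_OR_STR_OR_STRUCT",
               message_parameters={
                   "arg_name": "schema",
                   "arg_type": type(schema).__name__,
               },
           )
   ```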



##########
python/pyspark/sql/functions.py:
##########
@@ -13041,6 +13041,117 @@ def json_object_keys(col: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("json_object_keys", col)
 
 
+@_try_remote_functions
+def from_xml(
+    col: "ColumnOrName",
+    schema: Union[StructType, Column, str],
+    options: Optional[Dict[str, str]] = None,
+) -> Column:
+    """
+    Parses a column containing a XML string to a row with
+    the specified schema. Returns `null`, in the case of an unparseable string.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        a column or column name in XML format
+    schema : :class:`StructType` or str
+        a StructType or Python string literal with a DDL-formatted string
+        to use when parsing the Xml column
+    options : dict, optional
+        options to control parsing. accepts the same options as the Xml datasource.

Review Comment:
   Xml -> XML



##########
python/pyspark/sql/functions.py:
##########
@@ -13041,6 +13041,120 @@ def json_object_keys(col: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("json_object_keys", col)
 
 
+@_try_remote_functions
+def from_xml(
+    col: "ColumnOrName",
+    schema: Union[StructType, Column, str],
+    options: Optional[Dict[str, str]] = None,
+) -> Column:
+    """
+    Parses a column containing a XML string to a row with
+    the specified schema. Returns `null`, in the case of an unparseable string.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        a column or column name in XML format
+    schema : :class:`StructType` or str
+        a StructType or Python string literal with a DDL-formatted string
+        to use when parsing the Xml column
+    options : dict, optional
+        options to control parsing. accepts the same options as the Xml datasource.
+        See `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option>`_
+        for the version you use.
+
+        .. # noqa
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a new column of complex type from given XML object.
+
+    Examples

Review Comment:
   Yeah, we can do it in a separate PR, but for now let's simply add some newlines for readability.
   
   e.g.
   
   ```python
       >>> from pyspark.sql.types import *
       >>> from pyspark.sql.functions import from_xml, schema_of_xml, lit
   
       StructType input with simple IntegerType.
   
       >>> data = [(1, '''<p><a>1</a></p>''')]
       >>> df = spark.createDataFrame(data, ("key", "value"))
       >>> schema = StructType([StructField("a", IntegerType())])
       >>> df.select(from_xml(df.value, schema).alias("xml")).collect()
       [Row(xml=Row(a=1))]
   
       String input.
   
       >>> df.select(from_xml(df.value, "a INT").alias("xml")).collect()
       [Row(xml=Row(a=1))]
   
       StructType input with complex ArrayType.
   
       >>> data = [(1, '<p><a>1</a><a>2</a></p>')]
       >>> df = spark.createDataFrame(data, ("key", "value"))
       >>> schema = StructType([StructField("a", ArrayType(IntegerType()))])
       >>> df.select(from_xml(df.value, schema).alias("xml")).collect()
       [Row(xml=Row(a=[1, 2]))]
   
       Column input generated by schema_of_xml.
   
       >>> schema = schema_of_xml(lit(data[0][1]))
       >>> df.select(from_xml(df.value, schema).alias("xml")).collect()
       [Row(xml=Row(a=[1, 2]))]
   ```



##########
python/pyspark/sql/tests/connect/test_connect_function.py:
##########
@@ -1821,6 +1821,106 @@ def test_json_functions(self):
             sdf.select(SF.to_json(SF.struct(SF.lit("a"), SF.lit("b")), {"mode": "FAILFAST"})),
         )
 
+    def test_xml_functions(self):
+        query = """
+            SELECT * FROM VALUES
+            ('<p><a>1</a></p>', '<p><a>1</a><a>2</a><a>3</a></p>', '<p><a attr="s"><b>5.0</b></a></p>'),
+            ('<p><a>0</a></p>', '<p><a>4</a><a>5</a><a>6</a></p>', '<p><a attr="t"></a></p>')
+            AS tab(a, b, c)
+            """
+        # +---------------+-------------------------------+---------------------------------+
+        # |              a|                              b|                                c|
+        # +---------------+-------------------------------+---------------------------------+
+        # |<p><a>1</a></p>|<p><a>1</a><a>2</a><a>3</a></p>|<p><a attr="s"><b>5.0</b></a></p>|
+        # |<p><a>1</a></p>|<p><a>4</a><a>5</a><a>6</a></p>|          <p><a attr="t"></a></p>|
+        # +---------------+-------------------------------+---------------------------------+
+
+        cdf = self.connect.sql(query)
+        sdf = self.spark.sql(query)
+
+        # test from_xml
+        for schema in [
+            "a INT",
+#            StructType([StructField("a", IntegerType())]),
+#            StructType([StructField("a", ArrayType(IntegerType()))]),
+        ]:
+            self.compare_by_show(
+                cdf.select(CF.from_xml(cdf.a, schema)),
+                sdf.select(SF.from_xml(sdf.a, schema)),
+            )
+            self.compare_by_show(
+                cdf.select(CF.from_xml("a", schema)),
+                sdf.select(SF.from_xml("a", schema)),
+            )
+            self.compare_by_show(
+                cdf.select(CF.from_xml(cdf.a, schema, {"mode": "FAILFAST"})),
+                sdf.select(SF.from_xml(sdf.a, schema, {"mode": "FAILFAST"})),
+            )
+            self.compare_by_show(
+                cdf.select(CF.from_xml("a", schema, {"mode": "FAILFAST"})),
+                sdf.select(SF.from_xml("a", schema, {"mode": "FAILFAST"})),
+            )
+
+        for schema in [
+            "STRUCT<a: ARRAY<INT>>",
+#            StructType([StructField("a", ArrayType(IntegerType()))]),

Review Comment:
   Ditto. And let's move the `#` to the proper place.
   ```diff
   - #            StructType([StructField("a", ArrayType(IntegerType()))]),
   +              # StructType([StructField("a", ArrayType(IntegerType()))]),
   ```



##########
python/pyspark/sql/tests/test_functions.py:
##########
@@ -1286,6 +1281,27 @@ def test_from_csv(self):
             message_parameters={"arg_name": "schema", "arg_type": "int"},
         )
 
+    def test_schema_of_xml(self):

Review Comment:
   I think we might need a test for non-connect as well.
   
   It seems we only compare connect <> non-connect results in `python/pyspark/sql/tests/connect/test_connect_function.py` via `compare_by_show`, but there is no test for data correctness.
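   
   e.g., a minimal sketch of such a correctness test (the method name and expected values are illustrative, not from this PR):
   
   ```python
   def test_from_xml(self):
       from pyspark.sql.functions import from_xml
   
       df = self.spark.createDataFrame([(1, "<p><a>1</a></p>")], ("key", "value"))
       row = df.select(from_xml(df.value, "a INT").alias("xml")).first()
       # Assert the parsed value itself, not just the show() output.
       self.assertEqual(row.xml.a, 1)
   ```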



##########
python/pyspark/sql/tests/connect/test_connect_function.py:
##########
@@ -1821,6 +1821,106 @@ def test_json_functions(self):
             sdf.select(SF.to_json(SF.struct(SF.lit("a"), SF.lit("b")), {"mode": "FAILFAST"})),
         )
 
+    def test_xml_functions(self):
+        query = """
+            SELECT * FROM VALUES
+            ('<p><a>1</a></p>', '<p><a>1</a><a>2</a><a>3</a></p>', '<p><a attr="s"><b>5.0</b></a></p>'),
+            ('<p><a>0</a></p>', '<p><a>4</a><a>5</a><a>6</a></p>', '<p><a attr="t"></a></p>')
+            AS tab(a, b, c)
+            """
+        # +---------------+-------------------------------+---------------------------------+
+        # |              a|                              b|                                c|
+        # +---------------+-------------------------------+---------------------------------+
+        # |<p><a>1</a></p>|<p><a>1</a><a>2</a><a>3</a></p>|<p><a attr="s"><b>5.0</b></a></p>|
+        # |<p><a>1</a></p>|<p><a>4</a><a>5</a><a>6</a></p>|          <p><a attr="t"></a></p>|
+        # +---------------+-------------------------------+---------------------------------+
+
+        cdf = self.connect.sql(query)
+        sdf = self.spark.sql(query)
+
+        # test from_xml
+        for schema in [
+            "a INT",
+#            StructType([StructField("a", IntegerType())]),

Review Comment:
   Then let's create a JIRA and leave it as a TODO comment for now. I think we need a separate investigation.
   
   e.g.
   ```python
           # TODO(SPARK-XXXXX): description (Please fill the actual description and JIRA number)
           for schema in [
               "a INT",
               # StructType([StructField("a", IntegerType())]),
               # StructType([StructField("a", ArrayType(IntegerType()))]),
           ]:
   ```



##########
python/pyspark/sql/functions.py:
##########
@@ -13041,6 +13041,117 @@ def json_object_keys(col: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("json_object_keys", col)
 
 
+@_try_remote_functions
+def from_xml(
+    col: "ColumnOrName",
+    schema: Union[StructType, Column, str],
+    options: Optional[Dict[str, str]] = None,
+) -> Column:
+    """
+    Parses a column containing a XML string to a row with
+    the specified schema. Returns `null`, in the case of an unparseable string.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        a column or column name in XML format
+    schema : :class:`StructType` or str

Review Comment:
   ```diff
   - schema : :class:`StructType` or str
   + schema : :class:`StructType`, :class:`~pyspark.sql.Column` or str
   ```
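   
   i.e., all three schema forms should then be accepted — a usage sketch (assuming an active `spark` session), not code from the PR:
   
   ```python
   from pyspark.sql.functions import from_xml, schema_of_xml, lit
   from pyspark.sql.types import IntegerType, StructField, StructType
   
   df = spark.createDataFrame([(1, "<p><a>1</a></p>")], ("key", "value"))
   
   # 1. StructType
   df.select(from_xml(df.value, StructType([StructField("a", IntegerType())])))
   # 2. DDL-formatted string
   df.select(from_xml(df.value, "a INT"))
   # 3. Column, e.g. produced by schema_of_xml
   df.select(from_xml(df.value, schema_of_xml(lit("<p><a>1</a></p>"))))
   ```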



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

