Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/03 15:34:15 UTC

[GitHub] [spark] LucaCanali opened a new pull request #35391: [SPARK-38098][PYTHON] Support Array of Struct for Pandas UDFs

LucaCanali opened a new pull request #35391:
URL: https://github.com/apache/spark/pull/35391


   ### What changes were proposed in this pull request?
   This proposes to allow Pandas UDFs (and mapInArrow UDFs) to operate on columns of type Array of Struct, via Arrow serialization.
   It appears that pyarrow 2.0.0 can serialize arrays of structs, whereas pyarrow 1.0.0 throws an error for this type of data.
   
   ### Why are the changes needed?
   This extends the usability of pandas_udf.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes: pandas_udf and mapInArrow will be able to operate on columns of type Array of Struct.
   
   ### How was this patch tested?
   A test has been added.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #35391: [SPARK-38098][PYTHON] Support Array of Struct for Pandas UDFs

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #35391:
URL: https://github.com/apache/spark/pull/35391#issuecomment-1029132999


   Can one of the admins verify this patch?




[GitHub] [spark] dongjoon-hyun commented on pull request #35391: [SPARK-38098][PYTHON] Support Array of Struct for Pandas UDFs

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #35391:
URL: https://github.com/apache/spark/pull/35391#issuecomment-1029437222


   cc @HyukjinKwon 




[GitHub] [spark] LucaCanali commented on a change in pull request #35391: [SPARK-38098][PYTHON] Add support for ArrayType of nested StructType to arrow-based conversion

Posted by GitBox <gi...@apache.org>.
LucaCanali commented on a change in pull request #35391:
URL: https://github.com/apache/spark/pull/35391#discussion_r826067534



##########
File path: python/pyspark/sql/pandas/types.py
##########
@@ -86,8 +86,13 @@ def to_arrow_type(dt: DataType) -> "pa.DataType":
     elif type(dt) == DayTimeIntervalType:
         arrow_type = pa.duration("us")
     elif type(dt) == ArrayType:
-        if type(dt.elementType) in [StructType, TimestampType]:
+        if type(dt.elementType) == TimestampType:

Review comment:
       Array of StructType now follows the same type of serialization as StructType.







[GitHub] [spark] HyukjinKwon commented on a change in pull request #35391: [SPARK-38098][PYTHON] Support Array of Struct for Pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35391:
URL: https://github.com/apache/spark/pull/35391#discussion_r799112289



##########
File path: python/pyspark/sql/tests/test_pandas_udf_scalar.py
##########
@@ -134,6 +134,24 @@ def test_pandas_udf_nested_arrays(self):
         result = df.select(tokenize("vals").alias("hi"))
         self.assertEqual([Row(hi=[["hi", "boo"]]), Row(hi=[["bye", "boo"]])], result.collect())
 
+    def test_pandas_array_struct(self):
+        @pandas_udf("Array<struct<col1:string, col2:long, col3:double>>")
+        def return_cols(cols):
+            return cols

Review comment:
       You can refer to https://github.com/apache/spark/pull/33876 for where to add the tests.






[GitHub] [spark] LucaCanali commented on a change in pull request #35391: [SPARK-38098][PYTHON] Add support for ArrayType of nested StructType to arrow-based conversion

Posted by GitBox <gi...@apache.org>.
LucaCanali commented on a change in pull request #35391:
URL: https://github.com/apache/spark/pull/35391#discussion_r800988645



##########
File path: python/pyspark/sql/pandas/types.py
##########
@@ -86,8 +86,13 @@ def to_arrow_type(dt: DataType) -> "pa.DataType":
     elif type(dt) == DayTimeIntervalType:
         arrow_type = pa.duration("us")
     elif type(dt) == ArrayType:
-        if type(dt.elementType) in [StructType, TimestampType]:
+        if type(dt.elementType) == TimestampType:

Review comment:
       I guess I went for the easy way with this PR, as this is what I used for some tests I ran, where I needed to pass an array of structs to mapInArrow. If that's not the preferred/correct implementation, however, that sounds like a show stopper.






[GitHub] [spark] HyukjinKwon commented on pull request #35391: [SPARK-38098][PYTHON] Support Array of Struct for Pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #35391:
URL: https://github.com/apache/spark/pull/35391#issuecomment-1029573727


   cc @BryanCutler @ueshin FYI




[GitHub] [spark] HyukjinKwon commented on a change in pull request #35391: [SPARK-38098][PYTHON] Support Array of Struct for Pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35391:
URL: https://github.com/apache/spark/pull/35391#discussion_r799111952



##########
File path: python/pyspark/sql/tests/test_pandas_udf_scalar.py
##########
@@ -134,6 +134,24 @@ def test_pandas_udf_nested_arrays(self):
         result = df.select(tokenize("vals").alias("hi"))
         self.assertEqual([Row(hi=[["hi", "boo"]]), Row(hi=[["bye", "boo"]])], result.collect())
 
+    def test_pandas_array_struct(self):
+        @pandas_udf("Array<struct<col1:string, col2:long, col3:double>>")
+        def return_cols(cols):
+            return cols
+
+        df = self.spark.createDataFrame(
+            [[[("a", 2, 3.0), ("a", 2, 3.0)]], [[("b", 5, 6.0), ("b", 5, 6.0)]]],
+            "array_struct_col Array<struct<col1:string, col2:long, col3:double>>",
+        )
+        result = df.select(return_cols("array_struct_col").alias("output"))
+        self.assertEqual(
+            [
+                Row(output=[Row(col1="a", col2=2, col3=3.0), Row(col1="a", col2=2, col3=3.0)]),
+                Row(output=[Row(col1="b", col2=5, col3=6.0), Row(col1="b", col2=5, col3=6.0)]),
+            ],
+            result.collect(),

Review comment:
       Maybe we will also have to test `toPandas`.






[GitHub] [spark] HyukjinKwon commented on a change in pull request #35391: [SPARK-38098][PYTHON] Add support for ArrayType of nested StructType to arrow-based conversion

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35391:
URL: https://github.com/apache/spark/pull/35391#discussion_r800266142



##########
File path: python/pyspark/sql/tests/test_pandas_udf_scalar.py
##########
@@ -134,6 +134,30 @@ def test_pandas_udf_nested_arrays(self):
         result = df.select(tokenize("vals").alias("hi"))
         self.assertEqual([Row(hi=[["hi", "boo"]]), Row(hi=[["bye", "boo"]])], result.collect())
 
+    def test_pandas_array_struct(self):
+        # SPARK-38098: Support Array of Struct for Pandas UDFs and toPandas
+        import numpy as np
+
+        @pandas_udf("Array<struct<col1:string, col2:long, col3:double>>")
+        def return_cols(cols):
+            self.assertEqual(type(cols), pd.Series)
+            self.assertEqual(type(cols[0]), np.ndarray)
+            self.assertEqual(type(cols[0][0]), dict)

Review comment:
       I wonder if it would make more sense to return a NumPy array of DataFrames here.
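This small pandas sketch makes the alternative concrete. It assumes the UDF input is the Series of NumPy object arrays of dicts asserted in the test above. Each array of structs becomes its own DataFrame, collected into a NumPy object array. The helper name `to_frames` is hypothetical. The sketch only illustrates the representation and is not how PySpark converts data.

```python
import numpy as np
import pandas as pd


def to_frames(cols: pd.Series) -> np.ndarray:
    """Hypothetical helper: map each NumPy array of dicts to a pandas DataFrame."""
    frames = np.empty(len(cols), dtype=object)
    for i, arr in enumerate(cols):
        # list(arr) yields one dict per struct; pd.DataFrame makes one column per field
        frames[i] = pd.DataFrame(list(arr))
    return frames


# Input shaped like the test's assertions: a Series whose elements are ndarrays of dicts
cols = pd.Series([
    np.array([{"col1": "a", "col2": 2, "col3": 3.0}, {"col1": "a", "col2": 2, "col3": 3.0}], dtype=object),
    np.array([{"col1": "b", "col2": 5, "col3": 6.0}], dtype=object),
])
frames = to_frames(cols)
```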






[GitHub] [spark] LucaCanali commented on pull request #35391: [SPARK-38098][PYTHON] Add support for ArrayType of nested StructType to arrow-based conversion

Posted by GitBox <gi...@apache.org>.
LucaCanali commented on pull request #35391:
URL: https://github.com/apache/spark/pull/35391#issuecomment-1076784272


   I must say I am a bit puzzled by the error found in test_pandas_array_struct, as I cannot reproduce it on my test system.
   When I run `python/run-tests --modules pyspark-sql --testnames pyspark.sql.tests.test_pandas_udf_scalar` locally I can see the test going through OK.
   
   




[GitHub] [spark] HyukjinKwon commented on a change in pull request #35391: [SPARK-38098][PYTHON] Support Array of Struct for Pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35391:
URL: https://github.com/apache/spark/pull/35391#discussion_r799111820



##########
File path: python/pyspark/sql/tests/test_pandas_udf_scalar.py
##########
@@ -134,6 +134,24 @@ def test_pandas_udf_nested_arrays(self):
         result = df.select(tokenize("vals").alias("hi"))
         self.assertEqual([Row(hi=[["hi", "boo"]]), Row(hi=[["bye", "boo"]])], result.collect())
 
+    def test_pandas_array_struct(self):
+        @pandas_udf("Array<struct<col1:string, col2:long, col3:double>>")
+        def return_cols(cols):
+            return cols

Review comment:
       Can we add an assert on its data type here too?






[GitHub] [spark] HyukjinKwon commented on a change in pull request #35391: [SPARK-38098][PYTHON] Add support for ArrayType of nested StructType to arrow-based conversion

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35391:
URL: https://github.com/apache/spark/pull/35391#discussion_r800265582



##########
File path: python/pyspark/sql/pandas/types.py
##########
@@ -86,8 +86,13 @@ def to_arrow_type(dt: DataType) -> "pa.DataType":
     elif type(dt) == DayTimeIntervalType:
         arrow_type = pa.duration("us")
     elif type(dt) == ArrayType:
-        if type(dt.elementType) in [StructType, TimestampType]:
+        if type(dt.elementType) == TimestampType:

Review comment:
       My only concern here is that we currently convert the struct type to a DataFrame instead of a Series of dictionaries (e.g., `returnType=struct<...>`), which is inconsistent with the conversion this PR proposes (a Series of dictionaries).
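This plain-pandas sketch (not PySpark internals) shows the two shapes being compared, using the three-field struct from the test. As the comment above notes, a plain struct column is handled as a DataFrame with one column per field. The proposed array-of-struct conversion instead yields dicts. The variable names are illustrative.

```python
import pandas as pd

# Shape used for a plain struct column: one DataFrame, one column per field
struct_as_frame = pd.DataFrame({"col1": ["a", "b"], "col2": [2, 5], "col3": [3.0, 6.0]})

# Shape this PR produces for struct elements: a Series whose values are dicts
struct_as_dicts = pd.Series(struct_as_frame.to_dict("records"))

# Going back: a list of dicts becomes a DataFrame with one column per key
back_to_frame = pd.DataFrame(list(struct_as_dicts))
```

Both shapes carry the same data, so the question raised here is about API consistency rather than expressiveness.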






[GitHub] [spark] HyukjinKwon commented on a change in pull request #35391: [SPARK-38098][PYTHON] Support Array of Struct for Pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35391:
URL: https://github.com/apache/spark/pull/35391#discussion_r799112395



##########
File path: python/pyspark/sql/tests/test_pandas_udf_scalar.py
##########
@@ -134,6 +134,24 @@ def test_pandas_udf_nested_arrays(self):
         result = df.select(tokenize("vals").alias("hi"))
         self.assertEqual([Row(hi=[["hi", "boo"]]), Row(hi=[["bye", "boo"]])], result.collect())
 
+    def test_pandas_array_struct(self):

Review comment:
       Let's add a comment with `# SPARK-XXXXX: ...` (see the "Pull request" section in https://spark.apache.org/contributing.html).

##########
File path: python/pyspark/sql/tests/test_pandas_udf_scalar.py
##########
@@ -134,6 +134,24 @@ def test_pandas_udf_nested_arrays(self):
         result = df.select(tokenize("vals").alias("hi"))
         self.assertEqual([Row(hi=[["hi", "boo"]]), Row(hi=[["bye", "boo"]])], result.collect())
 
+    def test_pandas_array_struct(self):
+        @pandas_udf("Array<struct<col1:string, col2:long, col3:double>>")
+        def return_cols(cols):
+            return cols
+
+        df = self.spark.createDataFrame(
+            [[[("a", 2, 3.0), ("a", 2, 3.0)]], [[("b", 5, 6.0), ("b", 5, 6.0)]]],
+            "array_struct_col Array<struct<col1:string, col2:long, col3:double>>",
+        )
+        result = df.select(return_cols("array_struct_col").alias("output"))
+        self.assertEqual(
+            [
+                Row(output=[Row(col1="a", col2=2, col3=3.0), Row(col1="a", col2=2, col3=3.0)]),
+                Row(output=[Row(col1="b", col2=5, col3=6.0), Row(col1="b", col2=5, col3=6.0)]),
+            ],
+            result.collect(),

Review comment:
       You can refer to https://github.com/apache/spark/pull/33876 for where to add the tests.






[GitHub] [spark] HyukjinKwon commented on pull request #35391: [SPARK-38098][PYTHON] Add support for ArrayType of nested StructType to arrow-based conversion

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #35391:
URL: https://github.com/apache/spark/pull/35391#issuecomment-1075922395


   @LucaCanali thanks for addressing my comment. The test seems to be failing:
   
   ```
   
   ======================================================================
   ERROR [0.182s]: test_pandas_array_struct (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File "/__w/spark/spark/python/pyspark/sql/tests/test_pandas_udf_scalar.py", line 152, in test_pandas_array_struct
       result = df.select(return_cols("array_struct_col"))
     File "/__w/spark/spark/python/pyspark/sql/udf.py", line 276, in wrapper
       return self(*args)
     File "/__w/spark/spark/python/pyspark/sql/udf.py", line 249, in __call__
       judf = self._judf
     File "/__w/spark/spark/python/pyspark/sql/udf.py", line 215, in _judf
       self._judf_placeholder = self._create_judf(self.func)
     File "/__w/spark/spark/python/pyspark/sql/udf.py", line 224, in _create_judf
       wrapped_func = _wrap_function(sc, func, self.returnType)
     File "/__w/spark/spark/python/pyspark/sql/udf.py", line 50, in _wrap_function
       pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
     File "/__w/spark/spark/python/pyspark/rdd.py", line 3350, in _prepare_for_python_RDD
       pickled_command = ser.dumps(command)
     File "/__w/spark/spark/python/pyspark/serializers.py", line 458, in dumps
       return cloudpickle.dumps(obj, pickle_protocol)
     File "/__w/spark/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
       cp.dump(obj)
     File "/__w/spark/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 602, in dump
       return Pickler.dump(self, obj)
     File "/__w/spark/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 316, in _file_reduce
       raise pickle.PicklingError(
   _pickle.PicklingError: Cannot pickle files that are not opened for reading: w
   ```
   
   Could we fix this one?
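One possible cause, offered as an unverified hypothesis rather than a diagnosis: the revised UDF calls `self.assertEqual(...)` inside its body. The pickled closure would then capture the test case instance, along with anything it references, such as a test runner's output stream opened with mode `w`. The standard-library sketch below reproduces that failure mode with a hypothetical `FakeTestCase` standing in for the real test class. PySpark uses cloudpickle, which reports the problem with a different message than plain `pickle`.

```python
import os
import pickle


class FakeTestCase:
    """Hypothetical stand-in for a test case that holds a runner's output stream."""

    def __init__(self, stream):
        self.stream = stream  # e.g. a results file opened in "w" mode

    def check_and_return(self, values):
        # Like calling self.assertEqual inside the UDF: pickling this bound
        # method also pickles `self`, including self.stream.
        assert isinstance(values, list)
        return values


def check_and_return(values):
    # Same check with a plain assert and no reference to the test case.
    assert isinstance(values, list)
    return values


def is_picklable(obj) -> bool:
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


with open(os.devnull, "w") as stream:
    case = FakeTestCase(stream)
    method_ok = is_picklable(case.check_and_return)
    function_ok = is_picklable(check_and_return)
```

If this is the cause, keeping the UDF body free of references to `self` (for example, using plain `assert` statements instead of `self.assertEqual`) would keep the test case out of the pickled command.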

