Posted to reviews@spark.apache.org by "ueshin (via GitHub)" <gi...@apache.org> on 2023/07/26 01:50:31 UTC

[GitHub] [spark] ueshin opened a new pull request, #42161: [SPARK-44479][PYTHON] Fix ArrowStreamPandasUDFSerializer to accept no-column pandas DataFrame

ueshin opened a new pull request, #42161:
URL: https://github.com/apache/spark/pull/42161

   ### What changes were proposed in this pull request?
   
   Fixes `ArrowStreamPandasUDFSerializer` to accept a pandas DataFrame that has no columns.
   
   ```py
   >>> def _scalar_f(id):
   ...   return pd.DataFrame(index=id)
   ...
   >>> scalar_f = pandas_udf(_scalar_f, returnType=StructType())
   >>> df = spark.range(3).withColumn("f", scalar_f(col("id")))
   >>> df.printSchema()
   root
    |-- id: long (nullable = false)
    |-- f: struct (nullable = true)
   
   >>> df.show()
   +---+---+
   | id|  f|
   +---+---+
   |  0| {}|
   |  1| {}|
   |  2| {}|
   +---+---+
   ```
   
   ### Why are the changes needed?
   
   Without this fix, the query above fails with the following error:
   
   ```py
   >>> df.show()
   org.apache.spark.api.python.PythonException: Traceback (most recent call last):
   ...
   ValueError: not enough values to unpack (expected 2, got 0)
   ```
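The `ValueError` text above is what Python raises when an empty list of `(array, name)` pairs is transposed with `zip(*...)` and unpacked into two targets. The sketch below reproduces that message in plain Python.

It rests on an assumption: the thread does not quote the full pre-fix body of `_create_struct_array`, so it is assumed that the serializer unpacked its per-field arrays roughly like this. `split_arrays_and_names` is a hypothetical helper, not Spark code.

```python
def split_arrays_and_names(arrs_names):
    """Transpose [(array, name), ...] into ([arrays], [names]).

    Hypothetical helper: mirrors the *assumed* unpacking in the old
    serializer, not the actual Spark implementation.
    """
    arrs, names = zip(*arrs_names)
    return list(arrs), list(names)


# Two struct fields: zip(*...) yields exactly two tuples, so unpacking works.
print(split_arrays_and_names([([1, 2], "a"), (["x", "y"], "b")]))
# ([[1, 2], ['x', 'y']], ['a', 'b'])

# A DataFrame with no columns produces no pairs; zip(*[]) is empty.
try:
    split_arrays_and_names([])
except ValueError as e:
    print(e)  # not enough values to unpack (expected 2, got 0)
```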
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. Pandas UDFs will accept a pandas DataFrame that has no columns.
   
   ### How was this patch tested?
   
   Added related tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ueshin commented on a diff in pull request #42161: [SPARK-44479][PYTHON] Fix ArrowStreamPandasUDFSerializer to accept no-column pandas DataFrame

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #42161:
URL: https://github.com/apache/spark/pull/42161#discussion_r1275516715


##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -932,8 +940,7 @@ def eval(self, *args):
                     StructType().add("col0", IntegerType()).add("col1", StringType()),
                     [Row(a=1, b="x")],
                 ),
-                # TODO(SPARK-44479): Support Python UDTFs with empty schema
-                # (func(), StructType(), [Row()]),
+                (func(), StructType(), [Row()]),

Review Comment:
   https://github.com/apache/spark/pull/42176



##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -932,8 +940,7 @@ def eval(self, *args):
                     StructType().add("col0", IntegerType()).add("col1", StringType()),
                     [Row(a=1, b="x")],
                 ),
-                # TODO(SPARK-44479): Support Python UDTFs with empty schema
-                # (func(), StructType(), [Row()]),
+                (func(), StructType(), [Row()]),

Review Comment:
   For 3.5: https://github.com/apache/spark/pull/42176




[GitHub] [spark] ueshin commented on pull request #42161: [SPARK-44479][PYTHON] Fix ArrowStreamPandasUDFSerializer to accept no-column pandas DataFrame

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on PR #42161:
URL: https://github.com/apache/spark/pull/42161#issuecomment-1650850836

   cc @HyukjinKwon @allisonwang-db 



[GitHub] [spark] ueshin commented on a diff in pull request #42161: [SPARK-44479][PYTHON] Fix ArrowStreamPandasUDFSerializer to accept no-column pandas DataFrame

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #42161:
URL: https://github.com/apache/spark/pull/42161#discussion_r1275448295


##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -932,8 +940,7 @@ def eval(self, *args):
                     StructType().add("col0", IntegerType()).add("col1", StringType()),
                     [Row(a=1, b="x")],
                 ),
-                # TODO(SPARK-44479): Support Python UDTFs with empty schema
-                # (func(), StructType(), [Row()]),
+                (func(), StructType(), [Row()]),

Review Comment:
   We should backport this PR to 3.5 except for this change.




[GitHub] [spark] ueshin commented on a diff in pull request #42161: [SPARK-44479][PYTHON] Fix ArrowStreamPandasUDFSerializer to accept no-column pandas DataFrame

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #42161:
URL: https://github.com/apache/spark/pull/42161#discussion_r1274361664


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -385,37 +385,28 @@ def _create_struct_array(self, df, arrow_struct_type):
         """
         import pyarrow as pa
 
-        # Input partition and result pandas.DataFrame empty, make empty Arrays with struct
-        if len(df) == 0 and len(df.columns) == 0:
-            arrs_names = [
-                (pa.array([], type=field.type), field.name) for field in arrow_struct_type
-            ]
+        if len(df.columns) == 0:
+            return pa.array([{}] * len(df), arrow_struct_type)

Review Comment:
   For example:
   
   ```py
   >>> pdf = pd.DataFrame(index=[1,2,3])
   >>> pdf
   Empty DataFrame
   Columns: []
   Index: [1, 2, 3]
   >>> len(pdf)
   3
   >>> len(pdf.columns)
   0
   ```
   
   or
   
   ```py
   >>> pd.DataFrame([tuple(), tuple(), tuple()])
   Empty DataFrame
   Columns: []
   Index: [0, 1, 2]
   ```





[GitHub] [spark] ueshin commented on pull request #42161: [SPARK-44479][PYTHON] Fix ArrowStreamPandasUDFSerializer to accept no-column pandas DataFrame

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on PR #42161:
URL: https://github.com/apache/spark/pull/42161#issuecomment-1652751432

   Thanks! merging to master.



[GitHub] [spark] allisonwang-db commented on a diff in pull request #42161: [SPARK-44479][PYTHON] Fix ArrowStreamPandasUDFSerializer to accept no-column pandas DataFrame

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #42161:
URL: https://github.com/apache/spark/pull/42161#discussion_r1275240003


##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -487,6 +487,14 @@ def eval(self, x: int):
 
         self.assertEqual(TestUDTF(lit(1)).collect(), [Row(x={1: "1"})])
 
+    def test_udtf_with_empty_output_types(self):
+        @udtf(returnType=StructType())

Review Comment:
   This might be a related error:
   ```
   pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.sql.connect.common.InvalidPlanInput) Does not support convert KIND_NOT_SET to catalyst types.
   ```




[GitHub] [spark] ueshin closed pull request #42161: [SPARK-44479][PYTHON] Fix ArrowStreamPandasUDFSerializer to accept no-column pandas DataFrame

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin closed pull request #42161: [SPARK-44479][PYTHON] Fix ArrowStreamPandasUDFSerializer to accept no-column pandas DataFrame
URL: https://github.com/apache/spark/pull/42161




[GitHub] [spark] ueshin commented on a diff in pull request #42161: [SPARK-44479][PYTHON] Fix ArrowStreamPandasUDFSerializer to accept no-column pandas DataFrame

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #42161:
URL: https://github.com/apache/spark/pull/42161#discussion_r1275448791


##########
python/pyspark/sql/connect/types.py:
##########
@@ -170,14 +170,16 @@ def pyspark_types_to_proto_types(data_type: DataType) -> pb2.DataType:
         ret.year_month_interval.start_field = data_type.startField
         ret.year_month_interval.end_field = data_type.endField
     elif isinstance(data_type, StructType):
+        struct = pb2.DataType.Struct()
         for field in data_type.fields:
             struct_field = pb2.DataType.StructField()
             struct_field.name = field.name
             struct_field.data_type.CopyFrom(pyspark_types_to_proto_types(field.dataType))
             struct_field.nullable = field.nullable
             if field.metadata is not None and len(field.metadata) > 0:
                 struct_field.metadata = json.dumps(field.metadata)
-            ret.struct.fields.append(struct_field)
+            struct.fields.append(struct_field)
+        ret.struct.CopyFrom(struct)

Review Comment:
   We might want to backport this part to 3.4, too.
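This change fits the `KIND_NOT_SET` error reported elsewhere in this thread, given how protobuf tracks presence for a message field inside a `oneof`. In the old code, only `ret.struct.fields.append(...)` marked `struct` as set, so an empty `StructType` left the oneof unset. The new `ret.struct.CopyFrom(struct)` marks it set even when `struct` is empty, which is the behavior this change relies on.

The toy model below imitates that rule in plain Python. `ToyDataType`, `ToyStruct`, `_ToyRepeated`, `convert_old` and `convert_new` are hypothetical stand-ins, not the generated `pb2` classes. The oneof name `kind` is inferred from the `KIND_NOT_SET` message.

```python
class _ToyRepeated(list):
    """Repeated field whose writes notify the owning message (toy only)."""

    def __init__(self, on_write):
        super().__init__()
        self._on_write = on_write

    def append(self, item):
        self._on_write()  # writing into the field marks it present
        super().append(item)


class ToyStruct:
    """Hypothetical stand-in for pb2.DataType.Struct."""

    def __init__(self, on_write=lambda: None):
        self._on_write = on_write
        self.fields = _ToyRepeated(on_write)

    def CopyFrom(self, other):
        self._on_write()  # copying, even an empty message, marks it present
        self.fields = _ToyRepeated(self._on_write)
        self.fields.extend(other.fields)


class ToyDataType:
    """Hypothetical stand-in for pb2.DataType with a oneof named `kind`."""

    def __init__(self):
        self._kind = None  # nothing set yet, i.e. KIND_NOT_SET
        self.struct = ToyStruct(on_write=self._mark_struct)

    def _mark_struct(self):
        self._kind = "struct"

    def WhichOneof(self, oneof_name):
        return self._kind  # only one oneof exists in this toy


def convert_old(field_names):
    ret = ToyDataType()
    for name in field_names:
        ret.struct.fields.append(name)  # the oneof is set only by a write
    return ret


def convert_new(field_names):
    ret = ToyDataType()
    struct = ToyStruct()  # detached message, like pb2.DataType.Struct()
    for name in field_names:
        struct.fields.append(name)
    ret.struct.CopyFrom(struct)  # always marks `struct` as present
    return ret


print(convert_old(["a"]).WhichOneof("kind"))  # struct
print(convert_old([]).WhichOneof("kind"))     # None: the server sees KIND_NOT_SET
print(convert_new([]).WhichOneof("kind"))     # struct
```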



##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -932,8 +940,7 @@ def eval(self, *args):
                     StructType().add("col0", IntegerType()).add("col1", StringType()),
                     [Row(a=1, b="x")],
                 ),
-                # TODO(SPARK-44479): Support Python UDTFs with empty schema
-                # (func(), StructType(), [Row()]),
+                (func(), StructType(), [Row()]),

Review Comment:
   We should backport this PR to 3.5 except for this line.




[GitHub] [spark] allisonwang-db commented on a diff in pull request #42161: [SPARK-44479][PYTHON] Fix ArrowStreamPandasUDFSerializer to accept no-column pandas DataFrame

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #42161:
URL: https://github.com/apache/spark/pull/42161#discussion_r1274354197


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -385,37 +385,28 @@ def _create_struct_array(self, df, arrow_struct_type):
         """
         import pyarrow as pa
 
-        # Input partition and result pandas.DataFrame empty, make empty Arrays with struct
-        if len(df) == 0 and len(df.columns) == 0:
-            arrs_names = [
-                (pa.array([], type=field.type), field.name) for field in arrow_struct_type
-            ]
+        if len(df.columns) == 0:
+            return pa.array([{}] * len(df), arrow_struct_type)

Review Comment:
   In what scenario can a DataFrame have rows but no columns, i.e. `len(df.columns) == 0` but `len(df) > 0`?



##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -487,6 +487,14 @@ def eval(self, x: int):
 
         self.assertEqual(TestUDTF(lit(1)).collect(), [Row(x={1: "1"})])
 
+    def test_udtf_with_empty_output_types(self):
+        @udtf(returnType=StructType())
+        class TestUDTF:
+            def eval(self):
+                yield tuple()
+
+        self.assertEqual(TestUDTF().collect(), [Row()])

Review Comment:
   Let's use assertDataFrameEqual :)



##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -385,37 +385,28 @@ def _create_struct_array(self, df, arrow_struct_type):
         """
         import pyarrow as pa
 
-        # Input partition and result pandas.DataFrame empty, make empty Arrays with struct
-        if len(df) == 0 and len(df.columns) == 0:
-            arrs_names = [
-                (pa.array([], type=field.type), field.name) for field in arrow_struct_type
-            ]
+        if len(df.columns) == 0:
+            return pa.array([{}] * len(df), arrow_struct_type)
         # Assign result columns by schema name if user labeled with strings
         elif self._assign_cols_by_name and any(isinstance(name, str) for name in df.columns):

Review Comment:
   ```suggestion
           if self._assign_cols_by_name and any(isinstance(name, str) for name in df.columns):
   ```
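The branch order in this hunk can be modeled without pandas or pyarrow. The sketch below is a hypothetical pure-Python stand-in for `_create_struct_array`. It returns one dict per row instead of a `pa.StructArray`, and `create_struct_rows` and its parameters are invented for illustration.

It shows two things:
- The zero-column check has to come first and must not depend on `len(df)`.
- Once that branch returns, the following `elif` can become a plain `if`, as suggested.

```python
def create_struct_rows(num_rows, columns, data, field_names, assign_cols_by_name=True):
    """Toy model of the branch order in _create_struct_array after this PR.

    `columns` lists the DataFrame's column labels, `data` maps each label to
    its values, and the result is one dict per row (standing in for a pyarrow
    struct array). All names here are hypothetical.
    """
    # No columns at all: one empty struct per row, whatever num_rows is.
    # (Distinct dicts, so the toy output is safe to mutate.)
    if len(columns) == 0:
        return [{} for _ in range(num_rows)]

    # The branch above returns, so a plain `if` suffices here.
    if assign_cols_by_name and any(isinstance(name, str) for name in columns):
        # Match result columns to schema fields by label.
        cols = [data[name] for name in field_names]
    else:
        # Otherwise match result columns to schema fields by position.
        cols = [data[columns[i]] for i in range(len(field_names))]

    return [dict(zip(field_names, values)) for values in zip(*cols)]


print(create_struct_rows(3, [], {}, []))  # [{}, {}, {}]
data = {"a": [1, 2], "b": ["x", "y"]}
print(create_struct_rows(2, ["b", "a"], data, ["a", "b"]))
# [{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}]
print(create_struct_rows(2, ["b", "a"], data, ["a", "b"], assign_cols_by_name=False))
# [{'a': 'x', 'b': 1}, {'a': 'y', 'b': 2}]
```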




[GitHub] [spark] allisonwang-db commented on a diff in pull request #42161: [SPARK-44479][PYTHON] Fix ArrowStreamPandasUDFSerializer to accept no-column pandas DataFrame

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #42161:
URL: https://github.com/apache/spark/pull/42161#discussion_r1275479798


##########
python/pyspark/sql/connect/types.py:
##########
@@ -170,14 +170,16 @@ def pyspark_types_to_proto_types(data_type: DataType) -> pb2.DataType:
         ret.year_month_interval.start_field = data_type.startField
         ret.year_month_interval.end_field = data_type.endField
     elif isinstance(data_type, StructType):
+        struct = pb2.DataType.Struct()
         for field in data_type.fields:
             struct_field = pb2.DataType.StructField()
             struct_field.name = field.name
             struct_field.data_type.CopyFrom(pyspark_types_to_proto_types(field.dataType))
             struct_field.nullable = field.nullable
             if field.metadata is not None and len(field.metadata) > 0:
                 struct_field.metadata = json.dumps(field.metadata)
-            ret.struct.fields.append(struct_field)
+            struct.fields.append(struct_field)
+        ret.struct.CopyFrom(struct)

Review Comment:
   Good catch!


