You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "zhengruifeng (via GitHub)" <gi...@apache.org> on 2023/04/20 08:04:34 UTC

[GitHub] [spark] zhengruifeng commented on a diff in pull request #40725: [SPARK-43082][Connect][PYTHON] Arrow-optimized Python UDFs in Spark Connect

zhengruifeng commented on code in PR #40725:
URL: https://github.com/apache/spark/pull/40725#discussion_r1172220137


##########
python/pyspark/sql/udf.py:
##########
@@ -139,48 +141,53 @@ def _create_py_udf(
         and not isinstance(return_type, ArrayType)
     )
     if is_arrow_enabled and is_output_atomic_type and is_func_with_args:
-        require_minimum_pandas_version()
-        require_minimum_pyarrow_version()
-
-        import pandas as pd
-        from pyspark.sql.pandas.functions import _create_pandas_udf  # type: ignore[attr-defined]
-
-        # "result_func" ensures the result of a Python UDF to be consistent with/without Arrow
-        # optimization.
-        # Otherwise, an Arrow-optimized Python UDF raises "pyarrow.lib.ArrowTypeError: Expected a
-        # string or bytes dtype, got ..." whereas a non-Arrow-optimized Python UDF returns
-        # successfully.
-        result_func = lambda pdf: pdf  # noqa: E731
-        if type(return_type) == StringType:
-            result_func = lambda r: str(r) if r is not None else r  # noqa: E731
-        elif type(return_type) == BinaryType:
-            result_func = lambda r: bytes(r) if r is not None else r  # noqa: E731
-
-        def vectorized_udf(*args: pd.Series) -> pd.Series:
-            if any(map(lambda arg: isinstance(arg, pd.DataFrame), args)):
-                raise NotImplementedError(
-                    "Struct input type are not supported with Arrow optimization "
-                    "enabled in Python UDFs. Disable "
-                    "'spark.sql.execution.pythonUDF.arrow.enabled' to workaround."
-                )
-            return pd.Series(result_func(f(*a)) for a in zip(*args))
-
-        # Regular UDFs can take callable instances too.
-        vectorized_udf.__name__ = f.__name__ if hasattr(f, "__name__") else f.__class__.__name__
-        vectorized_udf.__module__ = (
-            f.__module__ if hasattr(f, "__module__") else f.__class__.__module__
-        )
-        vectorized_udf.__doc__ = f.__doc__
-        pudf = _create_pandas_udf(vectorized_udf, returnType, None)
-        # Keep the attributes as if this is a regular Python UDF.
-        pudf.func = f
-        pudf.returnType = return_type
-        pudf.evalType = regular_udf.evalType
-        return pudf
+        return _create_arrow_py_udf(f, regular_udf)
     else:
         return regular_udf
 
 
+def _create_arrow_py_udf(f, regular_udf):  # type: ignore
+    """Create an Arrow-optimized Python UDF out of a regular Python UDF."""
+    require_minimum_pandas_version()
+    require_minimum_pyarrow_version()
+
+    import pandas as pd
+    from pyspark.sql.pandas.functions import _create_pandas_udf
+
+    return_type = regular_udf.returnType

Review Comment:
   it seems that the `regular_udf` is only used to pass the `returnType` and `evalType` ?



##########
python/pyspark/sql/connect/udf.py:
##########
@@ -47,6 +48,41 @@
     from pyspark.sql.types import StringType
 
 
+def _create_py_udf(
+    f: Callable[..., Any],
+    returnType: "DataTypeOrString",
+    evalType: int,
+    useArrow: Optional[bool] = None,
+) -> "UserDefinedFunctionLike":
+    from pyspark.sql.udf import _create_arrow_py_udf
+    from pyspark.sql.connect.session import _active_spark_session
+
+    if _active_spark_session is None:
+        is_arrow_enabled = False
+    else:
+        is_arrow_enabled = (
+            _active_spark_session.conf.get("spark.sql.execution.pythonUDF.arrow.enabled") == "true"
+            if useArrow is None
+            else useArrow
+        )
+
+    regular_udf = _create_udf(f, returnType, evalType)
+    return_type = regular_udf.returnType
+    try:
+        is_func_with_args = len(getfullargspec(f).args) > 0
+    except TypeError:
+        is_func_with_args = False
+    is_output_atomic_type = (
+        not isinstance(return_type, StructType)
+        and not isinstance(return_type, MapType)
+        and not isinstance(return_type, ArrayType)
+    )
+    if is_arrow_enabled and is_output_atomic_type and is_func_with_args:

Review Comment:
   what about adding a warning if `useArrow` or the config is set true but can not enable arrow optimization for some reasons?



##########
python/pyspark/sql/tests/connect/test_parity_arrow_python_udf.py:
##########
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from pyspark.sql.tests.connect.test_parity_udf import UDFParityTests
+from pyspark.sql.tests.test_arrow_python_udf import PythonUDFArrowTestsMixin
+
+
+class ArrowPythonUDFParityTests(UDFParityTests, PythonUDFArrowTestsMixin):
+    @classmethod
+    def setUpClass(cls):
+        super(ArrowPythonUDFParityTests, cls).setUpClass()
+        cls.spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "true")
+

Review Comment:
   do we need `teardownClass` to avoid polluting the environment?



##########
python/pyspark/sql/udf.py:
##########
@@ -139,48 +141,53 @@ def _create_py_udf(
         and not isinstance(return_type, ArrayType)
     )
     if is_arrow_enabled and is_output_atomic_type and is_func_with_args:
-        require_minimum_pandas_version()
-        require_minimum_pyarrow_version()
-
-        import pandas as pd
-        from pyspark.sql.pandas.functions import _create_pandas_udf  # type: ignore[attr-defined]
-
-        # "result_func" ensures the result of a Python UDF to be consistent with/without Arrow
-        # optimization.
-        # Otherwise, an Arrow-optimized Python UDF raises "pyarrow.lib.ArrowTypeError: Expected a
-        # string or bytes dtype, got ..." whereas a non-Arrow-optimized Python UDF returns
-        # successfully.
-        result_func = lambda pdf: pdf  # noqa: E731
-        if type(return_type) == StringType:
-            result_func = lambda r: str(r) if r is not None else r  # noqa: E731
-        elif type(return_type) == BinaryType:
-            result_func = lambda r: bytes(r) if r is not None else r  # noqa: E731
-
-        def vectorized_udf(*args: pd.Series) -> pd.Series:
-            if any(map(lambda arg: isinstance(arg, pd.DataFrame), args)):
-                raise NotImplementedError(
-                    "Struct input type are not supported with Arrow optimization "
-                    "enabled in Python UDFs. Disable "
-                    "'spark.sql.execution.pythonUDF.arrow.enabled' to workaround."
-                )
-            return pd.Series(result_func(f(*a)) for a in zip(*args))
-
-        # Regular UDFs can take callable instances too.
-        vectorized_udf.__name__ = f.__name__ if hasattr(f, "__name__") else f.__class__.__name__
-        vectorized_udf.__module__ = (
-            f.__module__ if hasattr(f, "__module__") else f.__class__.__module__
-        )
-        vectorized_udf.__doc__ = f.__doc__
-        pudf = _create_pandas_udf(vectorized_udf, returnType, None)
-        # Keep the attributes as if this is a regular Python UDF.
-        pudf.func = f
-        pudf.returnType = return_type
-        pudf.evalType = regular_udf.evalType
-        return pudf
+        return _create_arrow_py_udf(f, regular_udf)

Review Comment:
   is `f` just `regular_udf.func` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org