Posted to reviews@spark.apache.org by "allisonwang-db (via GitHub)" <gi...@apache.org> on 2023/07/05 22:52:22 UTC

[GitHub] [spark] allisonwang-db opened a new pull request, #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

allisonwang-db opened a new pull request, #41867:
URL: https://github.com/apache/spark/pull/41867

   
   ### What changes were proposed in this pull request?
   This PR adds support for Arrow-optimized Python user-defined table functions (UDTFs). As with Python UDFs, Arrow optimization for Python UDTFs can be enabled or disabled using:
   
   1. The `spark.sql.execution.pythonUDTF.arrow.enabled` config (default: True)
   2. The `useArrow` parameter of the `udtf` function (default: None)
   
   Note that **Arrow optimization is enabled by default** for Python UDTFs. If the required pandas or pyarrow dependencies are not installed, Spark falls back to a regular Python UDTF.
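
The precedence between the two switches can be sketched in plain Python. This is a minimal illustration rather than the PR's code: `resolve_udtf_mode` and its parameters are hypothetical names, the dependency check is reduced to a boolean flag, and it assumes that an explicit `useArrow` value takes precedence over the session config.

```python
import warnings
from typing import Optional


def resolve_udtf_mode(
    use_arrow: Optional[bool],
    conf_value: Optional[str],
    deps_available: bool,
) -> str:
    """Return "arrow" or "regular" (hypothetical helper for illustration)."""
    if use_arrow is not None:
        # An explicit useArrow argument overrides the session config.
        arrow_enabled = use_arrow
    elif conf_value is not None:
        # The config value arrives as a string such as "true" or "false".
        arrow_enabled = conf_value == "true"
    else:
        # No active session: use the default described in this PR.
        arrow_enabled = True

    if arrow_enabled and not deps_available:
        # Missing pandas/pyarrow: warn and fall back instead of failing.
        warnings.warn(
            "Arrow optimization cannot be enabled; falling back to a regular UDTF.",
            UserWarning,
        )
        return "regular"
    return "arrow" if arrow_enabled else "regular"
```

For instance, `resolve_udtf_mode(False, "true", True)` selects the regular path even though the config is on, because the explicit argument wins.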
   
   ### Why are the changes needed?
   To make Python UDTFs more performant.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. This PR allows users to create arrow-optimized Python UDTFs.
   
   ### How was this patch tested?
   New unit tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ueshin commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1261769156


##########
python/pyspark/errors/error_classes.py:
##########


Review Comment:
   Do we need to regenerate the error class documentation?
   
   https://github.com/apache/spark/blob/2aa3ba786f58daa57bf8257fb17b333f25b05025/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala#L51-L55





[GitHub] [spark] ueshin commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1261739197


##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -369,19 +381,115 @@ def eval(self, a: int):
         ):
             TestUDTF(rand(0) * 100).collect()
 
-    def test_udtf_no_eval(self):
-        @udtf(returnType="a: int, b: int")
+    def test_udtf_with_struct_input_type(self):
+        @udtf(returnType="x: string")
         class TestUDTF:
-            def run(self, a: int):
-                yield a, a + 1
+            def eval(self, person):
+                yield f"{person.name}: {person.age}",
+
+        self.spark.udtf.register("test_udtf", TestUDTF)
+        self.assertEqual(
+            self.spark.sql(
+                "select * from test_udtf(named_struct('name', 'Alice', 'age', 1))"
+            ).collect(),
+            [Row(x="Alice: 1")],
+        )
 
+    def test_udtf_with_array_input_type(self):
+        @udtf(returnType="x: string")
+        class TestUDTF:
+            def eval(self, args):
+                yield str(args),
+
+        self.spark.udtf.register("test_udtf", TestUDTF)
+        self.assertEqual(
+            self.spark.sql("select * from test_udtf(array(1, 2, 3))").collect(),
+            [Row(x="[1, 2, 3]")],
+        )
+
+    def test_udtf_with_map_input_type(self):
+        @udtf(returnType="x: string")
+        class TestUDTF:
+            def eval(self, m):
+                yield str(m),
+
+        self.spark.udtf.register("test_udtf", TestUDTF)
+        self.assertEqual(
+            self.spark.sql("select * from test_udtf(map('key', 'value'))").collect(),
+            [Row(x="{'key': 'value'}")],
+        )
+
+    def test_udtf_with_struct_output_types(self):
+        @udtf(returnType="x: struct<a:int,b:int>")
+        class TestUDTF:
+            def eval(self, x: int):
+                yield {"a": x, "b": x + 1},
+
+        self.assertEqual(TestUDTF(lit(1)).collect(), [Row(x=Row(a=1, b=2))])
+
+    def test_udtf_with_array_output_types(self):
+        @udtf(returnType="x: array<int>")
+        class TestUDTF:
+            def eval(self, x: int):
+                yield [x, x + 1, x + 2],
+
+        self.assertEqual(TestUDTF(lit(1)).collect(), [Row(x=[1, 2, 3])])
+
+    def test_udtf_with_map_output_types(self):
+        @udtf(returnType="x: map<int,string>")
+        class TestUDTF:
+            def eval(self, x: int):
+                yield {x: str(x)},
+
+        self.assertEqual(TestUDTF(lit(1)).collect(), [Row(x={1: "1"})])
+
+    def test_udtf_with_pandas_input_type(self):
+        import pandas as pd
+
+        @udtf(returnType="corr: double")
+        class TestUDTF:
+            def eval(self, s1: pd.Series, s2: pd.Series):
+                yield s1.corr(s2)
+
+        self.spark.udtf.register("test_udtf", TestUDTF)
+        # TODO(SPARK-43968): check during compile time instead of runtime
         with self.assertRaisesRegex(
-            PythonException,
-            "Failed to execute the user defined table function because it has not "
-            "implemented the 'eval' method. Please add the 'eval' method and try the "
-            "query again.",
+            PythonException, "AttributeError: 'int' object has no attribute 'corr'"
         ):
-            TestUDTF(lit(1)).collect()
+            self.spark.sql(
+                "select * from values (1, 2), (2, 3) t(a, b), " "lateral test_udtf(a, b)"

Review Comment:
   nit: I guess the formatter combined two lines. We don't need an extra blank between two strings.
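
For context on the nit above: Python joins adjacent string literals at compile time, so the two pieces left behind by the formatter already behave like a single literal and can be merged by hand. A small sketch (the variable names are illustrative only):

```python
# Adjacent string literals are concatenated by the Python compiler,
# so the split form and the merged form are the same string.
split_query = "select * from values (1, 2), (2, 3) t(a, b), " "lateral test_udtf(a, b)"
merged_query = "select * from values (1, 2), (2, 3) t(a, b), lateral test_udtf(a, b)"

print(split_query == merged_query)  # prints True
```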



##########
python/pyspark/worker.py:
##########
@@ -461,24 +462,53 @@ def assign_cols_by_name(runner_conf):
 # ensure the UDTF is valid. This function also prepares a mapper function for applying
 # the UDTF logic to input rows.
 def read_udtf(pickleSer, infile, eval_type):
+    if eval_type == PythonEvalType.SQL_ARROW_TABLE_UDF:
+        runner_conf = {}
+        # Load conf used for arrow evaluation.
+        num_conf = read_int(infile)
+        for i in range(num_conf):
+            k = utf8_deserializer.loads(infile)
+            v = utf8_deserializer.loads(infile)
+            runner_conf[k] = v
+
+        # NOTE: if timezone is set here, that implies respectSessionTimeZone is True
+        timezone = runner_conf.get("spark.sql.session.timeZone", None)
+        safecheck = (
+            runner_conf.get("spark.sql.execution.pandas.convertToArrowArraySafely", "false").lower()
+            == "true"
+        )
+        ser = ArrowStreamPandasUDTFSerializer(
+            timezone,
+            safecheck,
+            assign_cols_by_name(runner_conf),

Review Comment:
   We shouldn't refer to this config here, because the columns of the generated pandas DataFrame have no names.



##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -454,6 +474,74 @@ def __repr__(self):
         return "ArrowStreamPandasUDFSerializer"
 
 
+class ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
+    """
+    Serializer used by Python worker to evaluate Arrow-optimized Python UDTFs.
+    """
+
+    def __init__(self, timezone, safecheck, assign_cols_by_name):
+        super(ArrowStreamPandasUDTFSerializer, self).__init__(
+            timezone=timezone,
+            safecheck=safecheck,
+            assign_cols_by_name=assign_cols_by_name,
+            # Set to 'False' to avoid converting struct type inputs into a pandas DataFrame.
+            df_for_struct=False,
+            # Defines how struct type inputs are converted. If set to "row", struct type inputs
+            # are converted into Rows. Without this setting, a struct type input would be treated
+            # as a dictionary. For example, for named_struct('name', 'Alice', 'age', 1),
+            # if struct_in_pandas="dict", it becomes {"name": "Alice", "age": 1}
+            # if struct_in_pandas="row", it becomes Row(name="Alice", age=1)
+            struct_in_pandas="row",
+            # When dealing with array type inputs, Arrow converts them into numpy.ndarrays.
+            # To ensure consistency across regular and arrow-optimized UDTFs, we further
+            # convert these numpy.ndarrays into Python lists.
+            ndarray_as_list=True,
+            # Enables explicit casting for mismatched return types of Arrow Python UDTFs.
+            arrow_cast=True,
+        )
+
+    def _create_batch(self, series):
+        """
+        Create an Arrow record batch from the given pandas.Series pandas.DataFrame
+        or list of Series or DataFrame, with optional type.
+
+        Parameters
+        ----------
+        series : pandas.Series or pandas.DataFrame or list
+            A single series or dataframe, list of series or dataframe,
+            or list of (series or dataframe, arrow_type)
+
+        Returns
+        -------
+        pyarrow.RecordBatch
+            Arrow RecordBatch
+        """
+        import pandas as pd
+        import pyarrow as pa
+
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or (
+            len(series) == 2 and isinstance(series[1], pa.DataType)
+        ):
+            series = [series]
+        series = ((s, None) if not isinstance(s, (list, tuple)) else s for s in series)
+
+        arrs = []
+        for s, t in series:
+            if not isinstance(s, pd.DataFrame):
+                raise PySparkValueError(
+                    "Output of an arrow-optimized Python UDTFs expects "
+                    f"a pandas.DataFrame but got: {type(s)}"
+                )
+
+            arrs.append(self._create_struct_array(s, t))
+
+        return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))])

Review Comment:
   Just an optimization idea, not a strong suggestion:
   
   Now that we have a separate serializer, we can assume:
   - `len(series) == 1`
   - `isinstance(series[0][0], pd.DataFrame)`
   
   Then could we flatten `series[0]` into a `new_series` and call `super()._create_batch(new_series)`? In that case, we also wouldn't need to flatten it in Java.
   





[GitHub] [spark] ueshin commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1272578497


##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -369,19 +381,115 @@ def eval(self, a: int):
         ):
             TestUDTF(rand(0) * 100).collect()
 
-    def test_udtf_no_eval(self):
-        @udtf(returnType="a: int, b: int")
+    def test_udtf_with_struct_input_type(self):
+        @udtf(returnType="x: string")
         class TestUDTF:
-            def run(self, a: int):
-                yield a, a + 1
+            def eval(self, person):
+                yield f"{person.name}: {person.age}",
+
+        self.spark.udtf.register("test_udtf", TestUDTF)
+        self.assertEqual(
+            self.spark.sql(
+                "select * from test_udtf(named_struct('name', 'Alice', 'age', 1))"
+            ).collect(),
+            [Row(x="Alice: 1")],
+        )
 
+    def test_udtf_with_array_input_type(self):
+        @udtf(returnType="x: string")
+        class TestUDTF:
+            def eval(self, args):
+                yield str(args),
+
+        self.spark.udtf.register("test_udtf", TestUDTF)
+        self.assertEqual(
+            self.spark.sql("select * from test_udtf(array(1, 2, 3))").collect(),
+            [Row(x="[1, 2, 3]")],
+        )
+
+    def test_udtf_with_map_input_type(self):
+        @udtf(returnType="x: string")
+        class TestUDTF:
+            def eval(self, m):
+                yield str(m),
+
+        self.spark.udtf.register("test_udtf", TestUDTF)
+        self.assertEqual(
+            self.spark.sql("select * from test_udtf(map('key', 'value'))").collect(),
+            [Row(x="{'key': 'value'}")],
+        )
+
+    def test_udtf_with_struct_output_types(self):
+        @udtf(returnType="x: struct<a:int,b:int>")
+        class TestUDTF:
+            def eval(self, x: int):
+                yield {"a": x, "b": x + 1},
+
+        self.assertEqual(TestUDTF(lit(1)).collect(), [Row(x=Row(a=1, b=2))])
+
+    def test_udtf_with_array_output_types(self):
+        @udtf(returnType="x: array<int>")
+        class TestUDTF:
+            def eval(self, x: int):
+                yield [x, x + 1, x + 2],
+
+        self.assertEqual(TestUDTF(lit(1)).collect(), [Row(x=[1, 2, 3])])
+
+    def test_udtf_with_map_output_types(self):
+        @udtf(returnType="x: map<int,string>")
+        class TestUDTF:
+            def eval(self, x: int):
+                yield {x: str(x)},
+
+        self.assertEqual(TestUDTF(lit(1)).collect(), [Row(x={1: "1"})])
+
+    def test_udtf_with_pandas_input_type(self):

Review Comment:
   Submitted the PR #42131.





[GitHub] [spark] HyukjinKwon commented on pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #41867:
URL: https://github.com/apache/spark/pull/41867#issuecomment-1633567803

   Merged to master.




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1257919869


##########
python/pyspark/sql/udtf.py:
##########
@@ -39,15 +42,98 @@ def _create_udtf(
     cls: Type,
     returnType: Union[StructType, str],
     name: Optional[str] = None,
+    evalType: int = PythonEvalType.SQL_TABLE_UDF,
     deterministic: bool = True,
 ) -> "UserDefinedTableFunction":
-    """Create a Python UDTF."""
+    """Create a Python UDTF with the given eval type."""
     udtf_obj = UserDefinedTableFunction(
-        cls, returnType=returnType, name=name, deterministic=deterministic
+        cls, returnType=returnType, name=name, evalType=evalType, deterministic=deterministic
     )
+
     return udtf_obj
 
 
+def _create_py_udtf(
+    cls: Type,
+    returnType: Union[StructType, str],
+    name: Optional[str] = None,
+    deterministic: bool = True,
+    useArrow: Optional[bool] = None,
+) -> "UserDefinedTableFunction":
+    """Create a regular or an Arrow-optimized Python UDTF."""
+    # Determine whether to create Arrow-optimized UDTFs.
+    if useArrow is not None:
+        arrow_enabled = useArrow
+    else:
+        from pyspark.sql import SparkSession
+
+        session = SparkSession._instantiatedSession
+        arrow_enabled = (
+            session.conf.get("spark.sql.execution.pythonUDTF.arrow.enabled") == "true"
+            if session is not None
+            else True
+        )
+
+    # Create a regular Python UDTF and check for invalid handler class.
+    regular_udtf = _create_udtf(cls, returnType, name, PythonEvalType.SQL_TABLE_UDF, deterministic)
+
+    if arrow_enabled:
+        try:
+            require_minimum_pandas_version()
+            require_minimum_pyarrow_version()
+        except ImportError as e:
+            warnings.warn(
+                f"Arrow optimization for Python UDTFs cannot be enabled: {str(e)}. "
+                f"Falling back to using regular Python UDTFs.",
+                UserWarning,
+            )
+            return regular_udtf
+        return _create_arrow_udtf(regular_udtf)
+    else:
+        return regular_udtf
+
+
+def _create_arrow_udtf(regular_udtf: "UserDefinedTableFunction") -> "UserDefinedTableFunction":
+    """Create an Arrow-optimized Python UDTF."""
+    import pandas as pd
+
+    cls = regular_udtf.func
+
+    class VectorizedUDTF:
+        def __init__(self) -> None:
+            self.func = cls()
+
+        def eval(self, *args: pd.Series) -> Iterator[list[pd.Series]]:
+            if len(args) == 0:
+                yield pd.DataFrame(self.func.eval())
+            else:
+                # Create tuples from the input pandas Series, each tuple
+                # represents a row across all Series.
+                row_tuples = zip(*args)
+                for row in row_tuples:
+                    yield pd.DataFrame(self.func.eval(*row))
+
+        def terminate(self) -> Iterator[pd.DataFrame]:
+            if hasattr(cls, "terminate"):
+                yield pd.DataFrame(self.func.terminate())
+
+    vectorized_udtf = VectorizedUDTF
+    vectorized_udtf.__name__ = cls.__name__
+    vectorized_udtf.__module__ = cls.__module__
+    vectorized_udtf.__doc__ = cls.__doc__
+    vectorized_udtf.eval.__doc__ = getattr(cls, "eval").__doc__
+    if hasattr(cls, "terminate"):

Review Comment:
   Why is `terminate` alone looked up on `cls`? Shouldn't it be `VectorizedUDTF.terminate`?
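
To make the wrapper pattern under discussion concrete, here is a simplified, pandas-free sketch. Plain lists stand in for `pd.Series`, and `make_vectorized` and `Repeat` are hypothetical names. It assumes, as the diff above does, that the wrapper feeds `eval` one row at a time and forwards `terminate` only when the wrapped class defines it; the zero-argument case is omitted. Because the wrapper always defines its own `terminate`, this sketch checks `cls` to decide whether there is anything to forward or copy, which is one possible reading of the line being questioned.

```python
from typing import Any, Iterator, List, Tuple, Type


def make_vectorized(cls: Type) -> Type:
    """Wrap a row-based handler so it accepts column-wise inputs (illustrative)."""

    class Vectorized:
        def __init__(self) -> None:
            self.func = cls()

        def eval(self, *columns: List[Any]) -> Iterator[List[Tuple]]:
            # zip(*columns) turns column-wise inputs into one tuple per row.
            for row in zip(*columns):
                yield list(self.func.eval(*row))

        def terminate(self) -> Iterator[List[Tuple]]:
            # Forward terminate only if the wrapped class defines it.
            if hasattr(cls, "terminate"):
                yield list(self.func.terminate())

    # Copy metadata so the wrapper presents itself as the original class.
    Vectorized.__name__ = cls.__name__
    Vectorized.__doc__ = cls.__doc__
    Vectorized.eval.__doc__ = cls.eval.__doc__
    if hasattr(cls, "terminate"):
        Vectorized.terminate.__doc__ = cls.terminate.__doc__
    return Vectorized


class Repeat:
    """Emit (a, b) and (b, a) for every input row."""

    def eval(self, a: int, b: int):
        """Row-based eval."""
        yield a, b
        yield b, a


wrapped = make_vectorized(Repeat)()
batches = list(wrapped.eval([1, 2], [10, 20]))
```

Each element of `batches` holds the output rows for one input row, mirroring how the wrapper in the diff yields one `pd.DataFrame` per `eval` call.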





[GitHub] [spark] cloud-fan commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1256446496


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonUDTFExec.scala:
##########
@@ -187,14 +108,27 @@ class PythonUDTFRunner(
     new PythonUDFWriterThread(env, worker, inputIterator, partitionIndex, context) {
 
       protected override def writeCommand(dataOut: DataOutputStream): Unit = {
-        dataOut.writeInt(argOffsets.length)
-        argOffsets.foreach { offset =>
-          dataOut.writeInt(offset)
-        }
-        dataOut.writeInt(udtf.func.command.length)
-        dataOut.write(udtf.func.command.toArray)
-        writeUTF(udtf.elementSchema.json, dataOut)
+        PythonUDTFRunner.writeUDTF(dataOut, udtf, argOffsets)
       }
     }
   }
 }
+
+object PythonUDTFRunner {
+
+  def writeUTF(str: String, dataOut: DataOutputStream): Unit = {

Review Comment:
   ```suggestion
     def writeUDTF(str: String, dataOut: DataOutputStream): Unit = {
   ```





[GitHub] [spark] allisonwang-db commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1256930643


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -388,13 +388,7 @@ def _create_batch(self, series):
 
         arrs = []
         for s, t in series:
-            if self._struct_in_pandas == "dict" and t is not None and pa.types.is_struct(t):
-                if not isinstance(s, pd.DataFrame):
-                    raise PySparkValueError(
-                        "A field of type StructType expects a pandas.DataFrame, "
-                        "but got: %s" % str(type(s))
-                    )
-
+            if t is not None and pa.types.is_struct(t) and isinstance(s, pd.DataFrame):

Review Comment:
   Good to know. I've added a comment and a unit test to cover this case.





[GitHub] [spark] allisonwang-db commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1261470302


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{ContextAwareIterator, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.UnaryExecNode
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+/**
+ * A physical plan that evaluates a [[PythonUDTF]], one partition of tuples at a time.
+ * This is similar to [[EvalPythonExec]].
+ */
+trait EvalPythonUDTFExec extends UnaryExecNode {
+  def udtf: PythonUDTF
+
+  def requiredChildOutput: Seq[Attribute]
+
+  def resultAttrs: Seq[Attribute]
+
+  override def output: Seq[Attribute] = requiredChildOutput ++ resultAttrs
+
+  override def producedAttributes: AttributeSet = AttributeSet(resultAttrs)
+
+  protected def evaluate(
+      argOffsets: Array[Int],
+      iter: Iterator[InternalRow],
+      schema: StructType,
+      context: TaskContext): Iterator[Iterator[InternalRow]]
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val inputRDD = child.execute().map(_.copy())
+
+    inputRDD.mapPartitions { iter =>
+      val context = TaskContext.get()
+      val contextAwareIterator = new ContextAwareIterator(context, iter)
+
+      // The queue used to buffer input rows so we can drain it to
+      // combine input with output from Python.
+      val queue = HybridRowQueue(context.taskMemoryManager(),
+        new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
+      context.addTaskCompletionListener[Unit] { ctx =>
+        queue.close()
+      }
+
+      // flatten all the arguments
+      val allInputs = new ArrayBuffer[Expression]
+      val dataTypes = new ArrayBuffer[DataType]
+      val argOffsets = udtf.children.map { e =>
+        if (allInputs.exists(_.semanticEquals(e))) {
+          allInputs.indexWhere(_.semanticEquals(e))
+        } else {
+          allInputs += e
+          dataTypes += e.dataType
+          allInputs.length - 1
+        }
+      }.toArray
+      val projection = MutableProjection.create(allInputs.toSeq, child.output)
+      projection.initialize(context.partitionId())
+      val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
+        StructField(s"_$i", dt)
+      }.toArray)
+
+      // Add rows to the queue to join later with the result.
+      // Also keep track of the number rows added to the queue.
+      // This is needed to process extra output rows from the `terminate()` call of the UDTF.
+      var count = 0L
+      val projectedRowIter = contextAwareIterator.map { inputRow =>
+        queue.add(inputRow.asInstanceOf[UnsafeRow])
+        count += 1
+        projection(inputRow)
+      }
+
+      val outputRowIterator = evaluate(argOffsets, projectedRowIter, schema, context)
+
+      val pruneChildForResult: InternalRow => InternalRow =
+        if (child.outputSet == AttributeSet(requiredChildOutput)) {
+          identity
+        } else {
+          UnsafeProjection.create(requiredChildOutput, child.output)

Review Comment:
   It's an optimization for `Generate` to avoid producing unnecessary output. Meanwhile, adding a project would require `Generate` to first output that specific field. Please let me know if I have misunderstood your question.
   https://github.com/apache/spark/blob/d9248e83bbb3af49333608bebe7149b1aaeca738/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L245-L251
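
The shape of that optimization can be sketched outside Spark. The following is a Python stand-in for the Scala logic, not the actual implementation: column names are plain strings instead of Catalyst attributes, rows are tuples, and `make_pruner` is a hypothetical helper. It assumes the projection is skipped whenever the child already outputs exactly the required columns.

```python
from typing import Callable, Sequence, Tuple

Row = Tuple


def make_pruner(
    child_output: Sequence[str], required: Sequence[str]
) -> Callable[[Row], Row]:
    """Return a function that keeps only the required columns of a row."""
    if set(child_output) == set(required):
        # Nothing to prune: return the row unchanged and skip the projection.
        return lambda row: row
    # Otherwise, precompute the positions of the required columns once.
    indices = [child_output.index(name) for name in required]
    return lambda row: tuple(row[i] for i in indices)


prune = make_pruner(["a", "b", "c"], ["a", "c"])
pruned_row = prune((1, 2, 3))
```

Building the pruning function once per partition keeps the per-row cost to a single call, and the identity branch avoids copying rows when no columns are dropped.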





[GitHub] [spark] allisonwang-db commented on pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on PR #41867:
URL: https://github.com/apache/spark/pull/41867#issuecomment-1625646853

   cc @HyukjinKwon @ueshin @zhengruifeng @cloud-fan 




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1257920655


##########
python/pyspark/sql/udtf.py:
##########
@@ -83,9 +170,15 @@ def __init__(
         self._returnType_placeholder: Optional[StructType] = None
         self._inputTypes_placeholder = None
         self._judtf_placeholder = None
-        self._name = name or func.__name__
+        self.name = name or func.__name__

Review Comment:
   why do we expose the name here?





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1257915251


##########
python/pyspark/sql/udtf.py:
##########
@@ -39,15 +42,98 @@ def _create_udtf(
     cls: Type,
     returnType: Union[StructType, str],
     name: Optional[str] = None,
+    evalType: int = PythonEvalType.SQL_TABLE_UDF,
     deterministic: bool = True,
 ) -> "UserDefinedTableFunction":
-    """Create a Python UDTF."""
+    """Create a Python UDTF with the given eval type."""
     udtf_obj = UserDefinedTableFunction(
-        cls, returnType=returnType, name=name, deterministic=deterministic
+        cls, returnType=returnType, name=name, evalType=evalType, deterministic=deterministic
     )
+
     return udtf_obj
 
 
+def _create_py_udtf(
+    cls: Type,
+    returnType: Union[StructType, str],
+    name: Optional[str] = None,
+    deterministic: bool = True,
+    useArrow: Optional[bool] = None,
+) -> "UserDefinedTableFunction":
+    """Create a regular or an Arrow-optimized Python UDTF."""
+    # Determine whether to create Arrow-optimized UDTFs.
+    if useArrow is not None:
+        arrow_enabled = useArrow
+    else:
+        from pyspark.sql import SparkSession
+
+        session = SparkSession._instantiatedSession
+        arrow_enabled = (
+            session.conf.get("spark.sql.execution.pythonUDTF.arrow.enabled") == "true"
+            if session is not None
+            else True
+        )
+
+    # Create a regular Python UDTF and check for invalid handler class.
+    regular_udtf = _create_udtf(cls, returnType, name, PythonEvalType.SQL_TABLE_UDF, deterministic)

Review Comment:
   Ah, okie. It's consistent with pandas UDF. Let's fix them in a separate PR together then.





[GitHub] [spark] zhengruifeng commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1260613692


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{ContextAwareIterator, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.UnaryExecNode
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+/**
+ * A physical plan that evaluates a [[PythonUDTF]], one partition of tuples at a time.
+ * This is similar to [[EvalPythonExec]].
+ */
+trait EvalPythonUDTFExec extends UnaryExecNode {
+  def udtf: PythonUDTF
+
+  def requiredChildOutput: Seq[Attribute]
+
+  def resultAttrs: Seq[Attribute]
+
+  override def output: Seq[Attribute] = requiredChildOutput ++ resultAttrs
+
+  override def producedAttributes: AttributeSet = AttributeSet(resultAttrs)
+
+  protected def evaluate(
+      argOffsets: Array[Int],
+      iter: Iterator[InternalRow],
+      schema: StructType,
+      context: TaskContext): Iterator[Iterator[InternalRow]]
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val inputRDD = child.execute().map(_.copy())
+
+    inputRDD.mapPartitions { iter =>
+      val context = TaskContext.get()
+      val contextAwareIterator = new ContextAwareIterator(context, iter)
+
+      // The queue used to buffer input rows so we can drain it to
+      // combine input with output from Python.
+      val queue = HybridRowQueue(context.taskMemoryManager(),
+        new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
+      context.addTaskCompletionListener[Unit] { ctx =>
+        queue.close()
+      }
+
+      // flatten all the arguments
+      val allInputs = new ArrayBuffer[Expression]
+      val dataTypes = new ArrayBuffer[DataType]
+      val argOffsets = udtf.children.map { e =>
+        if (allInputs.exists(_.semanticEquals(e))) {
+          allInputs.indexWhere(_.semanticEquals(e))
+        } else {
+          allInputs += e
+          dataTypes += e.dataType
+          allInputs.length - 1
+        }
+      }.toArray
+      val projection = MutableProjection.create(allInputs.toSeq, child.output)
+      projection.initialize(context.partitionId())
+      val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
+        StructField(s"_$i", dt)
+      }.toArray)
+
+      // Add rows to the queue to join later with the result.
+      // Also keep track of the number rows added to the queue.
+      // This is needed to process extra output rows from the `terminate()` call of the UDTF.
+      var count = 0L
+      val projectedRowIter = contextAwareIterator.map { inputRow =>
+        queue.add(inputRow.asInstanceOf[UnsafeRow])
+        count += 1
+        projection(inputRow)
+      }
+
+      val outputRowIterator = evaluate(argOffsets, projectedRowIter, schema, context)
+
+      val pruneChildForResult: InternalRow => InternalRow =
+        if (child.outputSet == AttributeSet(requiredChildOutput)) {
+          identity
+        } else {
+          UnsafeProjection.create(requiredChildOutput, child.output)

Review Comment:
   Not related to this PR itself:
   why not introduce a `Project` after `child` when `child.output` doesn't match `requiredChildOutput`?
   





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1257910365


##########
python/pyspark/sql/udtf.py:
##########
@@ -39,15 +42,98 @@ def _create_udtf(
     cls: Type,
     returnType: Union[StructType, str],
     name: Optional[str] = None,
+    evalType: int = PythonEvalType.SQL_TABLE_UDF,
     deterministic: bool = True,
 ) -> "UserDefinedTableFunction":
-    """Create a Python UDTF."""
+    """Create a Python UDTF with the given eval type."""
     udtf_obj = UserDefinedTableFunction(
-        cls, returnType=returnType, name=name, deterministic=deterministic
+        cls, returnType=returnType, name=name, evalType=evalType, deterministic=deterministic
     )
+
     return udtf_obj
 
 
+def _create_py_udtf(

Review Comment:
   Can we name it something like `_create_arrow_udtf`?





[GitHub] [spark] xinrong-meng commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1260441202


##########
python/pyspark/sql/udtf.py:
##########
@@ -39,15 +42,98 @@ def _create_udtf(
     cls: Type,
     returnType: Union[StructType, str],
     name: Optional[str] = None,
+    evalType: int = PythonEvalType.SQL_TABLE_UDF,
     deterministic: bool = True,
 ) -> "UserDefinedTableFunction":
-    """Create a Python UDTF."""
+    """Create a Python UDTF with the given eval type."""
     udtf_obj = UserDefinedTableFunction(
-        cls, returnType=returnType, name=name, deterministic=deterministic
+        cls, returnType=returnType, name=name, evalType=evalType, deterministic=deterministic
     )
+
     return udtf_obj
 
 
+def _create_py_udtf(
+    cls: Type,
+    returnType: Union[StructType, str],
+    name: Optional[str] = None,
+    deterministic: bool = True,
+    useArrow: Optional[bool] = None,
+) -> "UserDefinedTableFunction":
+    """Create a regular or an Arrow-optimized Python UDTF."""
+    # Determine whether to create Arrow-optimized UDTFs.
+    if useArrow is not None:
+        arrow_enabled = useArrow
+    else:
+        from pyspark.sql import SparkSession
+
+        session = SparkSession._instantiatedSession

Review Comment:
   Got it, thanks for catching that!







[GitHub] [spark] allisonwang-db commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1261865836


##########
python/pyspark/worker.py:
##########
@@ -461,24 +462,53 @@ def assign_cols_by_name(runner_conf):
 # ensure the UDTF is valid. This function also prepares a mapper function for applying
 # the UDTF logic to input rows.
 def read_udtf(pickleSer, infile, eval_type):
+    if eval_type == PythonEvalType.SQL_ARROW_TABLE_UDF:
+        runner_conf = {}
+        # Load conf used for arrow evaluation.
+        num_conf = read_int(infile)
+        for i in range(num_conf):
+            k = utf8_deserializer.loads(infile)
+            v = utf8_deserializer.loads(infile)
+            runner_conf[k] = v
+
+        # NOTE: if timezone is set here, that implies respectSessionTimeZone is True
+        timezone = runner_conf.get("spark.sql.session.timeZone", None)
+        safecheck = (
+            runner_conf.get("spark.sql.execution.pandas.convertToArrowArraySafely", "false").lower()
+            == "true"
+        )
+        ser = ArrowStreamPandasUDTFSerializer(
+            timezone,
+            safecheck,
+            assign_cols_by_name(runner_conf),

Review Comment:
   That's right. I am assuming this config is only useful when the output data frame has specific column names like `pd.DataFrame([(1,2),(2,3)], columns=["a", "b"])`?





[GitHub] [spark] ueshin commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1261873500


##########
python/pyspark/worker.py:
##########
@@ -461,24 +462,53 @@ def assign_cols_by_name(runner_conf):
 # ensure the UDTF is valid. This function also prepares a mapper function for applying
 # the UDTF logic to input rows.
 def read_udtf(pickleSer, infile, eval_type):
+    if eval_type == PythonEvalType.SQL_ARROW_TABLE_UDF:
+        runner_conf = {}
+        # Load conf used for arrow evaluation.
+        num_conf = read_int(infile)
+        for i in range(num_conf):
+            k = utf8_deserializer.loads(infile)
+            v = utf8_deserializer.loads(infile)
+            runner_conf[k] = v
+
+        # NOTE: if timezone is set here, that implies respectSessionTimeZone is True
+        timezone = runner_conf.get("spark.sql.session.timeZone", None)
+        safecheck = (
+            runner_conf.get("spark.sql.execution.pandas.convertToArrowArraySafely", "false").lower()
+            == "true"
+        )
+        ser = ArrowStreamPandasUDTFSerializer(
+            timezone,
+            safecheck,
+            assign_cols_by_name(runner_conf),

Review Comment:
   Yes. When it is true (the default), a pandas UDF matches columns by name instead of by column order.
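
To make the name-versus-order distinction concrete, here is a small sketch in plain Python. It is not Spark's implementation: `order_columns` is a hypothetical helper, and a dict of column name to values stands in for a pandas DataFrame.

```python
def order_columns(columns, schema_fields, assign_by_name):
    """Arrange result columns to line up with the declared schema.

    columns: dict mapping column name -> list of values, in the order the
             function produced them (a stand-in for a DataFrame).
    schema_fields: field names of the declared return type, in order.
    """
    if assign_by_name:
        # Look each schema field up by its name; production order is ignored.
        return [columns[name] for name in schema_fields]
    # Otherwise take columns strictly in the order they were produced.
    return list(columns.values())[: len(schema_fields)]


result = {"b": [2, 3], "a": [1, 2]}  # produced in the order b, a
by_name = order_columns(result, ["a", "b"], assign_by_name=True)
by_position = order_columns(result, ["a", "b"], assign_by_name=False)
```

With name-based assignment the schema field `a` receives `[1, 2]`; with position-based assignment it receives `[2, 3]`, the first column produced.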





[GitHub] [spark] allisonwang-db commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1258991326


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2918,6 +2918,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val PYTHON_TABLE_UDF_ARROW_ENABLED =
+    buildConf("spark.sql.execution.pythonUDTF.arrow.enabled")
+      .doc("Enable Arrow optimization for Python UDTFs.")
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(true)

Review Comment:
   @HyukjinKwon Arrow optimization is enabled by default for Python UDTFs. If the user does not have the necessary pyarrow dependency, it will fall back to using regular Python UDTFs. Do you think this is viable?
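
A rough sketch of the fallback described above, assuming the decision only depends on the config value and on whether the `pyarrow` module can be found. `choose_eval_type` and the two string constants are hypothetical names for this example; the PR itself uses `PythonEvalType` values.

```python
import importlib.util

# Illustrative labels only; they stand in for the real eval type constants.
REGULAR_UDTF = "SQL_TABLE_UDF"
ARROW_UDTF = "SQL_ARROW_TABLE_UDF"


def choose_eval_type(arrow_enabled, module_name="pyarrow"):
    """Pick the Arrow path only when it is enabled and the module is importable."""
    if arrow_enabled and importlib.util.find_spec(module_name) is not None:
        return ARROW_UDTF
    # Either Arrow is disabled or the dependency is missing: use the regular path.
    return REGULAR_UDTF
```

Checking with `find_spec` avoids importing the dependency just to learn whether it is installed.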





[GitHub] [spark] allisonwang-db commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1260413521


##########
python/pyspark/sql/udtf.py:
##########
@@ -83,9 +170,15 @@ def __init__(
         self._returnType_placeholder: Optional[StructType] = None
         self._inputTypes_placeholder = None
         self._judtf_placeholder = None
-        self._name = name or func.__name__
+        self.name = name or func.__name__

Review Comment:
   Good catch. Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1257917416


##########
python/pyspark/sql/udtf.py:
##########
@@ -39,15 +42,98 @@ def _create_udtf(
     cls: Type,
     returnType: Union[StructType, str],
     name: Optional[str] = None,
+    evalType: int = PythonEvalType.SQL_TABLE_UDF,
     deterministic: bool = True,
 ) -> "UserDefinedTableFunction":
-    """Create a Python UDTF."""
+    """Create a Python UDTF with the given eval type."""
     udtf_obj = UserDefinedTableFunction(
-        cls, returnType=returnType, name=name, deterministic=deterministic
+        cls, returnType=returnType, name=name, evalType=evalType, deterministic=deterministic
     )
+
     return udtf_obj
 
 
+def _create_py_udtf(
+    cls: Type,
+    returnType: Union[StructType, str],
+    name: Optional[str] = None,
+    deterministic: bool = True,
+    useArrow: Optional[bool] = None,
+) -> "UserDefinedTableFunction":
+    """Create a regular or an Arrow-optimized Python UDTF."""
+    # Determine whether to create Arrow-optimized UDTFs.
+    if useArrow is not None:
+        arrow_enabled = useArrow
+    else:
+        from pyspark.sql import SparkSession
+
+        session = SparkSession._instantiatedSession

Review Comment:
   @xinrong-meng we should move the import to the `else` branch in `_create_py_udt` too, to keep the breaking change as small as possible. For example, once you have this, you can't define a pandas UDF without a Spark session (e.g., 1. providing the pandas UDF as a library or 2. defining a pandas UDF in a plain Python interpreter before starting a Spark session).
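
The point about only touching the session in the `else` branch can be sketched as follows. This is an assumption-laden illustration, not the PR's code: `resolve_arrow_enabled`, `FakeConf`, and the `get_session` callback are hypothetical, and the callback stands in for the deferred import and session lookup.

```python
from typing import Callable, Optional


class FakeConf:
    """Hypothetical stand-in for a session's string-valued runtime config."""

    def __init__(self, values):
        self._values = values

    def get(self, key):
        return self._values[key]


def resolve_arrow_enabled(use_arrow: Optional[bool], get_session: Callable) -> bool:
    if use_arrow is not None:
        # Explicit choice: the session is never looked up on this path.
        return use_arrow
    # The lookup is deferred to this branch, so defining a function with an
    # explicit flag works before any session exists.
    session = get_session()
    if session is None:
        return True
    return session.get("spark.sql.execution.pythonUDTF.arrow.enabled") == "true"


def no_session_allowed():
    raise RuntimeError("session lookup should not happen")


explicit = resolve_arrow_enabled(False, no_session_allowed)
```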







[GitHub] [spark] allisonwang-db commented on pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on PR #41867:
URL: https://github.com/apache/spark/pull/41867#issuecomment-1633039322

   @HyukjinKwon The test failure seems unrelated




[GitHub] [spark] HyukjinKwon closed pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs
URL: https://github.com/apache/spark/pull/41867




[GitHub] [spark] HyukjinKwon commented on pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #41867:
URL: https://github.com/apache/spark/pull/41867#issuecomment-1631652792

   @allisonwang-db mind taking a look at the test failure?




[GitHub] [spark] allisonwang-db commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1260421186


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -454,6 +474,74 @@ def __repr__(self):
         return "ArrowStreamPandasUDFSerializer"
 
 
+class ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):

Review Comment:
   I created a new serializer, rather than modifying the PandasUDFSerializer, to avoid any risk of inadvertently changing the behavior of pandas UDFs. cc @HyukjinKwon @ueshin @xinrong-meng





[GitHub] [spark] xinrong-meng commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1259014291


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -389,44 +439,23 @@ def _create_batch(self, series):
         arrs = []
         for s, t in series:
             if self._struct_in_pandas == "dict" and t is not None and pa.types.is_struct(t):

Review Comment:
   Sorry, what exact changes are you suggesting here?





[GitHub] [spark] ueshin commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1256223855


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -388,13 +388,7 @@ def _create_batch(self, series):
 
         arrs = []
         for s, t in series:
-            if self._struct_in_pandas == "dict" and t is not None and pa.types.is_struct(t):
-                if not isinstance(s, pd.DataFrame):
-                    raise PySparkValueError(
-                        "A field of type StructType expects a pandas.DataFrame, "
-                        "but got: %s" % str(type(s))
-                    )
-
+            if t is not None and pa.types.is_struct(t) and isinstance(s, pd.DataFrame):

Review Comment:
   This introduces a breaking change for pandas UDFs.
   
   A pandas UDF should return a `pd.DataFrame` when the return type is a struct type.
   If it returns a `pd.Series`, it should fail with the error `A field of type StructType expects a pandas.DataFrame`, but with this change it is silently allowed.
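
The behavioral difference can be illustrated with a minimal sketch. It uses stand-in classes (`DataFrameStandIn`, `SeriesStandIn`) and hypothetical function names rather than pandas or the real serializer, and it leaves out the `_struct_in_pandas == "dict"` part of the original condition for brevity:

```python
class DataFrameStandIn:
    """Hypothetical stand-in for pandas.DataFrame."""


class SeriesStandIn:
    """Hypothetical stand-in for pandas.Series."""


def branch_before_change(value, is_struct_type):
    # Before: a struct-typed field must be a DataFrame; anything else is an error.
    if is_struct_type:
        if not isinstance(value, DataFrameStandIn):
            raise ValueError(
                "A field of type StructType expects a pandas.DataFrame, "
                "but got: %s" % str(type(value))
            )
        return "struct path"
    return "non-struct path"


def branch_after_change(value, is_struct_type):
    # After: the isinstance check is folded into the condition, so a Series
    # returned for a struct type silently takes the non-struct path.
    if is_struct_type and isinstance(value, DataFrameStandIn):
        return "struct path"
    return "non-struct path"
```

With a `SeriesStandIn` value and a struct type, the first function raises while the second one falls through without any error, which is the regression described above.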





[GitHub] [spark] cloud-fan commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1256445828


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonUDTFExec.scala:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.ArrowUtils
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
+
+/**
+ * A physical plan that evaluates a [[PythonUDTF]] using Apache Arrow.
+ * This is similar to [[ArrowEvalPythonExec]].
+ *
+ * @param udtf the user-defined Python function.
+ * @param requiredChildOutput the required output of the child plan. It's used for omitting data
+ *                            generation that will be discarded next by a projection.
+ * @param resultAttrs the output schema of the Python UDTF.
+ * @param child the child plan.
+ * @param evalType the Python eval type.
+ */
+case class ArrowEvalPythonUDTFExec(
+    udtf: PythonUDTF,
+    requiredChildOutput: Seq[Attribute],
+    resultAttrs: Seq[Attribute],
+    child: SparkPlan,
+    evalType: Int)
+  extends EvalPythonUDTFExec with PythonSQLMetrics {
+
+  private val batchSize = conf.arrowMaxRecordsPerBatch
+  private val sessionLocalTimeZone = conf.sessionLocalTimeZone
+  private val largeVarTypes = conf.arrowUseLargeVarTypes
+  private val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
+
+  override protected def evaluate(
+    argOffsets: Array[Int],

Review Comment:
   nit: 4 spaces indentation.





[GitHub] [spark] ueshin commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1271166023


##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -369,19 +381,115 @@ def eval(self, a: int):
         ):
             TestUDTF(rand(0) * 100).collect()
 
-    def test_udtf_no_eval(self):
-        @udtf(returnType="a: int, b: int")
+    def test_udtf_with_struct_input_type(self):
+        @udtf(returnType="x: string")
         class TestUDTF:
-            def run(self, a: int):
-                yield a, a + 1
+            def eval(self, person):
+                yield f"{person.name}: {person.age}",
+
+        self.spark.udtf.register("test_udtf", TestUDTF)
+        self.assertEqual(
+            self.spark.sql(
+                "select * from test_udtf(named_struct('name', 'Alice', 'age', 1))"
+            ).collect(),
+            [Row(x="Alice: 1")],
+        )
 
+    def test_udtf_with_array_input_type(self):
+        @udtf(returnType="x: string")
+        class TestUDTF:
+            def eval(self, args):
+                yield str(args),
+
+        self.spark.udtf.register("test_udtf", TestUDTF)
+        self.assertEqual(
+            self.spark.sql("select * from test_udtf(array(1, 2, 3))").collect(),
+            [Row(x="[1, 2, 3]")],
+        )
+
+    def test_udtf_with_map_input_type(self):
+        @udtf(returnType="x: string")
+        class TestUDTF:
+            def eval(self, m):
+                yield str(m),
+
+        self.spark.udtf.register("test_udtf", TestUDTF)
+        self.assertEqual(
+            self.spark.sql("select * from test_udtf(map('key', 'value'))").collect(),
+            [Row(x="{'key': 'value'}")],
+        )
+
+    def test_udtf_with_struct_output_types(self):
+        @udtf(returnType="x: struct<a:int,b:int>")
+        class TestUDTF:
+            def eval(self, x: int):
+                yield {"a": x, "b": x + 1},
+
+        self.assertEqual(TestUDTF(lit(1)).collect(), [Row(x=Row(a=1, b=2))])
+
+    def test_udtf_with_array_output_types(self):
+        @udtf(returnType="x: array<int>")
+        class TestUDTF:
+            def eval(self, x: int):
+                yield [x, x + 1, x + 2],
+
+        self.assertEqual(TestUDTF(lit(1)).collect(), [Row(x=[1, 2, 3])])
+
+    def test_udtf_with_map_output_types(self):
+        @udtf(returnType="x: map<int,string>")
+        class TestUDTF:
+            def eval(self, x: int):
+                yield {x: str(x)},
+
+        self.assertEqual(TestUDTF(lit(1)).collect(), [Row(x={1: "1"})])
+
+    def test_udtf_with_pandas_input_type(self):

Review Comment:
   @allisonwang-db This test needs to be skipped when pandas is not available.
   
   ```
   Traceback (most recent call last):
     File "/.../pyspark/sql/tests/test_udtf.py", line 491, in test_udtf_with_pandas_input_type
       import pandas as pd
   ModuleNotFoundError: No module named 'pandas'
   ```
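
One way to guard such a test is sketched below, using only the standard library. The `HAVE_PANDAS` flag, the test class name, and the `run_suite` helper are assumptions made for this illustration and are not the names used in the PR:

```python
import importlib.util
import unittest

# True only when pandas can be imported in this interpreter.
HAVE_PANDAS = importlib.util.find_spec("pandas") is not None


class PandasInputTypeTest(unittest.TestCase):
    @unittest.skipIf(not HAVE_PANDAS, "pandas is not installed")
    def test_udtf_with_pandas_input_type(self):
        # Import inside the test so that loading the module never fails.
        import pandas as pd

        s1 = pd.Series([1.0, 2.0, 3.0])
        s2 = pd.Series([2.0, 4.0, 6.0])
        self.assertAlmostEqual(s1.corr(s2), 1.0)


def run_suite():
    """Run the test case and return the unittest.TestResult."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(PandasInputTypeTest)
    result = unittest.TestResult()
    suite.run(result)
    return result
```

When pandas is missing, the test is reported as skipped instead of failing with `ModuleNotFoundError`.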





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1257908166


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -389,44 +439,23 @@ def _create_batch(self, series):
         arrs = []
         for s, t in series:
             if self._struct_in_pandas == "dict" and t is not None and pa.types.is_struct(t):

Review Comment:
   @xinrong-meng can we have separate classes for `_struct_in_pandas` cases with the same parent?
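
A rough sketch of what "separate classes with the same parent" could look like, in place of branching on a `_struct_in_pandas` string. All class names here are hypothetical, and a `namedtuple` stands in for `pyspark.sql.Row`:

```python
from collections import namedtuple


class StructConverter:
    """Shared parent: walks the values and delegates struct handling."""

    def convert(self, values):
        return [self.convert_struct(v) if isinstance(v, dict) else v for v in values]

    def convert_struct(self, struct):
        raise NotImplementedError


class DictStructConverter(StructConverter):
    def convert_struct(self, struct):
        # Keep the struct as a plain dictionary.
        return dict(struct)


class RowStructConverter(StructConverter):
    def convert_struct(self, struct):
        # Use a namedtuple as a stand-in for pyspark.sql.Row.
        row_type = namedtuple("Row", list(struct.keys()))
        return row_type(**struct)
```

Each subclass overrides a single hook, so the shared batch-building logic stays in the parent and no caller needs to inspect a mode flag.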





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1257906151


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -360,6 +360,56 @@ def arrow_to_pandas(self, arrow_column):
             )
         return s
 
+    def _create_struct_array(self, df, arrow_struct_type):

Review Comment:
   cc @xinrong-meng and @ueshin 





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1257911965


##########
python/pyspark/sql/udtf.py:
##########
@@ -39,15 +42,98 @@ def _create_udtf(
     cls: Type,
     returnType: Union[StructType, str],
     name: Optional[str] = None,
+    evalType: int = PythonEvalType.SQL_TABLE_UDF,
     deterministic: bool = True,
 ) -> "UserDefinedTableFunction":
-    """Create a Python UDTF."""
+    """Create a Python UDTF with the given eval type."""
     udtf_obj = UserDefinedTableFunction(
-        cls, returnType=returnType, name=name, deterministic=deterministic
+        cls, returnType=returnType, name=name, evalType=evalType, deterministic=deterministic
     )
+
     return udtf_obj
 
 
+def _create_py_udtf(
+    cls: Type,
+    returnType: Union[StructType, str],
+    name: Optional[str] = None,
+    deterministic: bool = True,
+    useArrow: Optional[bool] = None,
+) -> "UserDefinedTableFunction":
+    """Create a regular or an Arrow-optimized Python UDTF."""
+    # Determine whether to create Arrow-optimized UDTFs.
+    if useArrow is not None:
+        arrow_enabled = useArrow
+    else:
+        from pyspark.sql import SparkSession
+
+        session = SparkSession._instantiatedSession
+        arrow_enabled = (
+            session.conf.get("spark.sql.execution.pythonUDTF.arrow.enabled") == "true"
+            if session is not None
+            else True
+        )
+
+    # Create a regular Python UDTF and check for invalid handler class.
+    regular_udtf = _create_udtf(cls, returnType, name, PythonEvalType.SQL_TABLE_UDF, deterministic)

Review Comment:
   I think we should actually swap the names `_create_py_udtf` <> `_create_udtf`.





[GitHub] [spark] allisonwang-db commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1256161575


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -388,13 +388,7 @@ def _create_batch(self, series):
 
         arrs = []
         for s, t in series:
-            if self._struct_in_pandas == "dict" and t is not None and pa.types.is_struct(t):
-                if not isinstance(s, pd.DataFrame):
-                    raise PySparkValueError(
-                        "A field of type StructType expects a pandas.DataFrame, "
-                        "but got: %s" % str(type(s))
-                    )
-
+            if t is not None and pa.types.is_struct(t) and isinstance(s, pd.DataFrame):

Review Comment:
   cc @xinrong-meng @ueshin 





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1259003766


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2918,6 +2918,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val PYTHON_TABLE_UDF_ARROW_ENABLED =
+    buildConf("spark.sql.execution.pythonUDTF.arrow.enabled")
+      .doc("Enable Arrow optimization for Python UDTFs.")
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(true)

Review Comment:
   I don't think we can do it. We would need to know whether Arrow is installed on all executors. We could optionally fall back in executors, but I think there would be a lot of corner cases to deal with, and I don't think it's worthwhile.
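
To make the concern concrete, here is a minimal sketch of a driver-side decision with a warning-based fallback. The function names and the injectable `is_available` callable are assumptions for illustration, and the sketch checks only importability rather than minimum versions. Note that such a check inspects only the interpreter it runs in, so it cannot say whether executors have Arrow installed:

```python
import importlib.util
import warnings
from typing import Callable, Optional


def _module_available(name: str) -> bool:
    return importlib.util.find_spec(name) is not None


def resolve_arrow_enabled(
    use_arrow: Optional[bool],
    conf_value: Optional[str],
    is_available: Callable[[str], bool] = _module_available,
) -> bool:
    """Decide on the driver whether to build an Arrow-optimized UDTF."""
    # An explicit argument wins; otherwise use the session config string.
    enabled = use_arrow if use_arrow is not None else (conf_value == "true")
    if not enabled:
        return False
    missing = [m for m in ("pandas", "pyarrow") if not is_available(m)]
    if missing:
        warnings.warn(
            "Arrow optimization cannot be enabled (missing: %s); "
            "falling back to a regular Python UDTF." % ", ".join(missing),
            UserWarning,
        )
        return False
    return True
```

Passing a custom `is_available` makes the fallback path easy to exercise without uninstalling anything.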





[GitHub] [spark] allisonwang-db commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1258988779


##########
python/pyspark/sql/udtf.py:
##########
@@ -39,15 +42,98 @@ def _create_udtf(
     cls: Type,
     returnType: Union[StructType, str],
     name: Optional[str] = None,
+    evalType: int = PythonEvalType.SQL_TABLE_UDF,
     deterministic: bool = True,
 ) -> "UserDefinedTableFunction":
-    """Create a Python UDTF."""
+    """Create a Python UDTF with the given eval type."""
     udtf_obj = UserDefinedTableFunction(
-        cls, returnType=returnType, name=name, deterministic=deterministic
+        cls, returnType=returnType, name=name, evalType=evalType, deterministic=deterministic
     )
+
     return udtf_obj
 
 
+def _create_py_udtf(
+    cls: Type,
+    returnType: Union[StructType, str],
+    name: Optional[str] = None,
+    deterministic: bool = True,
+    useArrow: Optional[bool] = None,
+) -> "UserDefinedTableFunction":
+    """Create a regular or an Arrow-optimized Python UDTF."""
+    # Determine whether to create Arrow-optimized UDTFs.
+    if useArrow is not None:
+        arrow_enabled = useArrow
+    else:
+        from pyspark.sql import SparkSession
+
+        session = SparkSession._instantiatedSession
+        arrow_enabled = (
+            session.conf.get("spark.sql.execution.pythonUDTF.arrow.enabled") == "true"
+            if session is not None
+            else True
+        )
+
+    # Create a regular Python UDTF and check for invalid handler class.
+    regular_udtf = _create_udtf(cls, returnType, name, PythonEvalType.SQL_TABLE_UDF, deterministic)
+
+    if arrow_enabled:
+        try:
+            require_minimum_pandas_version()
+            require_minimum_pyarrow_version()
+        except ImportError as e:
+            warnings.warn(
+                f"Arrow optimization for Python UDTFs cannot be enabled: {str(e)}. "
+                f"Falling back to using regular Python UDTFs.",
+                UserWarning,
+            )
+            return regular_udtf
+        return _create_arrow_udtf(regular_udtf)
+    else:
+        return regular_udtf
+
+
+def _create_arrow_udtf(regular_udtf: "UserDefinedTableFunction") -> "UserDefinedTableFunction":
+    """Create an Arrow-optimized Python UDTF."""
+    import pandas as pd
+
+    cls = regular_udtf.func
+
+    class VectorizedUDTF:
+        def __init__(self) -> None:
+            self.func = cls()
+
+        def eval(self, *args: pd.Series) -> Iterator[list[pd.Series]]:
+            if len(args) == 0:
+                yield pd.DataFrame(self.func.eval())
+            else:
+                # Create tuples from the input pandas Series, each tuple
+                # represents a row across all Series.
+                row_tuples = zip(*args)
+                for row in row_tuples:
+                    yield pd.DataFrame(self.func.eval(*row))
+
+        def terminate(self) -> Iterator[pd.DataFrame]:
+            if hasattr(cls, "terminate"):
+                yield pd.DataFrame(self.func.terminate())
+
+    vectorized_udtf = VectorizedUDTF
+    vectorized_udtf.__name__ = cls.__name__
+    vectorized_udtf.__module__ = cls.__module__
+    vectorized_udtf.__doc__ = cls.__doc__
+    vectorized_udtf.eval.__doc__ = getattr(cls, "eval").__doc__
+    if hasattr(cls, "terminate"):

Review Comment:
   `terminate` is optional for UDTFs but `VectorizedUDTF.terminate` is always defined. We only want to add the docstring to `VectorizedUDTF.terminate` if the original UDTF class has the `terminate` method.
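
The same pattern in a pandas-free sketch: the wrapper always defines `terminate`, but copies a docstring onto it only when the wrapped class has one. `wrap_handler` and the two handler classes are hypothetical names for this example:

```python
def wrap_handler(cls):
    """Wrap a handler class; `terminate` on the wrapper always exists."""

    class Wrapper:
        def __init__(self):
            self.func = cls()

        def eval(self, *args):
            yield from self.func.eval(*args)

        def terminate(self):
            # Delegate only when the wrapped class defines terminate.
            if hasattr(cls, "terminate"):
                yield from self.func.terminate()

    Wrapper.__name__ = cls.__name__
    Wrapper.__doc__ = cls.__doc__
    Wrapper.eval.__doc__ = cls.eval.__doc__
    if hasattr(cls, "terminate"):
        Wrapper.terminate.__doc__ = cls.terminate.__doc__
    return Wrapper


class WithTerminate:
    """Handler that defines both methods."""

    def eval(self, x):
        """Emit one row per input."""
        yield (x,)

    def terminate(self):
        """Emit a final row."""
        yield (-1,)


class WithoutTerminate:
    def eval(self, x):
        yield (x,)
```

Without the `hasattr` guard, the docstring copy would raise `AttributeError` for handlers such as `WithoutTerminate`.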





[GitHub] [spark] HyukjinKwon commented on pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #41867:
URL: https://github.com/apache/spark/pull/41867#issuecomment-1633375898

   @allisonwang-db mind resolving the conflicts? Then I will merge once the tests pass today.




[GitHub] [spark] allisonwang-db commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1261862688


##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -369,19 +381,115 @@ def eval(self, a: int):
         ):
             TestUDTF(rand(0) * 100).collect()
 
-    def test_udtf_no_eval(self):
-        @udtf(returnType="a: int, b: int")
+    def test_udtf_with_struct_input_type(self):
+        @udtf(returnType="x: string")
         class TestUDTF:
-            def run(self, a: int):
-                yield a, a + 1
+            def eval(self, person):
+                yield f"{person.name}: {person.age}",
+
+        self.spark.udtf.register("test_udtf", TestUDTF)
+        self.assertEqual(
+            self.spark.sql(
+                "select * from test_udtf(named_struct('name', 'Alice', 'age', 1))"
+            ).collect(),
+            [Row(x="Alice: 1")],
+        )
 
+    def test_udtf_with_array_input_type(self):
+        @udtf(returnType="x: string")
+        class TestUDTF:
+            def eval(self, args):
+                yield str(args),
+
+        self.spark.udtf.register("test_udtf", TestUDTF)
+        self.assertEqual(
+            self.spark.sql("select * from test_udtf(array(1, 2, 3))").collect(),
+            [Row(x="[1, 2, 3]")],
+        )
+
+    def test_udtf_with_map_input_type(self):
+        @udtf(returnType="x: string")
+        class TestUDTF:
+            def eval(self, m):
+                yield str(m),
+
+        self.spark.udtf.register("test_udtf", TestUDTF)
+        self.assertEqual(
+            self.spark.sql("select * from test_udtf(map('key', 'value'))").collect(),
+            [Row(x="{'key': 'value'}")],
+        )
+
+    def test_udtf_with_struct_output_types(self):
+        @udtf(returnType="x: struct<a:int,b:int>")
+        class TestUDTF:
+            def eval(self, x: int):
+                yield {"a": x, "b": x + 1},
+
+        self.assertEqual(TestUDTF(lit(1)).collect(), [Row(x=Row(a=1, b=2))])
+
+    def test_udtf_with_array_output_types(self):
+        @udtf(returnType="x: array<int>")
+        class TestUDTF:
+            def eval(self, x: int):
+                yield [x, x + 1, x + 2],
+
+        self.assertEqual(TestUDTF(lit(1)).collect(), [Row(x=[1, 2, 3])])
+
+    def test_udtf_with_map_output_types(self):
+        @udtf(returnType="x: map<int,string>")
+        class TestUDTF:
+            def eval(self, x: int):
+                yield {x: str(x)},
+
+        self.assertEqual(TestUDTF(lit(1)).collect(), [Row(x={1: "1"})])
+
+    def test_udtf_with_pandas_input_type(self):
+        import pandas as pd
+
+        @udtf(returnType="corr: double")
+        class TestUDTF:
+            def eval(self, s1: pd.Series, s2: pd.Series):
+                yield s1.corr(s2)
+
+        self.spark.udtf.register("test_udtf", TestUDTF)
+        # TODO(SPARK-43968): check during compile time instead of runtime
         with self.assertRaisesRegex(
-            PythonException,
-            "Failed to execute the user defined table function because it has not "
-            "implemented the 'eval' method. Please add the 'eval' method and try the "
-            "query again.",
+            PythonException, "AttributeError: 'int' object has no attribute 'corr'"
         ):
-            TestUDTF(lit(1)).collect()
+            self.spark.sql(
+                "select * from values (1, 2), (2, 3) t(a, b), " "lateral test_udtf(a, b)"

Review Comment:
   Good catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] allisonwang-db commented on a diff in pull request #41867: [SPARK-43964][SQL][PYTHON] Support arrow-optimized Python UDTFs

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #41867:
URL: https://github.com/apache/spark/pull/41867#discussion_r1261867838


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -454,6 +474,74 @@ def __repr__(self):
         return "ArrowStreamPandasUDFSerializer"
 
 
+class ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
+    """
+    Serializer used by Python worker to evaluate Arrow-optimized Python UDTFs.
+    """
+
+    def __init__(self, timezone, safecheck, assign_cols_by_name):
+        super(ArrowStreamPandasUDTFSerializer, self).__init__(
+            timezone=timezone,
+            safecheck=safecheck,
+            assign_cols_by_name=assign_cols_by_name,
+            # Set to 'False' to avoid converting struct type inputs into a pandas DataFrame.
+            df_for_struct=False,
+            # Defines how struct type inputs are converted. If set to "row", struct type inputs
+            # are converted into Rows. Without this setting, a struct type input would be treated
+            # as a dictionary. For example, for named_struct('name', 'Alice', 'age', 1),
+            # if struct_in_pandas="dict", it becomes {"name": "Alice", "age": 1}
+            # if struct_in_pandas="row", it becomes Row(name="Alice", age=1)
+            struct_in_pandas="row",
+            # When dealing with array type inputs, Arrow converts them into numpy.ndarrays.
+            # To ensure consistency across regular and arrow-optimized UDTFs, we further
+            # convert these numpy.ndarrays into Python lists.
+            ndarray_as_list=True,
+            # Enables explicit casting for mismatched return types of Arrow Python UDTFs.
+            arrow_cast=True,
+        )
+
+    def _create_batch(self, series):
+        """
+        Create an Arrow record batch from the given pandas.Series pandas.DataFrame
+        or list of Series or DataFrame, with optional type.
+
+        Parameters
+        ----------
+        series : pandas.Series or pandas.DataFrame or list
+            A single series or dataframe, list of series or dataframe,
+            or list of (series or dataframe, arrow_type)
+
+        Returns
+        -------
+        pyarrow.RecordBatch
+            Arrow RecordBatch
+        """
+        import pandas as pd
+        import pyarrow as pa
+
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or (
+            len(series) == 2 and isinstance(series[1], pa.DataType)
+        ):
+            series = [series]
+        series = ((s, None) if not isinstance(s, (list, tuple)) else s for s in series)
+
+        arrs = []
+        for s, t in series:
+            if not isinstance(s, pd.DataFrame):
+                raise PySparkValueError(
+                    "Output of an arrow-optimized Python UDTFs expects "
+                    f"a pandas.DataFrame but got: {type(s)}"
+                )
+
+            arrs.append(self._create_struct_array(s, t))
+
+        return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))])

Review Comment:
   Great idea! I will make a follow-up PR for this.


