You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/09/08 05:26:15 UTC

[spark] branch branch-3.5 updated: [SPARK-44508][PYTHON][DOCS] Add user guide for Python user-defined table functions

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 959d93aa9be [SPARK-44508][PYTHON][DOCS] Add user guide for Python user-defined table functions
959d93aa9be is described below

commit 959d93aa9bebde1d081afd5d2f2ad60aec907c31
Author: allisonwang-db <al...@databricks.com>
AuthorDate: Fri Sep 8 14:25:51 2023 +0900

    [SPARK-44508][PYTHON][DOCS] Add user guide for Python user-defined table functions
    
    ### What changes were proposed in this pull request?
    
    This PR adds a user guide for Python user-defined table functions (UDTFs) introduced in Spark 3.5.
    <img width="468" alt="Screenshot 2023-08-04 at 14 46 13" src="https://github.com/apache/spark/assets/66282705/11f5dc5e-681b-4677-a466-1a23c0b8dd01">
    
    ### Why are the changes needed?
    
    To help users write Python UDTFs.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    docs test
    
    Closes #42272 from allisonwang-db/spark-44508-udtf-user-guide.
    
    Authored-by: allisonwang-db <al...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit aaf413ce351dd716096333df140f45f7f1bd5dd6)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 examples/src/main/python/sql/udtf.py              | 240 ++++++++++++++++++++++
 python/docs/source/user_guide/sql/index.rst       |   1 +
 python/docs/source/user_guide/sql/python_udtf.rst | 233 +++++++++++++++++++++
 python/mypy.ini                                   |   6 +
 python/pyspark/sql/functions.py                   |   7 -
 5 files changed, 480 insertions(+), 7 deletions(-)

diff --git a/examples/src/main/python/sql/udtf.py b/examples/src/main/python/sql/udtf.py
new file mode 100644
index 00000000000..768eb73566e
--- /dev/null
+++ b/examples/src/main/python/sql/udtf.py
@@ -0,0 +1,240 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A simple example demonstrating Python UDTFs in Spark
+Run with:
+  ./bin/spark-submit examples/src/main/python/sql/udtf.py
+"""
+
+# NOTE that this file is imported in the User Guides in PySpark documentation.
+# The codes are referred via line numbers. See also `literalinclude` directive in Sphinx.
+from pyspark.sql import SparkSession
+from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version
+
+# Python UDTFs use Arrow by default.
+require_minimum_pandas_version()
+require_minimum_pyarrow_version()
+
+
+def python_udtf_simple_example(spark: SparkSession) -> None:
+
+    # Define the UDTF class and implement the required `eval` method.
+    class SquareNumbers:
+        def eval(self, start: int, end: int):
+            for num in range(start, end + 1):
+                yield (num, num * num)
+
+    from pyspark.sql.functions import lit, udtf
+
+    # Create a UDTF using the class definition and the `udtf` function.
+    square_num = udtf(SquareNumbers, returnType="num: int, squared: int")
+
+    # Invoke the UDTF in PySpark.
+    square_num(lit(1), lit(3)).show()
+    # +---+-------+
+    # |num|squared|
+    # +---+-------+
+    # |  1|      1|
+    # |  2|      4|
+    # |  3|      9|
+    # +---+-------+
+
+
+def python_udtf_decorator_example(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import lit, udtf
+
+    # Define a UDTF using the `udtf` decorator directly on the class.
+    @udtf(returnType="num: int, squared: int")
+    class SquareNumbers:
+        def eval(self, start: int, end: int):
+            for num in range(start, end + 1):
+                yield (num, num * num)
+
+    # Invoke the UDTF in PySpark using the SquareNumbers class directly.
+    SquareNumbers(lit(1), lit(3)).show()
+    # +---+-------+
+    # |num|squared|
+    # +---+-------+
+    # |  1|      1|
+    # |  2|      4|
+    # |  3|      9|
+    # +---+-------+
+
+
+def python_udtf_registration(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import udtf
+
+    @udtf(returnType="word: string")
+    class WordSplitter:
+        def eval(self, text: str):
+            for word in text.split(" "):
+                yield (word.strip(),)
+
+    # Register the UDTF for use in Spark SQL.
+    spark.udtf.register("split_words", WordSplitter)
+
+    # Example: Using the UDTF in SQL.
+    spark.sql("SELECT * FROM split_words('hello world')").show()
+    # +-----+
+    # | word|
+    # +-----+
+    # |hello|
+    # |world|
+    # +-----+
+
+    # Example: Using the UDTF with a lateral join in SQL.
+    # The lateral join allows us to reference the columns and aliases
+    # in the previous FROM clause items as inputs to the UDTF.
+    spark.sql(
+        "SELECT * FROM VALUES ('Hello World'), ('Apache Spark') t(text), "
+        "LATERAL split_words(text)"
+    ).show()
+    # +------------+------+
+    # |        text|  word|
+    # +------------+------+
+    # | Hello World| Hello|
+    # | Hello World| World|
+    # |Apache Spark|Apache|
+    # |Apache Spark| Spark|
+    # +------------+------+
+
+
+def python_udtf_arrow_example(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import udtf
+
+    @udtf(returnType="c1: int, c2: int", useArrow=True)
+    class PlusOne:
+        def eval(self, x: int):
+            yield x, x + 1
+
+
+def python_udtf_date_expander_example(spark: SparkSession) -> None:
+
+    from datetime import datetime, timedelta
+    from pyspark.sql.functions import lit, udtf
+
+    @udtf(returnType="date: string")
+    class DateExpander:
+        def eval(self, start_date: str, end_date: str):
+            current = datetime.strptime(start_date, '%Y-%m-%d')
+            end = datetime.strptime(end_date, '%Y-%m-%d')
+            while current <= end:
+                yield (current.strftime('%Y-%m-%d'),)
+                current += timedelta(days=1)
+
+    DateExpander(lit("2023-02-25"), lit("2023-03-01")).show()
+    # +----------+
+    # |      date|
+    # +----------+
+    # |2023-02-25|
+    # |2023-02-26|
+    # |2023-02-27|
+    # |2023-02-28|
+    # |2023-03-01|
+    # +----------+
+
+
+def python_udtf_terminate_example(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import udtf
+
+    @udtf(returnType="cnt: int")
+    class CountUDTF:
+        def __init__(self):
+            # Initialize the counter to 0 when an instance of the class is created.
+            self.count = 0
+
+        def eval(self, x: int):
+            # Increment the counter by 1 for each input value received.
+            self.count += 1
+
+        def terminate(self):
+            # Yield the final count when the UDTF is done processing.
+            yield self.count,
+
+    spark.udtf.register("count_udtf", CountUDTF)
+    spark.sql("SELECT * FROM range(0, 10, 1, 1), LATERAL count_udtf(id)").show()
+    # +---+---+
+    # | id|cnt|
+    # +---+---+
+    # |  9| 10|
+    # +---+---+
+    spark.sql("SELECT * FROM range(0, 10, 1, 2), LATERAL count_udtf(id)").show()
+    # +---+---+
+    # | id|cnt|
+    # +---+---+
+    # |  4|  5|
+    # |  9|  5|
+    # +---+---+
+
+
+def python_udtf_table_argument(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import udtf
+    from pyspark.sql.types import Row
+
+    @udtf(returnType="id: int")
+    class FilterUDTF:
+        def eval(self, row: Row):
+            if row["id"] > 5:
+                yield row["id"],
+
+    spark.udtf.register("filter_udtf", FilterUDTF)
+
+    spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
+    # +---+
+    # | id|
+    # +---+
+    # |  6|
+    # |  7|
+    # |  8|
+    # |  9|
+    # +---+
+
+
+if __name__ == "__main__":
+    spark = SparkSession \
+        .builder \
+        .appName("Python UDTF example") \
+        .getOrCreate()
+
+    print("Running Python UDTF single example")
+    python_udtf_simple_example(spark)
+
+    print("Running Python UDTF decorator example")
+    python_udtf_decorator_example(spark)
+
+    print("Running Python UDTF registration example")
+    python_udtf_registration(spark)
+
+    print("Running Python UDTF arrow example")
+    python_udtf_arrow_example(spark)
+
+    print("Running Python UDTF date expander example")
+    python_udtf_date_expander_example(spark)
+
+    print("Running Python UDTF terminate example")
+    python_udtf_terminate_example(spark)
+
+    print("Running Python UDTF table argument example")
+    python_udtf_table_argument(spark)
+
+    spark.stop()
diff --git a/python/docs/source/user_guide/sql/index.rst b/python/docs/source/user_guide/sql/index.rst
index 4cab99efa35..c0369de6786 100644
--- a/python/docs/source/user_guide/sql/index.rst
+++ b/python/docs/source/user_guide/sql/index.rst
@@ -24,4 +24,5 @@ Spark SQL
    :maxdepth: 2
 
    arrow_pandas
+   python_udtf
 
diff --git a/python/docs/source/user_guide/sql/python_udtf.rst b/python/docs/source/user_guide/sql/python_udtf.rst
new file mode 100644
index 00000000000..0e583915c58
--- /dev/null
+++ b/python/docs/source/user_guide/sql/python_udtf.rst
@@ -0,0 +1,233 @@
+..  Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+..    http://www.apache.org/licenses/LICENSE-2.0
+
+..  Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+===========================================
+Python User-defined Table Functions (UDTFs)
+===========================================
+
+Spark 3.5 introduces the Python user-defined table function (UDTF), a new type of user-defined function. 
+Unlike scalar functions that return a single result value from each call, each UDTF is invoked in 
+the ``FROM`` clause of a query and returns an entire table as output.
+Each UDTF call can accept zero or more arguments.
+These arguments can either be scalar expressions or table arguments that represent entire input tables.
+
+Implementing a Python UDTF
+--------------------------
+
+.. currentmodule:: pyspark.sql.functions
+
+To implement a Python UDTF, you first need to define a class implementing the methods:
+
+.. code-block:: python
+
+    class PythonUDTF:
+
+        def __init__(self) -> None:
+            """
+            Initializes the user-defined table function (UDTF). This is optional.
+
+            This method serves as the default constructor and is called once when the
+            UDTF is instantiated on the executor side.
+            
+            Any class fields assigned in this method will be available for subsequent
+            calls to the `eval` and `terminate` methods. This class instance will remain
+            alive until all rows in the current partition have been consumed by the `eval`
+            method.
+
+            Notes
+            -----
+            - This method does not accept any extra arguments. Only the default
+              constructor is supported.
+            - You cannot create or reference the Spark session within the UDTF. Any
+              attempt to do so will result in a serialization error.
+            """
+            ...
+
+        def eval(self, *args: Any) -> Iterator[Any]:
+            """
+            Evaluates the function using the given input arguments.
+
+            This method is required and must be implemented.
+
+            Argument Mapping:
+            - Each provided scalar expression maps to exactly one value in the
+              `*args` list.
+            - Each provided table argument maps to a pyspark.sql.Row object containing
+              the columns in the order they appear in the provided input table,
+              and with the names computed by the query analyzer.
+
+            This method is called on every input row, and can produce zero or more
+            output rows. Each element in the output tuple corresponds to one column
+            specified in the return type of the UDTF.
+
+            Parameters
+            ----------
+            *args : Any
+                Arbitrary positional arguments representing the input to the UDTF.
+
+            Yields
+            ------
+            tuple
+                A tuple representing a single row in the UDTF result table.
+                Yield as many times as needed to produce multiple rows.
+
+            Notes
+            -----
+            - The result of the function must be a tuple representing a single row
+              in the UDTF result table.
+            - UDTFs currently do not accept keyword arguments during the function call.
+
+            Examples
+            --------
+            eval that returns one row and one column for each input.
+
+            >>> def eval(self, x: int):
+            ...     yield (x, )
+
+            eval that returns two rows and two columns for each input.
+
+            >>> def eval(self, x: int, y: int):
+            ...     yield (x + y, x - y)
+            ...     yield (y + x, y - x)
+            """
+            ...
+
+        def terminate(self) -> Iterator[Any]:
+            """
+            Called when the UDTF has processed all input rows.
+
+            This method is optional to implement and is useful for performing any
+            cleanup or finalization operations after the UDTF has finished processing
+            all rows. It can also be used to yield additional rows if needed.
+            Table functions that consume all rows in the entire input partition
+            and then compute and return the entire output table can do so from
+            this method as well (please be mindful of memory usage when doing
+            this).
+
+            Yields
+            ------
+            tuple
+                A tuple representing a single row in the UDTF result table.
+                Yield this if you want to return additional rows during termination.
+
+            Examples
+            --------
+            >>> def terminate(self) -> Iterator[Any]:
+            >>>     yield "done", None
+            """
+            ...
+
+
+The return type of the UDTF defines the schema of the table it outputs. 
+It must be either a ``StructType``, for example ``StructType().add("c1", StringType())``
+or a DDL string representing a struct type, for example ``c1: string``.
+
+**Example of UDTF Class Implementation**
+
+Here is a simple example of a UDTF class implementation:
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+    :language: python
+    :lines: 36-40
+    :dedent: 4
+
+
+**Instantiating a UDTF with the ``udtf`` Decorator**
+
+To make use of the UDTF, you'll first need to instantiate it using the ``@udtf`` decorator:
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+    :language: python
+    :lines: 42-55
+    :dedent: 4
+
+
+**Instantiating a UDTF with the ``udtf`` Function**
+
+An alternative way to create a UDTF is to use the :func:`udtf` function:
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+    :language: python
+    :lines: 60-77
+    :dedent: 4
+
+For more detailed usage, please see :func:`udtf`.
+
+
+Registering and Using Python UDTFs in SQL
+-----------------------------------------
+
+Python UDTFs can also be registered and used in SQL queries.
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+    :language: python
+    :lines: 82-116
+    :dedent: 4
+
+
+Arrow Optimization
+------------------
+Apache Arrow is an in-memory columnar data format used in Spark to efficiently transfer
+data between Java and Python processes. Apache Arrow is disabled by default for Python UDTFs.
+
+Arrow can improve performance when each input row generates a large result table from the UDTF.
+
+To enable Arrow optimization, set the ``spark.sql.execution.pythonUDTF.arrow.enabled``
+configuration to ``true``. You can also enable it by specifying the ``useArrow`` parameter
+when declaring the UDTF.
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+    :language: python
+    :lines: 121-126
+    :dedent: 4
+
+
+For more details, please see `Apache Arrow in PySpark <../arrow_pandas.rst>`_.
+
+
+TABLE input argument
+~~~~~~~~~~~~~~~~~~~~
+Python UDTFs can also take a TABLE as input argument, and it can be used in conjunction 
+with scalar input arguments.
+By default, you are allowed to have only one TABLE argument as input, primarily for 
+performance reasons. If you need to have more than one TABLE input argument, 
+you can enable this by setting the ``spark.sql.tvf.allowMultipleTableArguments.enabled``
+configuration to ``true``.
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+    :language: python
+    :lines: 191-210
+    :dedent: 4
+
+
+More Examples
+-------------
+
+A Python UDTF that expands date ranges into individual dates:
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+    :language: python
+    :lines: 131-152
+    :dedent: 4
+
+
+A Python UDTF with ``__init__`` and ``terminate``:
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+    :language: python
+    :lines: 157-186
+    :dedent: 4
diff --git a/python/mypy.ini b/python/mypy.ini
index 4d1fc3ceb66..3443af9a865 100644
--- a/python/mypy.ini
+++ b/python/mypy.ini
@@ -80,6 +80,12 @@ disallow_untyped_defs = False
 [mypy-pyspark.worker]
 disallow_untyped_defs = False
 
+; Allow untyped def and disable certain error codes in examples
+
+[mypy-python.sql.udtf]
+disallow_untyped_defs = False
+disable_error_code = attr-defined,arg-type,call-arg,union-attr
+
 ; Ignore errors in tests
 
 [mypy-pyspark.ml.tests.*]
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 1fe2f7d40a2..06cb3063d1b 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -15848,13 +15848,6 @@ def udtf(
     Use "yield" to produce one row for the UDTF result relation as many times
     as needed. In the context of a lateral join, each such result row will be
     associated with the most recent input row consumed from the "eval" method.
-    Or, use "return" to produce multiple rows for the UDTF result relation at
-    once.
-
-    >>> class TestUDTF:
-    ...     def eval(self, a: int):
-    ...         return [(a, a + 1), (a, a + 2)]
-    >>> test_udtf = udtf(TestUDTF, returnType="x: int, y: int")
 
     User-defined table functions are considered opaque to the optimizer by default.
     As a result, operations like filters from WHERE clauses or limits from


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org