You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/09/08 05:26:15 UTC
[spark] branch branch-3.5 updated: [SPARK-44508][PYTHON][DOCS] Add user guide for Python user-defined table functions
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 959d93aa9be [SPARK-44508][PYTHON][DOCS] Add user guide for Python user-defined table functions
959d93aa9be is described below
commit 959d93aa9bebde1d081afd5d2f2ad60aec907c31
Author: allisonwang-db <al...@databricks.com>
AuthorDate: Fri Sep 8 14:25:51 2023 +0900
[SPARK-44508][PYTHON][DOCS] Add user guide for Python user-defined table functions
### What changes were proposed in this pull request?
This PR adds a user guide for Python user-defined table functions (UDTFs) introduced in Spark 3.5.
<img width="468" alt="Screenshot 2023-08-04 at 14 46 13" src="https://github.com/apache/spark/assets/66282705/11f5dc5e-681b-4677-a466-1a23c0b8dd01">
### Why are the changes needed?
To help users write Python UDTFs.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
docs test
Closes #42272 from allisonwang-db/spark-44508-udtf-user-guide.
Authored-by: allisonwang-db <al...@databricks.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit aaf413ce351dd716096333df140f45f7f1bd5dd6)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
examples/src/main/python/sql/udtf.py | 240 ++++++++++++++++++++++
python/docs/source/user_guide/sql/index.rst | 1 +
python/docs/source/user_guide/sql/python_udtf.rst | 233 +++++++++++++++++++++
python/mypy.ini | 6 +
python/pyspark/sql/functions.py | 7 -
5 files changed, 480 insertions(+), 7 deletions(-)
diff --git a/examples/src/main/python/sql/udtf.py b/examples/src/main/python/sql/udtf.py
new file mode 100644
index 00000000000..768eb73566e
--- /dev/null
+++ b/examples/src/main/python/sql/udtf.py
@@ -0,0 +1,240 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A simple example demonstrating Python UDTFs in Spark
+Run with:
+ ./bin/spark-submit examples/src/main/python/sql/udtf.py
+"""
+
+# NOTE that this file is imported in the User Guides in PySpark documentation.
+# The code is referenced by line number. See also the `literalinclude` directive in Sphinx.
+from pyspark.sql import SparkSession
+from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version
+
+# Arrow-optimized Python UDTFs (`useArrow=True`) require pandas and PyArrow.
+require_minimum_pandas_version()
+require_minimum_pyarrow_version()
+
+
+def python_udtf_simple_example(spark: SparkSession) -> None:
+
+ # Define the UDTF class and implement the required `eval` method.
+ class SquareNumbers:
+ def eval(self, start: int, end: int):
+ for num in range(start, end + 1):
+ yield (num, num * num)
+
+ from pyspark.sql.functions import lit, udtf
+
+ # Create a UDTF using the class definition and the `udtf` function.
+ square_num = udtf(SquareNumbers, returnType="num: int, squared: int")
+
+ # Invoke the UDTF in PySpark.
+ square_num(lit(1), lit(3)).show()
+ # +---+-------+
+ # |num|squared|
+ # +---+-------+
+ # | 1| 1|
+ # | 2| 4|
+ # | 3| 9|
+ # +---+-------+
+
+
+def python_udtf_decorator_example(spark: SparkSession) -> None:
+
+ from pyspark.sql.functions import lit, udtf
+
+ # Define a UDTF using the `udtf` decorator directly on the class.
+ @udtf(returnType="num: int, squared: int")
+ class SquareNumbers:
+ def eval(self, start: int, end: int):
+ for num in range(start, end + 1):
+ yield (num, num * num)
+
+ # Invoke the UDTF in PySpark using the SquareNumbers class directly.
+ SquareNumbers(lit(1), lit(3)).show()
+ # +---+-------+
+ # |num|squared|
+ # +---+-------+
+ # | 1| 1|
+ # | 2| 4|
+ # | 3| 9|
+ # +---+-------+
+
+
+def python_udtf_registration(spark: SparkSession) -> None:
+
+ from pyspark.sql.functions import udtf
+
+ @udtf(returnType="word: string")
+ class WordSplitter:
+ def eval(self, text: str):
+ for word in text.split(" "):
+ yield (word.strip(),)
+
+ # Register the UDTF for use in Spark SQL.
+ spark.udtf.register("split_words", WordSplitter)
+
+ # Example: Using the UDTF in SQL.
+ spark.sql("SELECT * FROM split_words('hello world')").show()
+ # +-----+
+ # | word|
+ # +-----+
+ # |hello|
+ # |world|
+ # +-----+
+
+ # Example: Using the UDTF with a lateral join in SQL.
+ # The lateral join allows us to reference the columns and aliases
+ # in the previous FROM clause items as inputs to the UDTF.
+ spark.sql(
+ "SELECT * FROM VALUES ('Hello World'), ('Apache Spark') t(text), "
+ "LATERAL split_words(text)"
+ ).show()
+ # +------------+------+
+ # | text| word|
+ # +------------+------+
+ # | Hello World| Hello|
+ # | Hello World| World|
+ # |Apache Spark|Apache|
+ # |Apache Spark| Spark|
+ # +------------+------+
+
+
+def python_udtf_arrow_example(spark: SparkSession) -> None:
+
+ from pyspark.sql.functions import udtf
+
+ @udtf(returnType="c1: int, c2: int", useArrow=True)
+ class PlusOne:
+ def eval(self, x: int):
+ yield x, x + 1
+
+
+def python_udtf_date_expander_example(spark: SparkSession) -> None:
+
+ from datetime import datetime, timedelta
+ from pyspark.sql.functions import lit, udtf
+
+ @udtf(returnType="date: string")
+ class DateExpander:
+ def eval(self, start_date: str, end_date: str):
+ current = datetime.strptime(start_date, '%Y-%m-%d')
+ end = datetime.strptime(end_date, '%Y-%m-%d')
+ while current <= end:
+ yield (current.strftime('%Y-%m-%d'),)
+ current += timedelta(days=1)
+
+ DateExpander(lit("2023-02-25"), lit("2023-03-01")).show()
+ # +----------+
+ # | date|
+ # +----------+
+ # |2023-02-25|
+ # |2023-02-26|
+ # |2023-02-27|
+ # |2023-02-28|
+ # |2023-03-01|
+ # +----------+
+
+
+def python_udtf_terminate_example(spark: SparkSession) -> None:
+
+ from pyspark.sql.functions import udtf
+
+ @udtf(returnType="cnt: int")
+ class CountUDTF:
+ def __init__(self):
+ # Initialize the counter to 0 when an instance of the class is created.
+ self.count = 0
+
+ def eval(self, x: int):
+ # Increment the counter by 1 for each input value received.
+ self.count += 1
+
+ def terminate(self):
+ # Yield the final count when the UDTF is done processing.
+ yield self.count,
+
+ spark.udtf.register("count_udtf", CountUDTF)
+ spark.sql("SELECT * FROM range(0, 10, 1, 1), LATERAL count_udtf(id)").show()
+ # +---+---+
+ # | id|cnt|
+ # +---+---+
+ # | 9| 10|
+ # +---+---+
+ spark.sql("SELECT * FROM range(0, 10, 1, 2), LATERAL count_udtf(id)").show()
+ # +---+---+
+ # | id|cnt|
+ # +---+---+
+ # | 4| 5|
+ # | 9| 5|
+ # +---+---+
+
+
+def python_udtf_table_argument(spark: SparkSession) -> None:
+
+ from pyspark.sql.functions import udtf
+ from pyspark.sql.types import Row
+
+ @udtf(returnType="id: int")
+ class FilterUDTF:
+ def eval(self, row: Row):
+ if row["id"] > 5:
+ yield row["id"],
+
+ spark.udtf.register("filter_udtf", FilterUDTF)
+
+ spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
+ # +---+
+ # | id|
+ # +---+
+ # | 6|
+ # | 7|
+ # | 8|
+ # | 9|
+ # +---+
+
+
+if __name__ == "__main__":
+ spark = SparkSession \
+ .builder \
+ .appName("Python UDTF example") \
+ .getOrCreate()
+
+ print("Running Python UDTF single example")
+ python_udtf_simple_example(spark)
+
+ print("Running Python UDTF decorator example")
+ python_udtf_decorator_example(spark)
+
+ print("Running Python UDTF registration example")
+ python_udtf_registration(spark)
+
+ print("Running Python UDTF arrow example")
+ python_udtf_arrow_example(spark)
+
+ print("Running Python UDTF date expander example")
+ python_udtf_date_expander_example(spark)
+
+ print("Running Python UDTF terminate example")
+ python_udtf_terminate_example(spark)
+
+ print("Running Python UDTF table argument example")
+ python_udtf_table_argument(spark)
+
+ spark.stop()
diff --git a/python/docs/source/user_guide/sql/index.rst b/python/docs/source/user_guide/sql/index.rst
index 4cab99efa35..c0369de6786 100644
--- a/python/docs/source/user_guide/sql/index.rst
+++ b/python/docs/source/user_guide/sql/index.rst
@@ -24,4 +24,5 @@ Spark SQL
:maxdepth: 2
arrow_pandas
+ python_udtf
diff --git a/python/docs/source/user_guide/sql/python_udtf.rst b/python/docs/source/user_guide/sql/python_udtf.rst
new file mode 100644
index 00000000000..0e583915c58
--- /dev/null
+++ b/python/docs/source/user_guide/sql/python_udtf.rst
@@ -0,0 +1,233 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+===========================================
+Python User-defined Table Functions (UDTFs)
+===========================================
+
+Spark 3.5 introduces the Python user-defined table function (UDTF), a new type of user-defined function.
+Unlike scalar functions that return a single result value from each call, each UDTF is invoked in
+the ``FROM`` clause of a query and returns an entire table as output.
+Each UDTF call can accept zero or more arguments.
+These arguments can either be scalar expressions or table arguments that represent entire input tables.
+
+Implementing a Python UDTF
+--------------------------
+
+.. currentmodule:: pyspark.sql.functions
+
+To implement a Python UDTF, you first need to define a class implementing the methods:
+
+.. code-block:: python
+
+ class PythonUDTF:
+
+ def __init__(self) -> None:
+ """
+ Initializes the user-defined table function (UDTF). This is optional.
+
+ This method serves as the default constructor and is called once when the
+ UDTF is instantiated on the executor side.
+
+ Any class fields assigned in this method will be available for subsequent
+ calls to the `eval` and `terminate` methods. This class instance will remain
+ alive until all rows in the current partition have been consumed by the `eval`
+ method.
+
+ Notes
+ -----
+ - This method does not accept any extra arguments. Only the default
+ constructor is supported.
+ - You cannot create or reference the Spark session within the UDTF. Any
+ attempt to do so will result in a serialization error.
+ """
+ ...
+
+ def eval(self, *args: Any) -> Iterator[Any]:
+ """
+ Evaluates the function using the given input arguments.
+
+ This method is required and must be implemented.
+
+ Argument Mapping:
+ - Each provided scalar expression maps to exactly one value in the
+ `*args` list.
+ - Each provided table argument maps to a pyspark.sql.Row object containing
+ the columns in the order they appear in the provided input table,
+ and with the names computed by the query analyzer.
+
+ This method is called on every input row, and can produce zero or more
+ output rows. Each element in the output tuple corresponds to one column
+ specified in the return type of the UDTF.
+
+ Parameters
+ ----------
+ *args : Any
+ Arbitrary positional arguments representing the input to the UDTF.
+
+ Yields
+ ------
+ tuple
+ A tuple representing a single row in the UDTF result table.
+ Yield as many times as needed to produce multiple rows.
+
+ Notes
+ -----
+ - The result of the function must be a tuple representing a single row
+ in the UDTF result table.
+ - UDTFs currently do not accept keyword arguments during the function call.
+
+ Examples
+ --------
+ eval that returns one row and one column for each input.
+
+ >>> def eval(self, x: int):
+ ... yield (x, )
+
+ eval that returns two rows and two columns for each input.
+
+ >>> def eval(self, x: int, y: int):
+ ... yield (x + y, x - y)
+ ... yield (y + x, y - x)
+ """
+ ...
+
+ def terminate(self) -> Iterator[Any]:
+ """
+ Called when the UDTF has processed all input rows.
+
+ This method is optional to implement and is useful for performing any
+ cleanup or finalization operations after the UDTF has finished processing
+ all rows. It can also be used to yield additional rows if needed.
+ Table functions that consume all rows in the entire input partition
+ and then compute and return the entire output table can do so from
+ this method as well (please be mindful of memory usage when doing
+ this).
+
+ Yields
+ ------
+ tuple
+ A tuple representing a single row in the UDTF result table.
+ Yield this if you want to return additional rows during termination.
+
+ Examples
+ --------
+ >>> def terminate(self) -> Iterator[Any]:
+ ... yield "done", None
+ """
+ ...
+
+
+The return type of the UDTF defines the schema of the table it outputs.
+It must be either a ``StructType``, for example ``StructType().add("c1", StringType())``
+or a DDL string representing a struct type, for example ``c1: string``.
+
+**Example of UDTF Class Implementation**
+
+Here is a simple example of a UDTF class implementation:
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+ :language: python
+ :lines: 36-40
+ :dedent: 4
+
+
+**Instantiating a UDTF with the ``udtf`` Decorator**
+
+To make use of the UDTF, you'll first need to instantiate it using the ``@udtf`` decorator:
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+ :language: python
+ :lines: 60-77
+ :dedent: 4
+
+
+**Instantiating a UDTF with the ``udtf`` Function**
+
+An alternative way to create a UDTF is to use the :func:`udtf` function:
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+ :language: python
+ :lines: 42-55
+ :dedent: 4
+
+For more detailed usage, please see :func:`udtf`.
+
+
+Registering and Using Python UDTFs in SQL
+-----------------------------------------
+
+Python UDTFs can also be registered and used in SQL queries.
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+ :language: python
+ :lines: 82-116
+ :dedent: 4
+
+
+Arrow Optimization
+------------------
+Apache Arrow is an in-memory columnar data format used in Spark to efficiently transfer
+data between Java and Python processes. Apache Arrow is disabled by default for Python UDTFs.
+
+Arrow can improve performance when each input row generates a large result table from the UDTF.
+
+To enable Arrow optimization, set the ``spark.sql.execution.pythonUDTF.arrow.enabled``
+configuration to ``true``. You can also enable it by specifying the ``useArrow`` parameter
+when declaring the UDTF.
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+ :language: python
+ :lines: 121-126
+ :dedent: 4
+
+
+For more details, please see `Apache Arrow in PySpark <../arrow_pandas.rst>`_.
+
+
+TABLE input argument
+--------------------
+Python UDTFs can also take a TABLE as an input argument, and it can be used in conjunction
+with scalar input arguments.
+By default, you are allowed to have only one TABLE argument as input, primarily for
+performance reasons. If you need to have more than one TABLE input argument,
+you can enable this by setting the ``spark.sql.tvf.allowMultipleTableArguments.enabled``
+configuration to ``true``.
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+ :language: python
+ :lines: 191-210
+ :dedent: 4
+
+
+More Examples
+-------------
+
+A Python UDTF that expands date ranges into individual dates:
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+ :language: python
+ :lines: 131-152
+ :dedent: 4
+
+
+A Python UDTF with ``__init__`` and ``terminate``:
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+ :language: python
+ :lines: 157-186
+ :dedent: 4
diff --git a/python/mypy.ini b/python/mypy.ini
index 4d1fc3ceb66..3443af9a865 100644
--- a/python/mypy.ini
+++ b/python/mypy.ini
@@ -80,6 +80,12 @@ disallow_untyped_defs = False
[mypy-pyspark.worker]
disallow_untyped_defs = False
+; Allow untyped def and disable certain error codes in examples
+
+[mypy-python.sql.udtf]
+disallow_untyped_defs = False
+disable_error_code = attr-defined,arg-type,call-arg,union-attr
+
; Ignore errors in tests
[mypy-pyspark.ml.tests.*]
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 1fe2f7d40a2..06cb3063d1b 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -15848,13 +15848,6 @@ def udtf(
Use "yield" to produce one row for the UDTF result relation as many times
as needed. In the context of a lateral join, each such result row will be
associated with the most recent input row consumed from the "eval" method.
- Or, use "return" to produce multiple rows for the UDTF result relation at
- once.
-
- >>> class TestUDTF:
- ... def eval(self, a: int):
- ... return [(a, a + 1), (a, a + 2)]
- >>> test_udtf = udtf(TestUDTF, returnType="x: int, y: int")
User-defined table functions are considered opaque to the optimizer by default.
As a result, operations like filters from WHERE clauses or limits from
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org