Posted to commits@spark.apache.org by gu...@apache.org on 2018/07/31 02:10:44 UTC

spark git commit: [SPARK-23633][SQL] Update Pandas UDFs section in sql-programming-guide

Repository: spark
Updated Branches:
  refs/heads/master f1550aaf1 -> 8141d5592


[SPARK-23633][SQL] Update Pandas UDFs section in sql-programming-guide

## What changes were proposed in this pull request?

Update the Pandas UDFs section in sql-programming-guide and add a section for grouped aggregate Pandas UDFs.

## How was this patch tested?

Author: Li Jin <ic...@gmail.com>

Closes #21887 from icexelloss/SPARK-23633-sql-programming-guide.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8141d559
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8141d559
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8141d559

Branch: refs/heads/master
Commit: 8141d55926e95c06cd66bf82098895e1ed419449
Parents: f1550aa
Author: Li Jin <ic...@gmail.com>
Authored: Tue Jul 31 10:10:38 2018 +0800
Committer: hyukjinkwon <gu...@apache.org>
Committed: Tue Jul 31 10:10:38 2018 +0800

----------------------------------------------------------------------
 docs/sql-programming-guide.md         | 19 +++++++++++++++
 examples/src/main/python/sql/arrow.py | 37 ++++++++++++++++++++++++++++++
 python/pyspark/sql/functions.py       |  5 ++--
 3 files changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8141d559/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index cff521c..5f1eee8 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1811,6 +1811,25 @@ The following example shows how to use `groupby().apply()` to subtract the mean
 For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) and
 [`pyspark.sql.GroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.apply).
 
+### Grouped Aggregate
+
+Grouped aggregate Pandas UDFs are similar to Spark aggregate functions. They are used with `groupBy().agg()` and
+[`pyspark.sql.Window`](api/python/pyspark.sql.html#pyspark.sql.Window). A grouped aggregate Pandas UDF defines an aggregation from one or more `pandas.Series`
+to a scalar value, where each `pandas.Series` represents a column within the group or window.
+
+Note that this type of UDF does not support partial aggregation, and all data for a group or window will be loaded into memory. Also,
+only unbounded windows are currently supported with grouped aggregate Pandas UDFs.
+
+The following example shows how to use this type of UDF to compute the mean with groupBy and window operations:
+
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example grouped_agg_pandas_udf python/sql/arrow.py %}
+</div>
+</div>
+
+For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf).
+
 ## Usage Notes
 
 ### Supported SQL Types

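The new guide text above says a grouped aggregate Pandas UDF aggregates one or more `pandas.Series`, while the committed example exercises only a single column. Here is a minimal multi-column sketch; it is illustrative only and not part of this commit (the `weighted_mean_udf` name and the sample data are made up):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()

# Illustrative data: a value column "v" and a weight column "w".
df = spark.createDataFrame(
    [(1, 1.0, 1.0), (1, 2.0, 2.0), (2, 3.0, 1.0), (2, 5.0, 2.0)],
    ("id", "v", "w"))

# Each argument arrives as a pandas.Series holding one group's values
# for that column; the UDF must reduce them to a single scalar.
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def weighted_mean_udf(v, w):
    return (v * w).sum() / w.sum()

df.groupby("id").agg(weighted_mean_udf(df["v"], df["w"])).show()
```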
http://git-wip-us.apache.org/repos/asf/spark/blob/8141d559/examples/src/main/python/sql/arrow.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py
index 4c5aefb..6c4510d 100644
--- a/examples/src/main/python/sql/arrow.py
+++ b/examples/src/main/python/sql/arrow.py
@@ -113,6 +113,43 @@ def grouped_map_pandas_udf_example(spark):
     # $example off:grouped_map_pandas_udf$
 
 
+def grouped_agg_pandas_udf_example(spark):
+    # $example on:grouped_agg_pandas_udf$
+    from pyspark.sql.functions import pandas_udf, PandasUDFType
+    from pyspark.sql import Window
+
+    df = spark.createDataFrame(
+        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+        ("id", "v"))
+
+    @pandas_udf("double", PandasUDFType.GROUPED_AGG)
+    def mean_udf(v):
+        return v.mean()
+
+    df.groupby("id").agg(mean_udf(df['v'])).show()
+    # +---+-----------+
+    # | id|mean_udf(v)|
+    # +---+-----------+
+    # |  1|        1.5|
+    # |  2|        6.0|
+    # +---+-----------+
+
+    w = Window \
+        .partitionBy('id') \
+        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+    df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
+    # +---+----+------+
+    # | id|   v|mean_v|
+    # +---+----+------+
+    # |  1| 1.0|   1.5|
+    # |  1| 2.0|   1.5|
+    # |  2| 3.0|   6.0|
+    # |  2| 5.0|   6.0|
+    # |  2|10.0|   6.0|
+    # +---+----+------+
+    # $example off:grouped_agg_pandas_udf$
+
+
 if __name__ == "__main__":
     spark = SparkSession \
         .builder \

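The hunk above is cut off by the diff context before the point where the new example would be invoked. Assuming the `__main__` block follows the file's existing pattern of calling each example function in turn, the call would look roughly like this sketch (the `appName` string and the print text are hypothetical, not visible in the diff):

```python
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Hypothetical continuation of the truncated __main__ block above.
    spark = SparkSession \
        .builder \
        .appName("Python Arrow-in-Spark example") \
        .getOrCreate()

    print("Running pandas grouped agg UDF example")
    grouped_agg_pandas_udf_example(spark)

    spark.stop()
```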
http://git-wip-us.apache.org/repos/asf/spark/blob/8141d559/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 0a88e48..dd7daf9 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2810,8 +2810,9 @@ def pandas_udf(f=None, returnType=None, functionType=None):
        >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
        ... def mean_udf(v):
        ...     return v.mean()
-       >>> w = Window.partitionBy('id') \\
-       ...           .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+       >>> w = Window \\
+       ...     .partitionBy('id') \\
+       ...     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
        >>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()  # doctest: +SKIP
        +---+----+------+
        | id|   v|mean_v|

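As the guide text above notes, only unbounded frames are supported with grouped aggregate Pandas UDFs at this point, which is why the doctest's window spans the whole partition. The same UDF also works as a plain per-group aggregation without a window; a small sketch reusing the doctest's `df` and `mean_udf` (the `alias` call just names the output column, and the expected rows assume the sample data shown in the hunk above):

```python
# Per-group aggregation with the same UDF; alias() names the result column.
df.groupby("id").agg(mean_udf(df["v"]).alias("mean_v")).show()
# +---+------+
# | id|mean_v|
# +---+------+
# |  1|   1.5|
# |  2|   6.0|
# +---+------+
```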
