Posted to commits@spark.apache.org by gu...@apache.org on 2018/07/31 02:10:44 UTC
spark git commit: [SPARK-23633][SQL] Update Pandas UDFs section in sql-programming-guide
Repository: spark
Updated Branches:
refs/heads/master f1550aaf1 -> 8141d5592
[SPARK-23633][SQL] Update Pandas UDFs section in sql-programming-guide
## What changes were proposed in this pull request?
Update Pandas UDFs section in sql-programming-guide. Add section for grouped aggregate pandas UDF.
## How was this patch tested?
Author: Li Jin <ic...@gmail.com>
Closes #21887 from icexelloss/SPARK-23633-sql-programming-guide.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8141d559
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8141d559
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8141d559
Branch: refs/heads/master
Commit: 8141d55926e95c06cd66bf82098895e1ed419449
Parents: f1550aa
Author: Li Jin <ic...@gmail.com>
Authored: Tue Jul 31 10:10:38 2018 +0800
Committer: hyukjinkwon <gu...@apache.org>
Committed: Tue Jul 31 10:10:38 2018 +0800
----------------------------------------------------------------------
docs/sql-programming-guide.md | 19 +++++++++++++++
examples/src/main/python/sql/arrow.py | 37 ++++++++++++++++++++++++++++++
python/pyspark/sql/functions.py | 5 ++--
3 files changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8141d559/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index cff521c..5f1eee8 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1811,6 +1811,25 @@ The following example shows how to use `groupby().apply()` to subtract the mean
For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) and
[`pyspark.sql.GroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.apply).
+### Grouped Aggregate
+
+Grouped aggregate Pandas UDFs are similar to Spark aggregate functions. They are used with `groupBy().agg()` and
+[`pyspark.sql.Window`](api/python/pyspark.sql.html#pyspark.sql.Window). A grouped aggregate Pandas UDF defines an aggregation from one or more `pandas.Series`
+to a scalar value, where each `pandas.Series` represents a column within the group or window.
+
+Note that this type of UDF does not support partial aggregation, and all data for a group or window will be loaded into memory. Also,
+only unbounded windows are currently supported with grouped aggregate Pandas UDFs.
+
+The following example shows how to use this type of UDF to compute the mean with `groupBy()` and window operations:
+
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example grouped_agg_pandas_udf python/sql/arrow.py %}
+</div>
+</div>
+
+For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf).
+
## Usage Notes
### Supported SQL Types
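[Editorial aside, not part of this commit] The new paragraph above notes that a grouped aggregate Pandas UDF can aggregate one or more `pandas.Series` to a scalar, while the included example uses only a single column. Below is a minimal sketch of a two-column aggregation; the data, column names, and `weighted_mean_udf` are assumptions made up purely for illustration.

    import numpy as np

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    spark = SparkSession.builder.appName("grouped_agg_sketch").getOrCreate()

    # Hypothetical data: values 'v' with weights 'w', grouped by 'id'.
    df = spark.createDataFrame(
        [(1, 1.0, 1.0), (1, 2.0, 2.0), (2, 3.0, 1.0), (2, 5.0, 2.0), (2, 10.0, 3.0)],
        ("id", "v", "w"))

    # A grouped aggregate Pandas UDF taking two pandas.Series and returning one scalar per group.
    @pandas_udf("double", PandasUDFType.GROUPED_AGG)
    def weighted_mean_udf(v, w):
        return np.average(v, weights=w)

    df.groupby("id").agg(weighted_mean_udf(df["v"], df["w"])).show()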
http://git-wip-us.apache.org/repos/asf/spark/blob/8141d559/examples/src/main/python/sql/arrow.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py
index 4c5aefb..6c4510d 100644
--- a/examples/src/main/python/sql/arrow.py
+++ b/examples/src/main/python/sql/arrow.py
@@ -113,6 +113,43 @@ def grouped_map_pandas_udf_example(spark):
# $example off:grouped_map_pandas_udf$
+def grouped_agg_pandas_udf_example(spark):
+ # $example on:grouped_agg_pandas_udf$
+ from pyspark.sql.functions import pandas_udf, PandasUDFType
+ from pyspark.sql import Window
+
+ df = spark.createDataFrame(
+ [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+ ("id", "v"))
+
+ @pandas_udf("double", PandasUDFType.GROUPED_AGG)
+ def mean_udf(v):
+ return v.mean()
+
+ df.groupby("id").agg(mean_udf(df['v'])).show()
+ # +---+-----------+
+ # | id|mean_udf(v)|
+ # +---+-----------+
+ # | 1| 1.5|
+ # | 2| 6.0|
+ # +---+-----------+
+
+ w = Window \
+ .partitionBy('id') \
+ .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+ df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
+ # +---+----+------+
+ # | id| v|mean_v|
+ # +---+----+------+
+ # | 1| 1.0| 1.5|
+ # | 1| 2.0| 1.5|
+ # | 2| 3.0| 6.0|
+ # | 2| 5.0| 6.0|
+ # | 2|10.0| 6.0|
+ # +---+----+------+
+ # $example off:grouped_agg_pandas_udf$
+
+
if __name__ == "__main__":
spark = SparkSession \
.builder \
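[Editorial aside, not part of this commit] A short usage sketch: given the `grouped_agg_pandas_udf_example` function added above, it could be exercised in a standalone session roughly as follows. The `appName` is an illustrative assumption; as with the other examples in arrow.py, the function would normally be invoked from the file's `__main__` block.

    from pyspark.sql import SparkSession

    spark = SparkSession \
        .builder \
        .appName("grouped_agg_example_sketch") \
        .getOrCreate()

    # grouped_agg_pandas_udf_example is the function added in the diff above.
    grouped_agg_pandas_udf_example(spark)
    spark.stop()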
http://git-wip-us.apache.org/repos/asf/spark/blob/8141d559/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 0a88e48..dd7daf9 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2810,8 +2810,9 @@ def pandas_udf(f=None, returnType=None, functionType=None):
>>> @pandas_udf("double", PandasUDFType.GROUPED_AGG) # doctest: +SKIP
... def mean_udf(v):
... return v.mean()
- >>> w = Window.partitionBy('id') \\
- ... .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+ >>> w = Window \\
+ ... .partitionBy('id') \\
+ ... .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
>>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show() # doctest: +SKIP
+---+----+------+
| id| v|mean_v|