Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/09 03:12:46 UTC
[spark] branch master updated: [SPARK-40006][PYTHON][DOCS] Make pyspark.sql.group examples self-contained
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 527cce5dfde [SPARK-40006][PYTHON][DOCS] Make pyspark.sql.group examples self-contained
527cce5dfde is described below
commit 527cce5dfde68b0c58ba4b94c0288756201e3eff
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Tue Aug 9 12:12:29 2022 +0900
[SPARK-40006][PYTHON][DOCS] Make pyspark.sql.group examples self-contained
### What changes were proposed in this pull request?
This PR proposes to improve the examples in `pyspark.sql.group` by making each example self-contained with a brief explanation and a bit more realistic example.
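For illustration, each example now follows the pattern below (a minimal sketch in the same doctest style as the diff; `spark` is the SparkSession that the PySpark shell already provides), so it can be pasted into a shell without any pre-defined DataFrames:

>>> df = spark.createDataFrame(
...     [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"])
>>> df.groupBy("name").count().sort("name").show()  # count rows per name
+-----+-----+
| name|count|
+-----+-----+
|Alice|    2|
|  Bob|    2|
+-----+-----+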
### Why are the changes needed?
To make the documentation more readable and easy to copy and paste directly into the PySpark shell.
### Does this PR introduce _any_ user-facing change?
Yes, it changes the documentation.
### How was this patch tested?
Manually ran each doctest.
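For reference, here is a minimal sketch of one way to run these doctests by hand, mirroring the module's `_test()` helper shown at the end of the diff (the exact doctest flags used by `_test()` are truncated in this email, so the ELLIPSIS flag below is an assumption):

import doctest
import pyspark.sql.group
from pyspark.sql import SparkSession

# Build the globals the examples expect: a running SparkSession as `spark`
# and its SparkContext as `sc`, matching what _test() sets up.
globs = pyspark.sql.group.__dict__.copy()
spark = SparkSession.builder.master("local[4]").appName("sql.group tests").getOrCreate()
globs["sc"] = spark.sparkContext
globs["spark"] = spark

# Run every doctest in the module; ELLIPSIS lets "..."-abbreviated output match.
failure_count, test_count = doctest.testmod(
    pyspark.sql.group, globs=globs, optionflags=doctest.ELLIPSIS
)
spark.stop()
print(f"{failure_count} of {test_count} examples failed")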
Closes #37437 from HyukjinKwon/SPARK-40006.
Authored-by: Hyukjin Kwon <gu...@apache.org>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
python/pyspark/sql/group.py | 322 ++++++++++++++++++++++++++++++++++----------
1 file changed, 251 insertions(+), 71 deletions(-)
diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py
index bece13684e0..2fbe76aa5ae 100644
--- a/python/pyspark/sql/group.py
+++ b/python/pyspark/sql/group.py
@@ -25,7 +25,6 @@ from pyspark.sql.column import Column, _to_seq
from pyspark.sql.session import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.pandas.group_ops import PandasGroupedOpsMixin
-from pyspark.sql.types import StructType, StructField, IntegerType, StringType
if TYPE_CHECKING:
from pyspark.sql._typing import LiteralType
@@ -112,20 +111,53 @@ class GroupedData(PandasGroupedOpsMixin):
Examples
--------
- >>> gdf = df.groupBy(df.name)
- >>> sorted(gdf.agg({"*": "count"}).collect())
- [Row(name='Alice', count(1)=1), Row(name='Bob', count(1)=1)]
-
>>> from pyspark.sql import functions as F
- >>> sorted(gdf.agg(F.min(df.age)).collect())
- [Row(name='Alice', min(age)=2), Row(name='Bob', min(age)=5)]
-
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+ >>> df = spark.createDataFrame(
+ ... [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"])
+ >>> df.show()
+ +---+-----+
+ |age| name|
+ +---+-----+
+ | 2|Alice|
+ | 3|Alice|
+ | 5| Bob|
+ | 10| Bob|
+ +---+-----+
+
+ Group-by name, and count each group.
+
+ >>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show()
+ +-----+--------+
+ | name|count(1)|
+ +-----+--------+
+ |Alice| 2|
+ | Bob| 2|
+ +-----+--------+
+
+ Group-by name, and calculate the minimum age.
+
+ >>> df.groupBy(df.name).agg(F.min(df.age)).sort("name").show()
+ +-----+--------+
+ | name|min(age)|
+ +-----+--------+
+ |Alice| 2|
+ | Bob| 5|
+ +-----+--------+
+
+ Same as above but uses pandas UDF.
+
>>> @pandas_udf('int', PandasUDFType.GROUPED_AGG) # doctest: +SKIP
... def min_udf(v):
... return v.min()
- >>> sorted(gdf.agg(min_udf(df.age)).collect()) # doctest: +SKIP
- [Row(name='Alice', min_udf(age)=2), Row(name='Bob', min_udf(age)=5)]
+ ...
+ >>> df.groupBy(df.name).agg(min_udf(df.age)).sort("name").show() # doctest: +SKIP
+ +-----+------------+
+ | name|min_udf(age)|
+ +-----+------------+
+ |Alice| 2|
+ | Bob| 5|
+ +-----+------------+
"""
assert exprs, "exprs should not be empty"
if len(exprs) == 1 and isinstance(exprs[0], dict):
@@ -145,8 +177,27 @@ class GroupedData(PandasGroupedOpsMixin):
Examples
--------
- >>> sorted(df.groupBy(df.age).count().collect())
- [Row(age=2, count=1), Row(age=5, count=1)]
+ >>> df = spark.createDataFrame(
+ ... [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"])
+ >>> df.show()
+ +---+-----+
+ |age| name|
+ +---+-----+
+ | 2|Alice|
+ | 3|Alice|
+ | 5| Bob|
+ | 10| Bob|
+ +---+-----+
+
+ Group-by name, and count each group.
+
+ >>> df.groupBy(df.name).count().sort("name").show()
+ +-----+-----+
+ | name|count|
+ +-----+-----+
+ |Alice| 2|
+ | Bob| 2|
+ +-----+-----+
"""
@df_varargs_api
@@ -161,13 +212,6 @@ class GroupedData(PandasGroupedOpsMixin):
----------
cols : str
column names. Non-numeric columns are ignored.
-
- Examples
- --------
- >>> df.groupBy().mean('age').collect()
- [Row(avg(age)=3.5)]
- >>> df3.groupBy().mean('age', 'height').collect()
- [Row(avg(age)=3.5, avg(height)=82.5)]
"""
@df_varargs_api
@@ -185,10 +229,37 @@ class GroupedData(PandasGroupedOpsMixin):
Examples
--------
- >>> df.groupBy().avg('age').collect()
- [Row(avg(age)=3.5)]
- >>> df3.groupBy().avg('age', 'height').collect()
- [Row(avg(age)=3.5, avg(height)=82.5)]
+ >>> df = spark.createDataFrame([
+ ... (2, "Alice", 80), (3, "Alice", 100),
+ ... (5, "Bob", 120), (10, "Bob", 140)], ["age", "name", "height"])
+ >>> df.show()
+ +---+-----+------+
+ |age| name|height|
+ +---+-----+------+
+ | 2|Alice| 80|
+ | 3|Alice| 100|
+ | 5| Bob| 120|
+ | 10| Bob| 140|
+ +---+-----+------+
+
+ Group-by name, and calculate the mean of the age in each group.
+
+ >>> df.groupBy("name").avg('age').sort("name").show()
+ +-----+--------+
+ | name|avg(age)|
+ +-----+--------+
+ |Alice| 2.5|
+ | Bob| 7.5|
+ +-----+--------+
+
+ Calculate the mean of the age and height in all data.
+
+ >>> df.groupBy().avg('age', 'height').show()
+ +--------+-----------+
+ |avg(age)|avg(height)|
+ +--------+-----------+
+ | 5.0| 110.0|
+ +--------+-----------+
"""
@df_varargs_api
@@ -199,10 +270,37 @@ class GroupedData(PandasGroupedOpsMixin):
Examples
--------
- >>> df.groupBy().max('age').collect()
- [Row(max(age)=5)]
- >>> df3.groupBy().max('age', 'height').collect()
- [Row(max(age)=5, max(height)=85)]
+ >>> df = spark.createDataFrame([
+ ... (2, "Alice", 80), (3, "Alice", 100),
+ ... (5, "Bob", 120), (10, "Bob", 140)], ["age", "name", "height"])
+ >>> df.show()
+ +---+-----+------+
+ |age| name|height|
+ +---+-----+------+
+ | 2|Alice| 80|
+ | 3|Alice| 100|
+ | 5| Bob| 120|
+ | 10| Bob| 140|
+ +---+-----+------+
+
+ Group-by name, and calculate the max of the age in each group.
+
+ >>> df.groupBy("name").max("age").sort("name").show()
+ +-----+--------+
+ | name|max(age)|
+ +-----+--------+
+ |Alice| 3|
+ | Bob| 10|
+ +-----+--------+
+
+ Calculate the max of the age and height in all data.
+
+ >>> df.groupBy().max("age", "height").show()
+ +--------+-----------+
+ |max(age)|max(height)|
+ +--------+-----------+
+ | 10| 140|
+ +--------+-----------+
"""
@df_varargs_api
@@ -218,10 +316,37 @@ class GroupedData(PandasGroupedOpsMixin):
Examples
--------
- >>> df.groupBy().min('age').collect()
- [Row(min(age)=2)]
- >>> df3.groupBy().min('age', 'height').collect()
- [Row(min(age)=2, min(height)=80)]
+ >>> df = spark.createDataFrame([
+ ... (2, "Alice", 80), (3, "Alice", 100),
+ ... (5, "Bob", 120), (10, "Bob", 140)], ["age", "name", "height"])
+ >>> df.show()
+ +---+-----+------+
+ |age| name|height|
+ +---+-----+------+
+ | 2|Alice| 80|
+ | 3|Alice| 100|
+ | 5| Bob| 120|
+ | 10| Bob| 140|
+ +---+-----+------+
+
+ Group-by name, and calculate the min of the age in each group.
+
+ >>> df.groupBy("name").min("age").sort("name").show()
+ +-----+--------+
+ | name|min(age)|
+ +-----+--------+
+ |Alice| 2|
+ | Bob| 5|
+ +-----+--------+
+
+ Calculate the min of the age and height in all data.
+
+ >>> df.groupBy().min("age", "height").show()
+ +--------+-----------+
+ |min(age)|min(height)|
+ +--------+-----------+
+ | 2| 80|
+ +--------+-----------+
"""
@df_varargs_api
@@ -237,10 +362,37 @@ class GroupedData(PandasGroupedOpsMixin):
Examples
--------
- >>> df.groupBy().sum('age').collect()
- [Row(sum(age)=7)]
- >>> df3.groupBy().sum('age', 'height').collect()
- [Row(sum(age)=7, sum(height)=165)]
+ >>> df = spark.createDataFrame([
+ ... (2, "Alice", 80), (3, "Alice", 100),
+ ... (5, "Bob", 120), (10, "Bob", 140)], ["age", "name", "height"])
+ >>> df.show()
+ +---+-----+------+
+ |age| name|height|
+ +---+-----+------+
+ | 2|Alice| 80|
+ | 3|Alice| 100|
+ | 5| Bob| 120|
+ | 10| Bob| 140|
+ +---+-----+------+
+
+ Group-by name, and calculate the sum of the age in each group.
+
+ >>> df.groupBy("name").sum("age").sort("name").show()
+ +-----+--------+
+ | name|sum(age)|
+ +-----+--------+
+ |Alice| 5|
+ | Bob| 15|
+ +-----+--------+
+
+ Calculate the sum of the age and height in all data.
+
+ >>> df.groupBy().sum("age", "height").show()
+ +--------+-----------+
+ |sum(age)|sum(height)|
+ +--------+-----------+
+ | 20| 440|
+ +--------+-----------+
"""
def pivot(self, pivot_col: str, values: Optional[List["LiteralType"]] = None) -> "GroupedData":
@@ -261,17 +413,69 @@ class GroupedData(PandasGroupedOpsMixin):
Examples
--------
- # Compute the sum of earnings for each year by course with each course as a separate column
-
- >>> df4.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").collect()
- [Row(year=2012, dotNET=15000, Java=20000), Row(year=2013, dotNET=48000, Java=30000)]
-
- # Or without specifying column values (less efficient)
-
- >>> df4.groupBy("year").pivot("course").sum("earnings").collect()
- [Row(year=2012, Java=20000, dotNET=15000), Row(year=2013, Java=30000, dotNET=48000)]
- >>> df5.groupBy("sales.year").pivot("sales.course").sum("sales.earnings").collect()
- [Row(year=2012, Java=20000, dotNET=15000), Row(year=2013, Java=30000, dotNET=48000)]
+ >>> from pyspark.sql import Row
+ >>> spark = SparkSession.builder.master("local[4]").appName("sql.group tests").getOrCreate()
+ >>> df1 = spark.createDataFrame([
+ ... Row(course="dotNET", year=2012, earnings=10000),
+ ... Row(course="Java", year=2012, earnings=20000),
+ ... Row(course="dotNET", year=2012, earnings=5000),
+ ... Row(course="dotNET", year=2013, earnings=48000),
+ ... Row(course="Java", year=2013, earnings=30000),
+ ... ])
+ >>> df1.show()
+ +------+----+--------+
+ |course|year|earnings|
+ +------+----+--------+
+ |dotNET|2012| 10000|
+ | Java|2012| 20000|
+ |dotNET|2012| 5000|
+ |dotNET|2013| 48000|
+ | Java|2013| 30000|
+ +------+----+--------+
+ >>> df2 = spark.createDataFrame([
+ ... Row(training="expert", sales=Row(course="dotNET", year=2012, earnings=10000)),
+ ... Row(training="junior", sales=Row(course="Java", year=2012, earnings=20000)),
+ ... Row(training="expert", sales=Row(course="dotNET", year=2012, earnings=5000)),
+ ... Row(training="junior", sales=Row(course="dotNET", year=2013, earnings=48000)),
+ ... Row(training="expert", sales=Row(course="Java", year=2013, earnings=30000)),
+ ... ])
+ >>> df2.show()
+ +--------+--------------------+
+ |training| sales|
+ +--------+--------------------+
+ | expert|{dotNET, 2012, 10...|
+ | junior| {Java, 2012, 20000}|
+ | expert|{dotNET, 2012, 5000}|
+ | junior|{dotNET, 2013, 48...|
+ | expert| {Java, 2013, 30000}|
+ +--------+--------------------+
+
+ Compute the sum of earnings for each year by course with each course as a separate column
+
+ >>> df1.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").show()
+ +----+------+-----+
+ |year|dotNET| Java|
+ +----+------+-----+
+ |2012| 15000|20000|
+ |2013| 48000|30000|
+ +----+------+-----+
+
+ Or without specifying column values (less efficient)
+
+ >>> df1.groupBy("year").pivot("course").sum("earnings").show()
+ +----+-----+------+
+ |year| Java|dotNET|
+ +----+-----+------+
+ |2012|20000| 15000|
+ |2013|30000| 48000|
+ +----+-----+------+
+ >>> df2.groupBy("sales.year").pivot("sales.course").sum("sales.earnings").show()
+ +----+-----+------+
+ |year| Java|dotNET|
+ +----+-----+------+
+ |2012|20000| 15000|
+ |2013|30000| 48000|
+ +----+-----+------+
"""
if values is None:
jgd = self._jgd.pivot(pivot_col)
@@ -282,7 +486,7 @@ class GroupedData(PandasGroupedOpsMixin):
def _test() -> None:
import doctest
- from pyspark.sql import Row, SparkSession
+ from pyspark.sql import SparkSession
import pyspark.sql.group
globs = pyspark.sql.group.__dict__.copy()
@@ -290,30 +494,6 @@ def _test() -> None:
sc = spark.sparkContext
globs["sc"] = sc
globs["spark"] = spark
- globs["df"] = sc.parallelize([(2, "Alice"), (5, "Bob")]).toDF(
- StructType([StructField("age", IntegerType()), StructField("name", StringType())])
- )
- globs["df3"] = sc.parallelize(
- [Row(name="Alice", age=2, height=80), Row(name="Bob", age=5, height=85)]
- ).toDF()
- globs["df4"] = sc.parallelize(
- [
- Row(course="dotNET", year=2012, earnings=10000),
- Row(course="Java", year=2012, earnings=20000),
- Row(course="dotNET", year=2012, earnings=5000),
- Row(course="dotNET", year=2013, earnings=48000),
- Row(course="Java", year=2013, earnings=30000),
- ]
- ).toDF()
- globs["df5"] = sc.parallelize(
- [
- Row(training="expert", sales=Row(course="dotNET", year=2012, earnings=10000)),
- Row(training="junior", sales=Row(course="Java", year=2012, earnings=20000)),
- Row(training="expert", sales=Row(course="dotNET", year=2012, earnings=5000)),
- Row(training="junior", sales=Row(course="dotNET", year=2013, earnings=48000)),
- Row(training="expert", sales=Row(course="Java", year=2013, earnings=30000)),
- ]
- ).toDF()
(failure_count, test_count) = doctest.testmod(
pyspark.sql.group,