You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/30 01:06:42 UTC

[spark] branch master updated: [SPARK-40012][PYTHON][DOCS] Make pyspark.sql.dataframe examples self-contained

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d5c13757e73 [SPARK-40012][PYTHON][DOCS] Make pyspark.sql.dataframe examples self-contained
d5c13757e73 is described below

commit d5c13757e7396b332ffc19253452ee2f0e61f85e
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Tue Aug 30 10:06:25 2022 +0900

    [SPARK-40012][PYTHON][DOCS] Make pyspark.sql.dataframe examples self-contained
    
    ### What changes were proposed in this pull request?
    
    This PR takes over https://github.com/apache/spark/pull/37444 and extends it to cover all examples in `pyspark.sql.dataframe`.
    
    This PR proposes to improve the examples in `pyspark.sql.dataframe` by making each example self-contained with more realistic examples.
    
    Closes #37444
    
    ### Why are the changes needed?
    
    To make the documentation more readable and the examples directly copy-and-pasteable into the PySpark shell.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, Documentation changes only
    
    ### How was this patch tested?
    
    Manually ran each example.
    
    Closes #37702 from HyukjinKwon/SPARK-40012.
    
    Lead-authored-by: Hyukjin Kwon <gu...@apache.org>
    Co-authored-by: William Zijie <pe...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/sql/dataframe.py | 1372 +++++++++++++++++++++++++--------------
 1 file changed, 901 insertions(+), 471 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 41214bcfdbf..23dfd4e7ec8 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -41,7 +41,7 @@ from typing import (
 
 from py4j.java_gateway import JavaObject
 
-from pyspark import copy_func, since, _NoValue
+from pyspark import copy_func, _NoValue
 from pyspark._globals import _NoValueType
 from pyspark.context import SparkContext
 from pyspark.rdd import (
@@ -57,9 +57,6 @@ from pyspark.sql.readwriter import DataFrameWriter, DataFrameWriterV2
 from pyspark.sql.streaming import DataStreamWriter
 from pyspark.sql.types import (
     StructType,
-    StructField,
-    StringType,
-    IntegerType,
     Row,
     _parse_datatype_json_string,
 )
@@ -82,31 +79,50 @@ __all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"]
 class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
     """A distributed collection of data grouped into named columns.
 
+    .. versionadded:: 1.3.0
+
+    Examples
+    --------
     A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
-    and can be created using various functions in :class:`SparkSession`::
+    and can be created using various functions in :class:`SparkSession`:
 
-        people = spark.read.parquet("...")
+    >>> people = spark.createDataFrame([
+    ...     {"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
+    ...     {"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
+    ...     {"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
+    ...     {"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
+    ... ])
 
     Once created, it can be manipulated using the various domain-specific-language
     (DSL) functions defined in: :class:`DataFrame`, :class:`Column`.
 
-    To select a column from the :class:`DataFrame`, use the apply method::
-
-        ageCol = people.age
-
-    A more concrete example::
-
-        # To create DataFrame using SparkSession
-        people = spark.read.parquet("...")
-        department = spark.read.parquet("...")
-
-        people.filter(people.age > 30).join(department, people.deptId == department.id) \\
-          .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
-
-    .. versionadded:: 1.3.0
-
-    .. note: A DataFrame should only be created as described above. It should not be directly
-        created via using the constructor.
+    To select a column from the :class:`DataFrame`, use the apply method:
+
+    >>> age_col = people.age
+
+    A more concrete example:
+
+    >>> # To create DataFrame using SparkSession
+    ... department = spark.createDataFrame([
+    ...     {"id": 1, "name": "PySpark"},
+    ...     {"id": 2, "name": "ML"},
+    ...     {"id": 3, "name": "Spark SQL"}
+    ... ])
+
+    >>> people.filter(people.age > 30).join(
+    ...     department, people.deptId == department.id).groupBy(
+    ...     department.name, "gender").agg({"salary": "avg", "age": "max"}).show()
+    +-------+------+-----------+--------+
+    |   name|gender|avg(salary)|max(age)|
+    +-------+------+-----------+--------+
+    |     ML|     F|      150.0|      60|
+    |PySpark|     M|       75.0|      50|
+    +-------+------+-----------+--------+
+
+    Notes
+    -----
+    A DataFrame should only be created as described above. It should not be created
+    directly via the constructor.
     """
 
     def __init__(
@@ -171,8 +187,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         """
         return self._session
 
-    @property  # type: ignore[misc]
-    @since(1.3)
+    @property
     def rdd(self) -> "RDD[Row]":
         """Returns the content as an :class:`pyspark.RDD` of :class:`Row`.
 
@@ -195,8 +210,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
             )
         return self._lazy_rdd
 
-    @property  # type: ignore[misc]
-    @since("1.3.1")
+    @property
     def na(self) -> "DataFrameNaFunctions":
         """Returns a :class:`DataFrameNaFunctions` for handling missing values.
 
@@ -208,9 +222,12 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df = spark.sql("select 1 as c1, int(null) as c2")
+        >>> df = spark.sql("SELECT 1 AS c1, int(NULL) AS c2")
         >>> type(df.na)
         <class 'pyspark.sql.dataframe.DataFrameNaFunctions'>
+
+        Replace the missing values with 2.
+
         >>> df.na.fill(2).show()
         +---+---+
         | c1| c2|
@@ -220,8 +237,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         """
         return DataFrameNaFunctions(self)
 
-    @property  # type: ignore[misc]
-    @since(1.4)
+    @property
     def stat(self) -> "DataFrameStatFunctions":
         """Returns a :class:`DataFrameStatFunctions` for statistic functions.
 
@@ -234,7 +250,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         Examples
         --------
         >>> import pyspark.sql.functions as f
-        >>> df = spark.range(3).withColumn("c", f.expr("id+1"))
+        >>> df = spark.range(3).withColumn("c", f.expr("id + 1"))
         >>> type(df.stat)
         <class 'pyspark.sql.dataframe.DataFrameStatFunctions'>
         >>> df.stat.corr("id", "c")
@@ -251,7 +267,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Parameters
         ----------
-        use_unicode : bool, optional (default: True)
+        use_unicode : bool, optional, default True
             Whether to convert to unicode or not.
 
         Returns
@@ -260,6 +276,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
         >>> df.toJSON().first()
         '{"age":2,"name":"Alice"}'
         """
@@ -277,10 +294,16 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         .. deprecated:: 2.0.0
             Use :meth:`DataFrame.createOrReplaceTempView` instead.
 
+        Parameters
+        ----------
+        name : str
+            Name of the temporary table to register.
+
         Examples
         --------
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
         >>> df.registerTempTable("people")
-        >>> df2 = spark.sql("select * from people")
+        >>> df2 = spark.sql("SELECT * FROM people")
         >>> sorted(df.collect()) == sorted(df2.collect())
         True
         >>> spark.catalog.dropTempView("people")
@@ -305,20 +328,22 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         name : str
             Name of the view.
 
-        Returns
-        -------
-        None
-
         Examples
         --------
+        Create a local temporary view.
+
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
         >>> df.createTempView("people")
-        >>> df2 = spark.sql("select * from people")
+        >>> df2 = spark.sql("SELECT * FROM people")
         >>> sorted(df.collect()) == sorted(df2.collect())
         True
+
+        Throw an exception if the temporary view already exists.
+
         >>> df.createTempView("people")  # doctest: +IGNORE_EXCEPTION_DETAIL
         Traceback (most recent call last):
         ...
-        AnalysisException: u"Temporary table 'people' already exists;"
+        AnalysisException: "Temporary table 'people' already exists;"
         >>> spark.catalog.dropTempView("people")
         True
 
@@ -344,10 +369,16 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        Create a local temporary view named 'people'.
+
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
         >>> df.createOrReplaceTempView("people")
+
+        Replace the local temporary view.
+
         >>> df2 = df.filter(df.age > 3)
         >>> df2.createOrReplaceTempView("people")
-        >>> df3 = spark.sql("select * from people")
+        >>> df3 = spark.sql("SELECT * FROM people")
         >>> sorted(df3.collect()) == sorted(df2.collect())
         True
         >>> spark.catalog.dropTempView("people")
@@ -376,14 +407,20 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        Create a global temporary view.
+
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
         >>> df.createGlobalTempView("people")
-        >>> df2 = spark.sql("select * from global_temp.people")
+        >>> df2 = spark.sql("SELECT * FROM global_temp.people")
         >>> sorted(df.collect()) == sorted(df2.collect())
         True
+
+        Throw an exception if the global temporary view already exists.
+
         >>> df.createGlobalTempView("people")  # doctest: +IGNORE_EXCEPTION_DETAIL
         Traceback (most recent call last):
         ...
-        AnalysisException: u"Temporary table 'people' already exists;"
+        AnalysisException: "Temporary table 'people' already exists;"
         >>> spark.catalog.dropGlobalTempView("people")
         True
 
@@ -408,10 +445,16 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        Create a global temporary view.
+
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
         >>> df.createOrReplaceGlobalTempView("people")
+
+        Replace the global temporary view.
+
         >>> df2 = df.filter(df.age > 3)
         >>> df2.createOrReplaceGlobalTempView("people")
-        >>> df3 = spark.sql("select * from global_temp.people")
+        >>> df3 = spark.sql("SELECT * FROM global_temp.people")
         >>> sorted(df3.collect()) == sorted(df2.collect())
         True
         >>> spark.catalog.dropGlobalTempView("people")
@@ -434,9 +477,15 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
         >>> type(df.write)
         <class 'pyspark.sql.readwriter.DataFrameWriter'>
+
+        Write the DataFrame as a table.
+
+        >>> _ = spark.sql("DROP TABLE IF EXISTS tab2")
         >>> df.write.saveAsTable("tab2")
+        >>> _ = spark.sql("DROP TABLE tab2")
         """
         return DataFrameWriter(self)
 
@@ -458,9 +507,15 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> import tempfile
         >>> df = spark.readStream.format("rate").load()
-        >>> dsw = df.writeStream
-        >>> dsw.option("checkpointLocation", "/tmp/c").toTable("tab3")  # doctest: +ELLIPSIS
+        >>> type(df.writeStream)
+        <class 'pyspark.sql.streaming.readwriter.DataStreamWriter'>
+
+        >>> with tempfile.TemporaryDirectory() as d:
+        ...     # Create a table with Rate source.
+        ...     df.writeStream.toTable(
+        ...         "my_table", checkpointLocation=d) # doctest: +ELLIPSIS
         <pyspark.sql.streaming.query.StreamingQuery object at 0x...>
         """
         return DataStreamWriter(self)
@@ -477,8 +532,13 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+
+        Retrieve the schema of the current DataFrame.
+
         >>> df.schema
-        StructType([StructField('age', IntegerType(), True),
+        StructType([StructField('age', LongType(), True),
                     StructField('name', StringType(), True)])
         """
         if self._schema is None:
@@ -501,9 +561,11 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
         >>> df.printSchema()
         root
-         |-- age: integer (nullable = true)
+         |-- age: long (nullable = true)
          |-- name: string (nullable = true)
         """
         print(self._jdf.schema().treeString())
@@ -534,15 +596,18 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
             .. versionchanged:: 3.0.0
                Added optional argument `mode` to specify the expected output format of plans.
 
-        Returns
-        -------
-        None
-
         Examples
         --------
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+
+        Print out the physical plan only (default).
+
         >>> df.explain()
         == Physical Plan ==
-        *(1) Scan ExistingRDD[age#0,name#1]
+        *(1) Scan ExistingRDD[age...,name...]
+
+        Print out all of parsed, analyzed, optimized and physical plans.
 
         >>> df.explain(True)
         == Parsed Logical Plan ==
@@ -554,13 +619,17 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         == Physical Plan ==
         ...
 
+        Print out the plans with two sections: a physical plan outline and node details.
+
         >>> df.explain(mode="formatted")
         == Physical Plan ==
-        * Scan ExistingRDD (1)
-        (1) Scan ExistingRDD [codegen id : 1]
-        Output [2]: [age#0, name#1]
+        * Scan ExistingRDD (...)
+        (1) Scan ExistingRDD [codegen id : ...]
+        Output [2]: [age..., name...]
         ...
 
+        Print a logical plan and statistics if they are available.
+
         >>> df.explain("cost")
         == Optimized Logical Plan ==
         ...Statistics...
@@ -628,7 +697,6 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         >>> df1 = spark.createDataFrame(
         ...         [("a", 1), ("a", 1), ("a", 1), ("a", 2), ("b",  3), ("c", 4)], ["C1", "C2"])
         >>> df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["C1", "C2"])
-
         >>> df1.exceptAll(df2).show()
         +---+---+
         | C1| C2|
@@ -642,11 +710,12 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         """
         return DataFrame(self._jdf.exceptAll(other._jdf), self.sparkSession)
 
-    @since(1.3)
     def isLocal(self) -> bool:
         """Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally
         (without any Spark executors).
 
+        .. versionadded:: 1.3.0
+
         Returns
         -------
         bool
@@ -700,7 +769,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         Examples
         --------
         >>> df_empty = spark.createDataFrame([], 'a STRING')
-        >>> df_non_empty = spark.createDataFrame([("a")], 'STRING')
+        >>> df_non_empty = spark.createDataFrame(["a"], 'STRING')
         >>> df_empty.isEmpty()
         True
         >>> df_non_empty.isEmpty()
@@ -731,29 +800,43 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df
-        DataFrame[age: int, name: string]
-        >>> df.show()
+        >>> df = spark.createDataFrame([
+        ...     (14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+
+        Show only top 2 rows.
+
+        >>> df.show(2)
         +---+-----+
         |age| name|
         +---+-----+
-        |  2|Alice|
-        |  5|  Bob|
+        | 14|  Tom|
+        | 23|Alice|
         +---+-----+
+        only showing top 2 rows
+
+        Show :class:`DataFrame` where the maximum number of characters is 3.
+
         >>> df.show(truncate=3)
         +---+----+
         |age|name|
         +---+----+
-        |  2| Ali|
-        |  5| Bob|
+        | 14| Tom|
+        | 23| Ali|
+        | 16| Bob|
         +---+----+
+
+        Show :class:`DataFrame` vertically.
+
         >>> df.show(vertical=True)
         -RECORD 0-----
-         age  | 2
-         name | Alice
+         age  | 14
+         name | Tom
         -RECORD 1-----
-         age  | 5
-         name | Bob
+         age  | 23
+         name | Alice
+        -RECORD 2-----
+         age  | 16
+         name | Bob
         """
 
         if not isinstance(n, int) or isinstance(n, bool):
@@ -832,23 +915,27 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Parameters
         ----------
-        eager : bool, optional
-            Whether to checkpoint this :class:`DataFrame` immediately  (default True)
+        eager : bool, optional, default True
+            Whether to checkpoint this :class:`DataFrame` immediately.
 
         Returns
         -------
         :class:`DataFrame`
             Checkpointed DataFrame.
 
-        Examples
-        --------
-        >>> spark.sparkContext.setCheckpointDir("/tmp/bb")
-        >>> df.checkpoint(False)
-        DataFrame[age: int, name: string]
-
         Notes
         -----
         This API is experimental.
+
+        Examples
+        --------
+        >>> import tempfile
+        >>> df = spark.createDataFrame([
+        ...     (14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+        >>> with tempfile.TemporaryDirectory() as d:
+        ...     spark.sparkContext.setCheckpointDir(d)
+        ...     df.checkpoint(False)
+        DataFrame[age: bigint, name: string]
         """
         jdf = self._jdf.checkpoint(eager)
         return DataFrame(jdf, self.sparkSession)
@@ -863,22 +950,24 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Parameters
         ----------
-        eager : bool, optional
-            Whether to checkpoint this :class:`DataFrame` immediately (default True)
+        eager : bool, optional, default True
+            Whether to checkpoint this :class:`DataFrame` immediately.
 
         Returns
         -------
         :class:`DataFrame`
             Checkpointed DataFrame.
 
-        Examples
-        --------
-        >>> df.localCheckpoint(False)
-        DataFrame[age: int, name: string]
-
         Notes
         -----
         This API is experimental.
+
+        Examples
+        --------
+        >>> df = spark.createDataFrame([
+        ...     (14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+        >>> df.localCheckpoint(False)
+        DataFrame[age: bigint, name: string]
         """
         jdf = self._jdf.localCheckpoint(eager)
         return DataFrame(jdf, self.sparkSession)
@@ -915,17 +1004,33 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         :class:`DataFrame`
             Watermarked DataFrame
 
-        Examples
-        --------
-        >>> from pyspark.sql.functions import timestamp_seconds
-        >>> sdf.select(
-        ...    'name',
-        ...    timestamp_seconds(sdf.time).alias('time')).withWatermark('time', '10 minutes')
-        DataFrame[name: string, time: timestamp]
-
         Notes
         -----
+        This is a feature only for Structured Streaming.
+
         This API is evolving.
+
+        Examples
+        --------
+        >>> from pyspark.sql import Row
+        >>> from pyspark.sql.functions import timestamp_seconds
+        >>> df = spark.readStream.format("rate").load().selectExpr(
+        ...     "value % 5 AS value", "timestamp")
+        >>> df.select("value", df.timestamp.alias("time")).withWatermark("time", '10 minutes')
+        DataFrame[value: bigint, time: timestamp]
+
+        Group the data by window and value (0 - 4), and compute the count of each group.
+
+        >>> import time
+        >>> from pyspark.sql.functions import window
+        >>> query = (df
+        ...     .withWatermark("timestamp", "10 minutes")
+        ...     .groupBy(
+        ...         window(df.timestamp, "10 minutes", "5 minutes"),
+        ...         df.value)
+        ...     ).count().writeStream.outputMode("complete").format("console").start()
+        >>> time.sleep(3)
+        >>> query.stop()
         """
         if not eventTime or type(eventTime) is not str:
             raise TypeError("eventTime should be provided as a string")
@@ -955,12 +1060,21 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.join(df2.hint("broadcast"), "name").show()
-        +----+---+------+
-        |name|age|height|
-        +----+---+------+
-        | Bob|  5|    85|
-        +----+---+------+
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
+        >>> df2 = spark.createDataFrame([Row(height=80, name="Tom"), Row(height=85, name="Bob")])
+        >>> df.join(df2, "name").explain()
+        == Physical Plan ==
+        ...
+        ... +- SortMergeJoin ...
+        ...
+
+        Explicitly trigger the broadcast hash join by providing the hint in ``df2``.
+
+        >>> df.join(df2.hint("broadcast"), "name").explain()
+        == Physical Plan ==
+        ...
+        ... +- BroadcastHashJoin ...
+        ...
         """
         if len(parameters) == 1 and isinstance(parameters[0], list):
             parameters = parameters[0]  # type: ignore[assignment]
@@ -992,8 +1106,13 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+
+        Return the number of rows in the :class:`DataFrame`.
+
         >>> df.count()
-        2
+        3
         """
         return int(self._jdf.count())
 
@@ -1009,8 +1128,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
         >>> df.collect()
-        [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
+        [Row(age=14, name='Tom'), Row(age=23, name='Alice'), Row(age=16, name='Bob')]
         """
         with SCCallSiteSync(self._sc):
             sock_info = self._jdf.collectToPython()
@@ -1037,8 +1158,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
         >>> list(df.toLocalIterator())
-        [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
+        [Row(age=14, name='Tom'), Row(age=23, name='Alice'), Row(age=16, name='Bob')]
         """
         with SCCallSiteSync(self._sc):
             sock_info = self._jdf.toPythonIterator(prefetchPartitions)
@@ -1062,10 +1185,19 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.limit(1).collect()
-        [Row(age=2, name='Alice')]
-        >>> df.limit(0).collect()
-        []
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+        >>> df.limit(1).show()
+        +---+----+
+        |age|name|
+        +---+----+
+        | 14| Tom|
+        +---+----+
+        >>> df.limit(0).show()
+        +---+----+
+        |age|name|
+        +---+----+
+        +---+----+
         """
         jdf = self._jdf.limit(num)
         return DataFrame(jdf, self.sparkSession)
@@ -1088,8 +1220,13 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+
+        Return the first 2 rows of the :class:`DataFrame`.
+
         >>> df.take(2)
-        [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
+        [Row(age=14, name='Tom'), Row(age=23, name='Alice')]
         """
         return self.limit(num).collect()
 
@@ -1115,8 +1252,11 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.tail(1)
-        [Row(age=5, name='Bob')]
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+
+        >>> df.tail(2)
+        [Row(age=23, name='Alice'), Row(age=16, name='Bob')]
         """
         with SCCallSiteSync(self._sc):
             sock_info = self._jdf.tailToPython(num)
@@ -1135,15 +1275,13 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
             A function that accepts one parameter which will
             receive each row to process.
 
-        Returns
-        -------
-        None
-
         Examples
         --------
-        >>> def f(person):
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+        >>> def func(person):
         ...     print(person.name)
-        >>> df.foreach(f)
+        >>> df.foreach(func)
         """
         self.rdd.foreach(f)
 
@@ -1160,16 +1298,14 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
             A function that accepts one parameter which will receive
             each partition to process.
 
-        Returns
-        -------
-        None
-
         Examples
         --------
-        >>> def f(people):
-        ...     for person in people:
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+        >>> def func(itr):
+        ...     for person in itr:
         ...         print(person.name)
-        >>> df.foreachPartition(f)
+        >>> df.foreachPartition(func)
         """
         self.rdd.foreachPartition(f)  # type: ignore[arg-type]
 
@@ -1227,6 +1363,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         >>> df = spark.range(1)
         >>> df.persist()
         DataFrame[id: bigint]
+
+        Persist the data on disk by specifying the storage level.
+
         >>> from pyspark.storagelevel import StorageLevel
         >>> df.persist(StorageLevel.DISK_ONLY)
         DataFrame[id: bigint]
@@ -1249,10 +1388,13 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.storageLevel
+        >>> df1 = spark.range(10)
+        >>> df1.storageLevel
         StorageLevel(False, False, False, False, 1)
-        >>> df.cache().storageLevel
+        >>> df1.cache().storageLevel
         StorageLevel(True, True, False, True, 1)
+
+        >>> df2 = spark.range(5)
         >>> df2.persist(StorageLevel.DISK_ONLY_2).storageLevel
         StorageLevel(True, False, False, False, 2)
         """
@@ -1331,6 +1473,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.range(10)
         >>> df.coalesce(1).rdd.getNumPartitions()
         1
         """
@@ -1362,7 +1505,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         cols : str or :class:`Column`
             partitioning columns.
 
-            .. versionchanged:: 1.6
+            .. versionchanged:: 1.6.0
                Added optional arguments to specify the partitioning columns. Also made numPartitions
                optional if partitioning columns are specified.
 
@@ -1373,40 +1516,23 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+
+        Repartition the data into 10 partitions.
+
         >>> df.repartition(10).rdd.getNumPartitions()
         10
-        >>> data = df.union(df).repartition("age")
-        >>> data.show()
-        +---+-----+
-        |age| name|
-        +---+-----+
-        |  2|Alice|
-        |  5|  Bob|
-        |  2|Alice|
-        |  5|  Bob|
-        +---+-----+
-        >>> data = data.repartition(7, "age")
-        >>> data.show()
-        +---+-----+
-        |age| name|
-        +---+-----+
-        |  2|Alice|
-        |  5|  Bob|
-        |  2|Alice|
-        |  5|  Bob|
-        +---+-----+
-        >>> data.rdd.getNumPartitions()
+
+        Repartition the data into 7 partitions by 'age' column.
+
+        >>> df.repartition(7, "age").rdd.getNumPartitions()
         7
-        >>> data = data.repartition(3, "name", "age")
-        >>> data.show()
-        +---+-----+
-        |age| name|
-        +---+-----+
-        |  5|  Bob|
-        |  5|  Bob|
-        |  2|Alice|
-        |  2|Alice|
-        +---+-----+
+
+        Repartition the data into 3 partitions by 'age' and 'name' columns.
+
+        >>> df.repartition(3, "name", "age").rdd.getNumPartitions()
+        3
         """
         if isinstance(numPartitions, int):
             if len(cols) == 0:
@@ -1437,9 +1563,6 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The
         resulting :class:`DataFrame` is range partitioned.
 
-        At least one partition-by expression must be specified.
-        When no explicit sort order is specified, "ascending nulls first" is assumed.
-
         .. versionadded:: 2.4.0
 
         Parameters
@@ -1458,6 +1581,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Notes
         -----
+        At least one partition-by expression must be specified.
+        When no explicit sort order is specified, "ascending nulls first" is assumed.
+
         Due to performance reasons this method uses sampling to estimate the ranges.
         Hence, the output may not be consistent, since sampling can return different values.
         The sample size can be controlled by the config
@@ -1465,25 +1591,15 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+
+        Repartition the data into 2 partitions by range in 'age' column.
+        For example, the first partition can have ``(14, "Tom")``, and the second
+        partition would have ``(16, "Bob")`` and ``(23, "Alice")``.
+
         >>> df.repartitionByRange(2, "age").rdd.getNumPartitions()
         2
-        >>> df.show()
-        +---+-----+
-        |age| name|
-        +---+-----+
-        |  2|Alice|
-        |  5|  Bob|
-        +---+-----+
-        >>> df.repartitionByRange(1, "age").rdd.getNumPartitions()
-        1
-        >>> data = df.repartitionByRange("age")
-        >>> df.show()
-        +---+-----+
-        |age| name|
-        +---+-----+
-        |  2|Alice|
-        |  5|  Bob|
-        +---+-----+
         """
         if isinstance(numPartitions, int):
             if len(cols) == 0:
@@ -1511,6 +1627,11 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (23, "Alice")], ["age", "name"])
+
+        Return the number of distinct rows in the :class:`DataFrame`.
+
         >>> df.distinct().count()
         2
         """
@@ -1630,7 +1751,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         col : :class:`Column` or str
             column that defines strata
 
-            .. versionchanged:: 3.0
+            .. versionchanged:: 3.0.0
                Added sampling by a column of :class:`Column`
         fractions : dict
             sampling fraction for each stratum. If a stratum is not
@@ -1645,7 +1766,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         Examples
         --------
         >>> from pyspark.sql.functions import col
-        >>> dataset = sqlContext.range(0, 100).select((col("id") % 3).alias("key"))
+        >>> dataset = spark.range(0, 100).select((col("id") % 3).alias("key"))
         >>> sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
         >>> sampled.groupBy("key").count().orderBy("key").show()
         +---+-----+
@@ -1693,10 +1814,17 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> splits = df4.randomSplit([1.0, 2.0], 24)
+        >>> from pyspark.sql import Row
+        >>> df = spark.createDataFrame([
+        ...     Row(age=10, height=80, name="Alice"),
+        ...     Row(age=5, height=None, name="Bob"),
+        ...     Row(age=None, height=None, name="Tom"),
+        ...     Row(age=None, height=None, name=None),
+        ... ])
+
+        >>> splits = df.randomSplit([1.0, 2.0], 24)
         >>> splits[0].count()
         2
-
         >>> splits[1].count()
         2
         """
@@ -1722,8 +1850,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
         >>> df.dtypes
-        [('age', 'int'), ('name', 'string')]
+        [('age', 'bigint'), ('name', 'string')]
         """
         return [(str(f.name), f.dataType.simpleString()) for f in self.schema.fields]
 
@@ -1740,6 +1870,8 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
         >>> df.columns
         ['age', 'name']
         """
@@ -1783,24 +1915,6 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         Returns a new :class:`DataFrame` where each row is reconciled to match the specified
         schema.
 
-        Notes
-        -----
-        1, Reorder columns and/or inner fields by name to match the specified schema.
-
-        2, Project away columns and/or inner fields that are not needed by the specified schema.
-        Missing columns and/or inner fields (present in the specified schema but not input
-        DataFrame) lead to failures.
-
-        3, Cast the columns and/or inner fields to match the data types in the specified schema,
-        if the types are compatible, e.g., numeric to numeric (error if overflows), but not string
-        to int.
-
-        4, Carry over the metadata from the specified schema, while the columns and/or inner fields
-        still keep their own metadata if not overwritten by the specified schema.
-
-        5, Fail if the nullability is not compatible. For example, the column and/or inner field
-        is nullable but the specified schema requires them to be not nullable.
-
         .. versionadded:: 3.4.0
 
         Parameters
@@ -1813,11 +1927,31 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         :class:`DataFrame`
             Reconciled DataFrame.
 
+        Notes
+        -----
+        * Reorder columns and/or inner fields by name to match the specified schema.
+
+        * Project away columns and/or inner fields that are not needed by the specified schema.
+            Missing columns and/or inner fields (present in the specified schema but not input
+            DataFrame) lead to failures.
+
+        * Cast the columns and/or inner fields to match the data types in the specified schema,
+            if the types are compatible, e.g., numeric to numeric (error if overflows), but
+            not string to int.
+
+        * Carry over the metadata from the specified schema, while the columns and/or inner fields
+            still keep their own metadata if not overwritten by the specified schema.
+
+        * Fail if the nullability is not compatible. For example, the column and/or inner field
+            is nullable but the specified schema requires them to be not nullable.
+
         Examples
         --------
+        >>> from pyspark.sql.types import StructType, StructField, StringType
         >>> df = spark.createDataFrame([("a", 1)], ["i", "j"])
         >>> df.schema
         StructType([StructField('i', StringType(), True), StructField('j', LongType(), True)])
+
         >>> schema = StructType([StructField("j", StringType()), StructField("i", StringType())])
         >>> df2 = df.to(schema)
         >>> df2.schema
@@ -1850,13 +1984,21 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> from pyspark.sql.functions import *
+        >>> from pyspark.sql.functions import col, desc
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
         >>> df_as1 = df.alias("df_as1")
         >>> df_as2 = df.alias("df_as2")
         >>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')
-        >>> joined_df.select("df_as1.name", "df_as2.name", "df_as2.age") \
-                .sort(desc("df_as1.name")).collect()
-        [Row(name='Bob', name='Bob', age=5), Row(name='Alice', name='Alice', age=2)]
+        >>> joined_df.select(
+        ...     "df_as1.name", "df_as2.name", "df_as2.age").sort(desc("df_as1.name")).show()
+        +-----+-----+---+
+        | name| name|age|
+        +-----+-----+---+
+        |  Tom|  Tom| 14|
+        |  Bob|  Bob| 16|
+        |Alice|Alice| 23|
+        +-----+-----+---+
         """
         assert isinstance(alias, str), "alias should be a string"
         return DataFrame(getattr(self._jdf, "as")(alias), self.sparkSession)
@@ -1878,13 +2020,22 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.select("age", "name").collect()
-        [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
-        >>> df2.select("name", "height").collect()
-        [Row(name='Tom', height=80), Row(name='Bob', height=85)]
-        >>> df.crossJoin(df2.select("height")).select("age", "name", "height").collect()
-        [Row(age=2, name='Alice', height=80), Row(age=2, name='Alice', height=85),
-         Row(age=5, name='Bob', height=80), Row(age=5, name='Bob', height=85)]
+        >>> from pyspark.sql import Row
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+        >>> df2 = spark.createDataFrame(
+        ...     [Row(height=80, name="Tom"), Row(height=85, name="Bob")])
+        >>> df.crossJoin(df2.select("height")).select("age", "name", "height").show()
+        +---+-----+------+
+        |age| name|height|
+        +---+-----+------+
+        | 14|  Tom|    80|
+        | 14|  Tom|    85|
+        | 23|Alice|    80|
+        | 23|Alice|    85|
+        | 16|  Bob|    80|
+        | 16|  Bob|    85|
+        +---+-----+------+
         """
 
         jdf = self._jdf.crossJoin(other._jdf)
@@ -1924,23 +2075,66 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         --------
         The following performs a full outer join between ``df1`` and ``df2``.
 
+        >>> from pyspark.sql import Row
         >>> from pyspark.sql.functions import desc
-        >>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height) \
-                .sort(desc("name")).collect()
-        [Row(name='Bob', height=85), Row(name='Alice', height=None), Row(name=None, height=80)]
-
-        >>> df.join(df2, 'name', 'outer').select('name', 'height').sort(desc("name")).collect()
-        [Row(name='Tom', height=80), Row(name='Bob', height=85), Row(name='Alice', height=None)]
-
-        >>> cond = [df.name == df3.name, df.age == df3.age]
-        >>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect()
-        [Row(name='Alice', age=2), Row(name='Bob', age=5)]
-
-        >>> df.join(df2, 'name').select(df.name, df2.height).collect()
-        [Row(name='Bob', height=85)]
-
-        >>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect()
-        [Row(name='Bob', age=5)]
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")]).toDF("age", "name")
+        >>> df2 = spark.createDataFrame([Row(height=80, name="Tom"), Row(height=85, name="Bob")])
+        >>> df3 = spark.createDataFrame([Row(age=2, name="Alice"), Row(age=5, name="Bob")])
+        >>> df4 = spark.createDataFrame([
+        ...     Row(age=10, height=80, name="Alice"),
+        ...     Row(age=5, height=None, name="Bob"),
+        ...     Row(age=None, height=None, name="Tom"),
+        ...     Row(age=None, height=None, name=None),
+        ... ])
+
+        Inner join on columns (default)
+
+        >>> df.join(df2, 'name').select(df.name, df2.height).show()
+        +----+------+
+        |name|height|
+        +----+------+
+        | Bob|    85|
+        +----+------+
+        >>> df.join(df4, ['name', 'age']).select(df.name, df.age).show()
+        +----+---+
+        |name|age|
+        +----+---+
+        | Bob|  5|
+        +----+---+
+
+        Outer join for both DataFrames on 'name' column.
+
+        >>> df.join(df2, df.name == df2.name, 'outer').select(
+        ...     df.name, df2.height).sort(desc("name")).show()
+        +-----+------+
+        | name|height|
+        +-----+------+
+        |  Bob|    85|
+        |Alice|  null|
+        | null|    80|
+        +-----+------+
+        >>> df.join(df2, 'name', 'outer').select('name', 'height').sort(desc("name")).show()
+        +-----+------+
+        | name|height|
+        +-----+------+
+        |  Tom|    80|
+        |  Bob|    85|
+        |Alice|  null|
+        +-----+------+
+
+        Outer join for both DataFrames with multiple columns.
+
+        >>> df.join(
+        ...     df3,
+        ...     [df.name == df3.name, df.age == df3.age],
+        ...     'outer'
+        ... ).select(df.name, df3.age).show()
+        +-----+---+
+        | name|age|
+        +-----+---+
+        |Alice|  2|
+        |  Bob|  5|
+        +-----+---+
         """
 
         if on is not None and not isinstance(on, list):
@@ -1984,8 +2178,6 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         This is similar to a left-join except that we match on nearest
         key rather than equal keys.
 
-        .. versionadded:: 3.3.0
-
         Parameters
         ----------
         other : :class:`DataFrame`
@@ -2100,8 +2292,8 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Other Parameters
         ----------------
-        ascending : bool or list, optional
-            boolean or list of boolean (default ``True``).
+        ascending : bool or list, optional, default True
+            boolean or list of boolean.
             Sort ascending vs. descending. Specify list for multiple sort orders.
             If a list is specified, length of the list must equal length of the `cols`.
 
@@ -2112,13 +2304,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.sortWithinPartitions("age", ascending=False).show()
-        +---+-----+
-        |age| name|
-        +---+-----+
-        |  2|Alice|
-        |  5|  Bob|
-        +---+-----+
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
+        >>> df.sortWithinPartitions("age", ascending=False)
+        DataFrame[age: bigint, name: string]
         """
         jdf = self._jdf.sortWithinPartitions(self._sort_cols(cols, kwargs))
         return DataFrame(jdf, self.sparkSession)
@@ -2137,8 +2325,8 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Other Parameters
         ----------------
-        ascending : bool or list, optional
-            boolean or list of boolean (default ``True``).
+        ascending : bool or list, optional, default True
+            boolean or list of boolean.
             Sort ascending vs. descending. Specify list for multiple sort orders.
             If a list is specified, length of the list must equal length of the `cols`.
 
@@ -2149,19 +2337,67 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.sort(df.age.desc()).collect()
-        [Row(age=5, name='Bob'), Row(age=2, name='Alice')]
-        >>> df.sort("age", ascending=False).collect()
-        [Row(age=5, name='Bob'), Row(age=2, name='Alice')]
-        >>> df.orderBy(df.age.desc()).collect()
-        [Row(age=5, name='Bob'), Row(age=2, name='Alice')]
-        >>> from pyspark.sql.functions import *
-        >>> df.sort(asc("age")).collect()
-        [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
-        >>> df.orderBy(desc("age"), "name").collect()
-        [Row(age=5, name='Bob'), Row(age=2, name='Alice')]
-        >>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
-        [Row(age=5, name='Bob'), Row(age=2, name='Alice')]
+        >>> from pyspark.sql.functions import desc, asc
+        >>> df = spark.createDataFrame([
+        ...     (2, "Alice"), (5, "Bob")], schema=["age", "name"])
+
+        Sort the DataFrame in ascending order.
+
+        >>> df.sort(asc("age")).show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  5|  Bob|
+        +---+-----+
+
+        Sort the DataFrame in descending order.
+
+        >>> df.sort(df.age.desc()).show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  5|  Bob|
+        |  2|Alice|
+        +---+-----+
+        >>> df.orderBy(df.age.desc()).show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  5|  Bob|
+        |  2|Alice|
+        +---+-----+
+        >>> df.sort("age", ascending=False).show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  5|  Bob|
+        |  2|Alice|
+        +---+-----+
+
+        Specify multiple columns.
+
+        >>> df = spark.createDataFrame([
+        ...     (2, "Alice"), (2, "Bob"), (5, "Bob")], schema=["age", "name"])
+        >>> df.orderBy(desc("age"), "name").show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  5|  Bob|
+        |  2|Alice|
+        |  2|  Bob|
+        +---+-----+
+
+        Specify the sorting order for multiple columns with `ascending`.
+
+        >>> df.orderBy(["age", "name"], ascending=[False, False]).show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  5|  Bob|
+        |  2|  Bob|
+        |  2|Alice|
+        +---+-----+
         """
         jdf = self._jdf.sort(self._sort_cols(cols, kwargs))
         return DataFrame(jdf, self.sparkSession)
@@ -2285,12 +2521,6 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         .. versionadded:: 2.3.0
 
-        Notes
-        -----
-        This function is meant for exploratory data analysis, as we make no
-        guarantee about the backward compatibility of the schema of the resulting
-        :class:`DataFrame`.
-
         Parameters
         ----------
         statistics : str, optional
@@ -2301,6 +2531,12 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         :class:`DataFrame`
             A new DataFrame that provides statistics for the given DataFrame.
 
+        Notes
+        -----
+        This function is meant for exploratory data analysis, as we make no
+        guarantee about the backward compatibility of the schema of the resulting
+        :class:`DataFrame`.
+
         Examples
         --------
         >>> df = spark.createDataFrame(
@@ -2371,6 +2607,8 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.createDataFrame([
+        ...     (2, "Alice"), (5, "Bob")], schema=["age", "name"])
         >>> df.head()
         Row(age=2, name='Alice')
         >>> df.head(1)
@@ -2388,10 +2626,13 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Returns
         -------
-        First Row if DataFrame is not empty, otherwise None.
+        :class:`Row`
+            First row if :class:`DataFrame` is not empty, otherwise ``None``.
 
         Examples
         --------
+        >>> df = spark.createDataFrame([
+        ...     (2, "Alice"), (5, "Bob")], schema=["age", "name"])
         >>> df.first()
         Row(age=2, name='Alice')
         """
@@ -2412,14 +2653,40 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.select(df['age']).collect()
-        [Row(age=2), Row(age=5)]
-        >>> df[ ["name", "age"]].collect()
-        [Row(name='Alice', age=2), Row(name='Bob', age=5)]
-        >>> df[ df.age > 3 ].collect()
-        [Row(age=5, name='Bob')]
-        >>> df[df[0] > 3].collect()
-        [Row(age=5, name='Bob')]
+        >>> df = spark.createDataFrame([
+        ...     (2, "Alice"), (5, "Bob")], schema=["age", "name"])
+
+        Retrieve a column instance.
+
+        >>> df.select(df['age']).show()
+        +---+
+        |age|
+        +---+
+        |  2|
+        |  5|
+        +---+
+
+        Select multiple string columns as index.
+
+        >>> df[["name", "age"]].show()
+        +-----+---+
+        | name|age|
+        +-----+---+
+        |Alice|  2|
+        |  Bob|  5|
+        +-----+---+
+        >>> df[df.age > 3].show()
+        +---+----+
+        |age|name|
+        +---+----+
+        |  5| Bob|
+        +---+----+
+        >>> df[df[0] > 3].show()
+        +---+----+
+        |age|name|
+        +---+----+
+        |  5| Bob|
+        +---+----+
         """
         if isinstance(item, str):
             jc = self._jdf.apply(item)
@@ -2451,10 +2718,18 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.select(df.age).collect()
-        [Row(age=2), Row(age=5)]
-        >>> df["age"]
-        Column<'age'>
+        >>> df = spark.createDataFrame([
+        ...     (2, "Alice"), (5, "Bob")], schema=["age", "name"])
+
+        Retrieve a column instance.
+
+        >>> df.select(df.age).show()
+        +---+
+        |age|
+        +---+
+        |  2|
+        |  5|
+        +---+
         """
         if name not in self.columns:
             raise AttributeError(
@@ -2490,12 +2765,28 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.select('*').collect()
-        [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
-        >>> df.select('name', 'age').collect()
-        [Row(name='Alice', age=2), Row(name='Bob', age=5)]
-        >>> df.select(df.name, (df.age + 10).alias('age')).collect()
-        [Row(name='Alice', age=12), Row(name='Bob', age=15)]
+        >>> df = spark.createDataFrame([
+        ...     (2, "Alice"), (5, "Bob")], schema=["age", "name"])
+
+        Select all columns in the DataFrame.
+
+        >>> df.select('*').show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  5|  Bob|
+        +---+-----+
+
+        Select a column with other expressions in the DataFrame.
+
+        >>> df.select(df.name, (df.age + 10).alias('age')).show()
+        +-----+---+
+        | name|age|
+        +-----+---+
+        |Alice| 12|
+        |  Bob| 15|
+        +-----+---+
         """
         jdf = self._jdf.select(self._jcols(*cols))
         return DataFrame(jdf, self.sparkSession)
@@ -2522,8 +2813,15 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.selectExpr("age * 2", "abs(age)").collect()
-        [Row((age * 2)=4, abs(age)=2), Row((age * 2)=10, abs(age)=5)]
+        >>> df = spark.createDataFrame([
+        ...     (2, "Alice"), (5, "Bob")], schema=["age", "name"])
+        >>> df.selectExpr("age * 2", "abs(age)").show()
+        +---------+--------+
+        |(age * 2)|abs(age)|
+        +---------+--------+
+        |        4|       2|
+        |       10|       5|
+        +---------+--------+
         """
         if len(expr) == 1 and isinstance(expr[0], list):
             expr = expr[0]  # type: ignore[assignment]
@@ -2550,15 +2848,38 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.filter(df.age > 3).collect()
-        [Row(age=5, name='Bob')]
-        >>> df.where(df.age == 2).collect()
-        [Row(age=2, name='Alice')]
+        >>> df = spark.createDataFrame([
+        ...     (2, "Alice"), (5, "Bob")], schema=["age", "name"])
 
-        >>> df.filter("age > 3").collect()
-        [Row(age=5, name='Bob')]
-        >>> df.where("age = 2").collect()
-        [Row(age=2, name='Alice')]
+        Filter by :class:`Column` instances.
+
+        >>> df.filter(df.age > 3).show()
+        +---+----+
+        |age|name|
+        +---+----+
+        |  5| Bob|
+        +---+----+
+        >>> df.where(df.age == 2).show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        +---+-----+
+
+        Filter by SQL expression in a string.
+
+        >>> df.filter("age > 3").show()
+        +---+----+
+        |age|name|
+        +---+----+
+        |  5| Bob|
+        +---+----+
+        >>> df.where("age = 2").show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        +---+-----+
         """
         if isinstance(condition, str):
             jdf = self._jdf.filter(condition)
@@ -2599,14 +2920,48 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.groupBy().avg().collect()
-        [Row(avg(age)=3.5)]
-        >>> sorted(df.groupBy('name').agg({'age': 'mean'}).collect())
-        [Row(name='Alice', avg(age)=2.0), Row(name='Bob', avg(age)=5.0)]
-        >>> sorted(df.groupBy(df.name).avg().collect())
-        [Row(name='Alice', avg(age)=2.0), Row(name='Bob', avg(age)=5.0)]
-        >>> sorted(df.groupBy(['name', df.age]).count().collect())
-        [Row(name='Alice', age=2, count=1), Row(name='Bob', age=5, count=1)]
+        >>> df = spark.createDataFrame([
+        ...     (2, "Alice"), (2, "Bob"), (2, "Bob"), (5, "Bob")], schema=["age", "name"])
+
+        Empty grouping columns trigger a global aggregation.
+
+        >>> df.groupBy().avg().show()
+        +--------+
+        |avg(age)|
+        +--------+
+        |    2.75|
+        +--------+
+
+        Group-by 'name', and specify a dictionary to calculate the summation of 'age'.
+
+        >>> df.groupBy("name").agg({"age": "sum"}).sort("name").show()
+        +-----+--------+
+        | name|sum(age)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       9|
+        +-----+--------+
+
+        Group-by 'name', and calculate maximum values.
+
+        >>> df.groupBy(df.name).max().sort("name").show()
+        +-----+--------+
+        | name|max(age)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       5|
+        +-----+--------+
+
+        Group-by 'name' and 'age', and calculate the number of rows in each group.
+
+        >>> df.groupBy(["name", df.age]).count().sort("name", "age").show()
+        +-----+---+-----+
+        | name|age|count|
+        +-----+---+-----+
+        |Alice|  2|    1|
+        |  Bob|  2|    2|
+        |  Bob|  5|    1|
+        +-----+---+-----+
         """
         jgd = self._jdf.groupBy(self._jcols(*cols))
         from pyspark.sql.group import GroupedData
@@ -2642,6 +2997,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
         >>> df.rollup("name", df.age).count().orderBy("name", "age").show()
         +-----+----+-----+
         | name| age|count|
@@ -2687,6 +3043,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
         >>> df.cube("name", df.age).count().orderBy("name", "age").show()
         +-----+----+-----+
         | name| age|count|
@@ -2859,11 +3216,20 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.agg({"age": "max"}).collect()
-        [Row(max(age)=5)]
         >>> from pyspark.sql import functions as F
-        >>> df.agg(F.min(df.age)).collect()
-        [Row(min(age)=2)]
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
+        >>> df.agg({"age": "max"}).show()
+        +--------+
+        |max(age)|
+        +--------+
+        |       5|
+        +--------+
+        >>> df.agg(F.min(df.age)).show()
+        +--------+
+        |min(age)|
+        +--------+
+        |       2|
+        +--------+
         """
         return self.groupBy().agg(*exprs)  # type: ignore[arg-type]
 
@@ -2922,6 +3288,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         >>> from pyspark.sql.functions import col, count, lit, max
         >>> from pyspark.sql import Observation
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
         >>> observation = Observation("my metrics")
         >>> observed_df = df.observe(observation, count(lit(1)).alias("count"), max(col("age")))
         >>> observed_df.count()
@@ -2976,21 +3343,11 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         else:
             raise ValueError("'observation' should be either `Observation` or `str`.")
 
-    @since(2.0)
     def union(self, other: "DataFrame") -> "DataFrame":
         """Return a new :class:`DataFrame` containing union of rows in this and another
         :class:`DataFrame`.
 
-        This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union
-        (that does deduplication of elements), use this function followed by :func:`distinct`.
-
-        Also as standard in SQL, this function resolves columns by position (not by name).
-
-        .. versionadded:: 2.0
-
-        See Also
-        --------
-        DataFrame.unionAll
+        .. versionadded:: 2.0.0
 
         Parameters
         ----------
@@ -3001,6 +3358,17 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         -------
         :class:`DataFrame`
 
+        See Also
+        --------
+        DataFrame.unionAll
+
+        Notes
+        -----
+        This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union
+        (that does deduplication of elements), use this function followed by :func:`distinct`.
+
+        Also as standard in SQL, this function resolves columns by position (not by name).
+
         Examples
         --------
         >>> df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
@@ -3019,28 +3387,15 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         |   1|   2|   3|
         |   1|   2|   3|
         +----+----+----+
-
         """
         return DataFrame(self._jdf.union(other._jdf), self.sparkSession)
 
-    @since(1.3)
     def unionAll(self, other: "DataFrame") -> "DataFrame":
         """Return a new :class:`DataFrame` containing union of rows in this and another
         :class:`DataFrame`.
 
-        This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union
-        (that does deduplication of elements), use this function followed by :func:`distinct`.
-
-        Also as standard in SQL, this function resolves columns by position (not by name).
-
-        :func:`unionAll` is an alias to :func:`union`
-
         .. versionadded:: 1.3.0
 
-        See Also
-        --------
-        DataFrame.union
-
         Parameters
         ----------
         other : :class:`DataFrame`
@@ -3051,6 +3406,18 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         :class:`DataFrame`
             Combined DataFrame
 
+        Notes
+        -----
+        This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union
+        (that does deduplication of elements), use this function followed by :func:`distinct`.
+
+        Also as standard in SQL, this function resolves columns by position (not by name).
+
+        :func:`unionAll` is an alias to :func:`union`
+
+        See Also
+        --------
+        DataFrame.union
         """
         return self.union(other)
 
@@ -3067,6 +3434,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         ----------
         other : :class:`DataFrame`
             Another :class:`DataFrame` that needs to be combined.
+        allowMissingColumns : bool, optional, default False
+           Specify whether to allow missing columns.
+
+           .. versionadded:: 3.1.0
 
         Returns
         -------
@@ -3102,21 +3473,16 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         |   1|   2|   3|null|
         |null|   4|   5|   6|
         +----+----+----+----+
-
-        .. versionchanged:: 3.1.0
-           Added optional argument `allowMissingColumns` to specify whether to allow
-           missing columns.
         """
         return DataFrame(self._jdf.unionByName(other._jdf, allowMissingColumns), self.sparkSession)
 
-    @since(1.3)
     def intersect(self, other: "DataFrame") -> "DataFrame":
         """Return a new :class:`DataFrame` containing rows only in
         both this :class:`DataFrame` and another :class:`DataFrame`.
         Note that any duplicates are removed. To preserve duplicates
         use :func:`intersectAll`.
 
-        This is equivalent to `INTERSECT` in SQL.
+        .. versionadded:: 1.3.0
 
         Parameters
         ----------
@@ -3128,6 +3494,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         :class:`DataFrame`
             Combined DataFrame.
 
+        Notes
+        -----
+        This is equivalent to `INTERSECT` in SQL.
+
         Examples
         --------
         >>> df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
@@ -3165,7 +3535,6 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         --------
         >>> df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
         >>> df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
-
         >>> df1.intersectAll(df2).sort("C1", "C2").show()
         +---+---+
         | C1| C2|
@@ -3177,12 +3546,11 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         """
         return DataFrame(self._jdf.intersectAll(other._jdf), self.sparkSession)
 
-    @since(1.3)
     def subtract(self, other: "DataFrame") -> "DataFrame":
         """Return a new :class:`DataFrame` containing rows in this :class:`DataFrame`
         but not in another :class:`DataFrame`.
 
-        This is equivalent to `EXCEPT DISTINCT` in SQL.
+        .. versionadded:: 1.3.0
 
         Parameters
         ----------
@@ -3194,11 +3562,14 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         :class:`DataFrame`
             Subtracted DataFrame.
 
+        Notes
+        -----
+        This is equivalent to `EXCEPT DISTINCT` in SQL.
+
         Examples
         --------
         >>> df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
         >>> df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
-
         >>> df1.subtract(df2).show()
         +---+---+
         | C1| C2|
@@ -3235,10 +3606,14 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         Examples
         --------
         >>> from pyspark.sql import Row
-        >>> df = sc.parallelize([ \\
-        ...     Row(name='Alice', age=5, height=80), \\
-        ...     Row(name='Alice', age=5, height=80), \\
-        ...     Row(name='Alice', age=10, height=80)]).toDF()
+        >>> df = spark.createDataFrame([
+        ...     Row(name='Alice', age=5, height=80),
+        ...     Row(name='Alice', age=5, height=80),
+        ...     Row(name='Alice', age=10, height=80)
+        ... ])
+
+        Deduplicate the same rows.
+
         >>> df.dropDuplicates().show()
         +-----+---+------+
         | name|age|height|
@@ -3247,6 +3622,8 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         |Alice| 10|    80|
         +-----+---+------+
 
+        Deduplicate values on 'name' and 'height' columns.
+
         >>> df.dropDuplicates(['name', 'height']).show()
         +-----+---+------+
         | name|age|height|
@@ -3294,7 +3671,14 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df4.na.drop().show()
+        >>> from pyspark.sql import Row
+        >>> df = spark.createDataFrame([
+        ...     Row(age=10, height=80, name="Alice"),
+        ...     Row(age=5, height=None, name="Bob"),
+        ...     Row(age=None, height=None, name="Tom"),
+        ...     Row(age=None, height=None, name=None),
+        ... ])
+        >>> df.na.drop().show()
         +---+------+-----+
         |age|height| name|
         +---+------+-----+
@@ -3358,34 +3742,48 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df4.na.fill(50).show()
-        +---+------+-----+
-        |age|height| name|
-        +---+------+-----+
-        | 10|    80|Alice|
-        |  5|    50|  Bob|
-        | 50|    50|  Tom|
-        | 50|    50| null|
-        +---+------+-----+
-
-        >>> df5.na.fill(False).show()
-        +----+-------+-----+
-        | age|   name|  spy|
-        +----+-------+-----+
-        |  10|  Alice|false|
-        |   5|    Bob|false|
-        |null|Mallory| true|
-        +----+-------+-----+
-
-        >>> df4.na.fill({'age': 50, 'name': 'unknown'}).show()
-        +---+------+-------+
-        |age|height|   name|
-        +---+------+-------+
-        | 10|    80|  Alice|
-        |  5|  null|    Bob|
-        | 50|  null|    Tom|
-        | 50|  null|unknown|
-        +---+------+-------+
+        >>> df = spark.createDataFrame([
+        ...     (10, 80.5, "Alice", None),
+        ...     (5, None, "Bob", None),
+        ...     (None, None, "Tom", None),
+        ...     (None, None, None, True)],
+        ...     schema=["age", "height", "name", "bool"])
+
+        Fill all null values with 50 for numeric columns.
+
+        >>> df.na.fill(50).show()
+        +---+------+-----+----+
+        |age|height| name|bool|
+        +---+------+-----+----+
+        | 10|  80.5|Alice|null|
+        |  5|  50.0|  Bob|null|
+        | 50|  50.0|  Tom|null|
+        | 50|  50.0| null|true|
+        +---+------+-----+----+
+
+        Fill all null values with ``False`` for boolean columns.
+
+        >>> df.na.fill(False).show()
+        +----+------+-----+-----+
+        | age|height| name| bool|
+        +----+------+-----+-----+
+        |  10|  80.5|Alice|false|
+        |   5|  null|  Bob|false|
+        |null|  null|  Tom|false|
+        |null|  null| null| true|
+        +----+------+-----+-----+
+
+        Fill all null values with 50 and "unknown" for the 'age' and 'name' columns respectively.
+
+        >>> df.na.fill({'age': 50, 'name': 'unknown'}).show()
+        +---+------+-------+----+
+        |age|height|   name|bool|
+        +---+------+-------+----+
+        | 10|  80.5|  Alice|null|
+        |  5|  null|    Bob|null|
+        | 50|  null|    Tom|null|
+        | 50|  null|unknown|true|
+        +---+------+-------+----+
         """
         if not isinstance(value, (float, int, str, bool, dict)):
             raise TypeError("value should be a float, int, string, bool or dict")
@@ -3489,43 +3887,46 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df4.na.replace(10, 20).show()
+        >>> df = spark.createDataFrame([
+        ...     (10, 80, "Alice"),
+        ...     (5, None, "Bob"),
+        ...     (None, 10, "Tom"),
+        ...     (None, None, None)],
+        ...     schema=["age", "height", "name"])
+
+        Replace 10 with 20 in all columns.
+
+        >>> df.na.replace(10, 20).show()
         +----+------+-----+
         | age|height| name|
         +----+------+-----+
         |  20|    80|Alice|
         |   5|  null|  Bob|
-        |null|  null|  Tom|
+        |null|    20|  Tom|
         |null|  null| null|
         +----+------+-----+
 
-        >>> df4.na.replace('Alice', None).show()
-        +----+------+----+
-        | age|height|name|
-        +----+------+----+
-        |  10|    80|null|
-        |   5|  null| Bob|
-        |null|  null| Tom|
-        |null|  null|null|
-        +----+------+----+
+        Replace 'Alice' with null in all columns.
 
-        >>> df4.na.replace({'Alice': None}).show()
+        >>> df.na.replace('Alice', None).show()
         +----+------+----+
         | age|height|name|
         +----+------+----+
         |  10|    80|null|
         |   5|  null| Bob|
-        |null|  null| Tom|
+        |null|    10| Tom|
         |null|  null|null|
         +----+------+----+
 
-        >>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
+        Replace 'Alice' with 'A', and 'Bob' with 'B' in the 'name' column.
+
+        >>> df.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
         +----+------+----+
         | age|height|name|
         +----+------+----+
         |  10|    80|   A|
         |   5|  null|   B|
-        |null|  null| Tom|
+        |null|    10| Tom|
         |null|  null|null|
         +----+------+----+
         """
@@ -3661,9 +4062,6 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         Space-efficient Online Computation of Quantile Summaries]]
         by Greenwald and Khanna.
 
-        Note that null values will be ignored in numerical columns before calculation.
-        For columns only containing null values, an empty list is returned.
-
         .. versionadded:: 2.0.0
 
         Parameters
@@ -3671,7 +4069,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         col: str, tuple or list
             Can be a single column name, or a list of names for multiple columns.
 
-            .. versionchanged:: 2.2
+            .. versionchanged:: 2.2.0
                Added support for multiple columns.
         probabilities : list or tuple
             a list of quantile probabilities
@@ -3686,11 +4084,18 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         Returns
         -------
         list
-            the approximate quantiles at the given probabilities. If
-            the input `col` is a string, the output is a list of floats. If the
-            input `col` is a list or tuple of strings, the output is also a
-            list, but each element in it is a list of floats, i.e., the output
-            is a list of list of floats.
+            the approximate quantiles at the given probabilities.
+
+            * If the input `col` is a string, the output is a list of floats.
+
+            * If the input `col` is a list or tuple of strings, the output is also a
+              list, but each element in it is a list of floats, i.e., the output
+              is a list of lists of floats.
+
+        Notes
+        -----
+        Null values will be ignored in numerical columns before calculation.
+        For columns only containing null values, an empty list is returned.
         """
 
         if not isinstance(col, (str, list, tuple)):
@@ -3877,6 +4282,12 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         :class:`DataFrame`
             DataFrame with frequent items.
 
+        Notes
+        -----
+        This function is meant for exploratory data analysis, as we make no
+        guarantee about the backward compatibility of the schema of the resulting
+        :class:`DataFrame`.
+
         Examples
         --------
         >>> df = spark.createDataFrame([(1, 11), (1, 11), (3, 10), (4, 8), (4, 8)], ["c1", "c2"])
@@ -3886,12 +4297,6 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         +------------+------------+
         |   [4, 1, 3]| [8, 11, 10]|
         +------------+------------+
-
-        Notes
-        -----
-        This function is meant for exploratory data analysis, as we make no
-        guarantee about the backward compatibility of the schema of the resulting
-        :class:`DataFrame`.
         """
         if isinstance(cols, tuple):
             cols = list(cols)
@@ -3926,8 +4331,14 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.withColumns({'age2': df.age + 2, 'age3': df.age + 3}).collect()
-        [Row(age=2, name='Alice', age2=4, age3=5), Row(age=5, name='Bob', age2=7, age3=8)]
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
+        >>> df.withColumns({'age2': df.age + 2, 'age3': df.age + 3}).show()
+        +---+-----+----+----+
+        |age| name|age2|age3|
+        +---+-----+----+----+
+        |  2|Alice|   4|   5|
+        |  5|  Bob|   7|   8|
+        +---+-----+----+----+
         """
         # Below code is to help enable kwargs in future.
         assert len(colsMap) == 1
@@ -3975,9 +4386,14 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.withColumn('age2', df.age + 2).collect()
-        [Row(age=2, name='Alice', age2=4), Row(age=5, name='Bob', age2=7)]
-
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
+        >>> df.withColumn('age2', df.age + 2).show()
+        +---+-----+----+
+        |age| name|age2|
+        +---+-----+----+
+        |  2|Alice|   4|
+        |  5|  Bob|   7|
+        +---+-----+----+
         """
         if not isinstance(col, Column):
             raise TypeError("col should be Column")
@@ -4003,8 +4419,14 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.withColumnRenamed('age', 'age2').collect()
-        [Row(age2=2, name='Alice'), Row(age2=5, name='Bob')]
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
+        >>> df.withColumnRenamed('age', 'age2').show()
+        +----+-----+
+        |age2| name|
+        +----+-----+
+        |   2|Alice|
+        |   5|  Bob|
+        +----+-----+
         """
         return DataFrame(self._jdf.withColumnRenamed(existing, new), self.sparkSession)
 
@@ -4027,6 +4449,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
         >>> df_meta = df.withMetadata('age', {'foo': 'bar'})
         >>> df_meta.schema['age'].metadata
         {'foo': 'bar'}
@@ -4064,20 +4487,37 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.drop('age').collect()
-        [Row(name='Alice'), Row(name='Bob')]
-
-        >>> df.drop(df.age).collect()
-        [Row(name='Alice'), Row(name='Bob')]
-
-        >>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
-        [Row(age=5, height=85, name='Bob')]
-
-        >>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
-        [Row(age=5, name='Bob', height=85)]
-
-        >>> df.join(df2, 'name', 'inner').drop('age', 'height').collect()
-        [Row(name='Bob')]
+        >>> from pyspark.sql import Row
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+        >>> df2 = spark.createDataFrame([Row(height=80, name="Tom"), Row(height=85, name="Bob")])
+
+        >>> df.drop('age').show()
+        +-----+
+        | name|
+        +-----+
+        |  Tom|
+        |Alice|
+        |  Bob|
+        +-----+
+        >>> df.drop(df.age).show()
+        +-----+
+        | name|
+        +-----+
+        |  Tom|
+        |Alice|
+        |  Bob|
+        +-----+
+
+        Drop the column that both DataFrames were joined on.
+
+        >>> df.join(df2, df.name == df2.name, 'inner').drop('name').show()
+        +---+------+
+        |age|height|
+        +---+------+
+        | 16|    85|
+        | 14|    80|
+        +---+------+
         """
         if len(cols) == 1:
             col = cols[0]
@@ -4099,8 +4539,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Parameters
         ----------
-        cols : str
-            new column names
+        *cols : tuple
+            a tuple of new column names, each given as a string or :class:`Column`.
+            The number of names needs to be the same as the number of columns in the
+            initial :class:`DataFrame`.
 
         Returns
         -------
@@ -4109,8 +4551,16 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.toDF('f1', 'f2').collect()
-        [Row(f1=2, f2='Alice'), Row(f1=5, f2='Bob')]
+        >>> df = spark.createDataFrame([(14, "Tom"), (23, "Alice"),
+        ...     (16, "Bob")], ["age", "name"])
+        >>> df.toDF('f1', 'f2').show()
+        +---+-----+
+        | f1|   f2|
+        +---+-----+
+        | 14|  Tom|
+        | 23|Alice|
+        | 16|  Bob|
+        +---+-----+
         """
         jdf = self._jdf.toDF(self._jseq(cols))
         return DataFrame(jdf, self.sparkSession)
@@ -4153,6 +4603,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         |    1|  1|
         |    2|  2|
         +-----+---+
+
         >>> def add_n(input_df, n):
         ...     return input_df.select([(col(col_name) + n).alias(col_name)
         ...                             for col_name in input_df.columns])
@@ -4256,8 +4707,18 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df = spark.read.load("examples/src/main/resources/people.json", format="json")
-        >>> len(df.inputFiles())
+        >>> import tempfile
+        >>> with tempfile.TemporaryDirectory() as d:
+        ...     # Write a single-row DataFrame into a JSON file
+        ...     spark.createDataFrame(
+        ...         [{"age": 100, "name": "Hyukjin Kwon"}]
+        ...     ).repartition(1).write.json(d, mode="overwrite")
+        ...
+        ...     # Read the JSON file as a DataFrame.
+        ...     df = spark.read.format("json").load(d)
+        ...
+        ...     # Returns the number of input files.
+        ...     len(df.inputFiles())
         1
         """
         return list(self._jdf.inputFiles())
@@ -4301,6 +4762,8 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
         >>> df.writeTo("catalog.db.table").append()  # doctest: +SKIP
         >>> df.writeTo(                              # doctest: +SKIP
         ...     "catalog.db.table"
@@ -4345,29 +4808,23 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
-        >>> df.show()  # doctest: +SKIP
-        +----+----+
-        |Col1|Col2|
-        +----+----+
-        |   a|   1|
-        |   b|   2|
-        |   c|   3|
-        +----+----+
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
 
         >>> df.pandas_api()  # doctest: +SKIP
-          Col1  Col2
-        0    a     1
-        1    b     2
-        2    c     3
+           age   name
+        0   14    Tom
+        1   23  Alice
+        2   16    Bob
 
         We can specify the index columns.
 
-        >>> df.pandas_api(index_col="Col1"): # doctest: +SKIP
-              Col2
-        Col1
-        a        1
-        b        2
-        c        3
+        >>> df.pandas_api(index_col="age")  # doctest: +SKIP
+              name
+        age
+        14     Tom
+        23   Alice
+        16     Bob
         """
         from pyspark.pandas.namespace import _get_index_map
         from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame
@@ -4538,45 +4995,18 @@ class DataFrameStatFunctions:
 
 def _test() -> None:
     import doctest
-    from pyspark.context import SparkContext
-    from pyspark.sql import Row, SQLContext, SparkSession
+    from pyspark.sql import SparkSession
     import pyspark.sql.dataframe
 
     globs = pyspark.sql.dataframe.__dict__.copy()
-    sc = SparkContext("local[4]", "PythonTest")
-    globs["sc"] = sc
-    globs["sqlContext"] = SQLContext(sc)
-    globs["spark"] = SparkSession(sc)
-    globs["df"] = sc.parallelize([(2, "Alice"), (5, "Bob")]).toDF(
-        StructType([StructField("age", IntegerType()), StructField("name", StringType())])
-    )
-    globs["df2"] = sc.parallelize([Row(height=80, name="Tom"), Row(height=85, name="Bob")]).toDF()
-    globs["df3"] = sc.parallelize([Row(age=2, name="Alice"), Row(age=5, name="Bob")]).toDF()
-    globs["df4"] = sc.parallelize(
-        [
-            Row(age=10, height=80, name="Alice"),
-            Row(age=5, height=None, name="Bob"),
-            Row(age=None, height=None, name="Tom"),
-            Row(age=None, height=None, name=None),
-        ]
-    ).toDF()
-    globs["df5"] = sc.parallelize(
-        [
-            Row(age=10, name="Alice", spy=False),
-            Row(age=5, name="Bob", spy=None),
-            Row(age=None, name="Mallory", spy=True),
-        ]
-    ).toDF()
-    globs["sdf"] = sc.parallelize(
-        [Row(name="Tom", time=1479441846), Row(name="Bob", time=1479442946)]
-    ).toDF()
-
+    spark = SparkSession.builder.master("local[4]").appName("sql.dataframe tests").getOrCreate()
+    globs["spark"] = spark
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.dataframe,
         globs=globs,
         optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF,
     )
-    globs["sc"].stop()
+    spark.stop()
     if failure_count:
         sys.exit(-1)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org