Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/21 00:47:51 UTC
[spark] branch master updated: [SPARK-40147][PYTHON][SQL] Make pyspark.sql.session examples self-contained
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dfa1c6f0868 [SPARK-40147][PYTHON][SQL] Make pyspark.sql.session examples self-contained
dfa1c6f0868 is described below
commit dfa1c6f0868a459c5a01cae20e6a0660017896d0
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Sun Aug 21 09:47:36 2022 +0900
[SPARK-40147][PYTHON][SQL] Make pyspark.sql.session examples self-contained
### What changes were proposed in this pull request?
This PR proposes to improve the examples in `pyspark.sql.session` by making each example self-contained with a brief explanation and a bit more realistic example.
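For instance, in the new style each doctest builds its own input inline instead of depending on a pre-populated `df` or `rdd` global injected by the test harness; the `spark` session is the only assumed global (a sketch mirroring the doctests in the diff below):

```python
>>> df = spark.createDataFrame([('Alice', 1)], ['name', 'age'])
>>> df.select("age").show()
+---+
|age|
+---+
|  1|
+---+
```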
### Why are the changes needed?
To make the documentation more readable, so that examples can be copied and pasted directly into the PySpark shell.
### Does this PR introduce _any_ user-facing change?
Yes, it changes the documentation.
### How was this patch tested?
Manually ran each doctest. CI also runs them.
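The doctests can be reproduced locally with a snippet along these lines (a sketch mirroring the `_test()` helper at the bottom of the diff; it assumes `SPARK_HOME` points at a Spark checkout):

```python
import doctest
import pyspark.sql.session
from pyspark.sql import SparkSession

# Build the same globals _test() uses: a local SparkSession named "spark".
globs = pyspark.sql.session.__dict__.copy()
globs["spark"] = SparkSession.builder.master("local[4]").appName("doctests").getOrCreate()
failed, attempted = doctest.testmod(
    pyspark.sql.session,
    globs=globs,
    optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
)
globs["spark"].stop()
```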
Closes #37582 from HyukjinKwon/SPARK-40147.
Lead-authored-by: Hyukjin Kwon <gu...@apache.org>
Co-authored-by: Hyukjin Kwon <gu...@gmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../source/reference/pyspark.sql/core_classes.rst | 2 +
python/docs/source/reference/pyspark.sql/index.rst | 1 +
.../pyspark.sql/{core_classes.rst => udf.rst} | 25 +-
python/pyspark/sql/session.py | 425 ++++++++++++++++-----
4 files changed, 342 insertions(+), 111 deletions(-)
diff --git a/python/docs/source/reference/pyspark.sql/core_classes.rst b/python/docs/source/reference/pyspark.sql/core_classes.rst
index 72f9ca122a9..bc4df5087f5 100644
--- a/python/docs/source/reference/pyspark.sql/core_classes.rst
+++ b/python/docs/source/reference/pyspark.sql/core_classes.rst
@@ -37,3 +37,5 @@ Core Classes
Window
DataFrameReader
DataFrameWriter
+ UDFRegistration
+ udf.UserDefinedFunction
diff --git a/python/docs/source/reference/pyspark.sql/index.rst b/python/docs/source/reference/pyspark.sql/index.rst
index 6a0c59b184c..52aca086cb4 100644
--- a/python/docs/source/reference/pyspark.sql/index.rst
+++ b/python/docs/source/reference/pyspark.sql/index.rst
@@ -39,3 +39,4 @@ This page gives an overview of all public Spark SQL API.
catalog
avro
observation
+ udf
diff --git a/python/docs/source/reference/pyspark.sql/core_classes.rst b/python/docs/source/reference/pyspark.sql/udf.rst
similarity index 77%
copy from python/docs/source/reference/pyspark.sql/core_classes.rst
copy to python/docs/source/reference/pyspark.sql/udf.rst
index 72f9ca122a9..a3d27c48427 100644
--- a/python/docs/source/reference/pyspark.sql/core_classes.rst
+++ b/python/docs/source/reference/pyspark.sql/udf.rst
@@ -16,24 +16,17 @@
under the License.
-============
-Core Classes
-============
+===
+UDF
+===
+
.. currentmodule:: pyspark.sql
.. autosummary::
:toctree: api/
- SparkSession
- Catalog
- DataFrame
- Column
- Observation
- Row
- GroupedData
- PandasCogroupedOps
- DataFrameNaFunctions
- DataFrameStatFunctions
- Window
- DataFrameReader
- DataFrameWriter
+ udf.UserDefinedFunction.asNondeterministic
+ udf.UserDefinedFunction.returnType
+ UDFRegistration.register
+ UDFRegistration.registerJavaFunction
+ UDFRegistration.registerJavaUDAF
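As a rough illustration of the entries listed above (a sketch assuming an active `spark` session; `plus_one` is a made-up name):

>>> from pyspark.sql.functions import udf
>>> plus_one = udf(lambda x: x + 1, "int")        # a udf.UserDefinedFunction
>>> plus_one = plus_one.asNondeterministic()      # mark as non-deterministic
>>> _ = spark.udf.register("plus_one", plus_one)  # UDFRegistration.register
>>> spark.sql("SELECT plus_one(1)").show()
+-----------+
|plus_one(1)|
+-----------+
|          2|
+-----------+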
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index bf98e9743e2..0867104d61e 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -39,7 +39,7 @@ from typing import (
from py4j.java_gateway import JavaObject
-from pyspark import SparkConf, SparkContext, since
+from pyspark import SparkConf, SparkContext
from pyspark.rdd import RDD
from pyspark.sql.conf import RuntimeConfig
from pyspark.sql.dataframe import DataFrame
@@ -99,8 +99,15 @@ def _monkey_patch_RDD(sparkSession: "SparkSession") -> None:
Examples
--------
- >>> rdd.toDF().collect()
- [Row(name='Alice', age=1)]
+ >>> rdd = spark.range(1).rdd.map(lambda x: tuple(x))
+ >>> rdd.collect()
+ [(0,)]
+ >>> rdd.toDF().show()
+ +---+
+ | _1|
+ +---+
+ | 0|
+ +---+
"""
return sparkSession.createDataFrame(self, schema, sampleRatio)
@@ -116,19 +123,18 @@ def _monkey_patch_RDD(sparkSession: "SparkSession") -> None:
class classproperty(property):
"""Same as Python's @property decorator, but for class attributes.
- Example:
-
+ Examples
+ --------
>>> class Builder:
- ...
... def build(self):
... return MyClass()
...
>>> class MyClass:
- ...
... @classproperty
... def builder(cls):
... print("instantiating new builder")
... return Builder()
+ ...
>>> c1 = MyClass.builder
instantiating new builder
>>> c2 = MyClass.builder
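One common way to implement such a decorator (a sketch for illustration; this patch only touches the docstring, not the class body) is to override `__get__` so the getter is evaluated against the class rather than an instance:

>>> class classproperty(property):
...     def __get__(self, instance, owner=None):
...         # property would require an instance; call fget on the owning class.
...         return self.fget(owner)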
@@ -159,26 +165,20 @@ class SparkSession(SparkConversionMixin):
Examples
--------
- >>> spark = SparkSession.builder \\
- ... .master("local") \\
- ... .appName("Word Count") \\
- ... .config("spark.some.config.option", "some-value") \\
- ... .getOrCreate()
-
- >>> from datetime import datetime
- >>> from pyspark.sql import Row
+ Create a Spark session.
+
+ >>> spark = (
+ ... SparkSession.builder
+ ... .master("local")
+ ... .appName("Word Count")
+ ... .config("spark.some.config.option", "some-value")
+ ... .getOrCreate()
+ ... )
+
+ Create a Spark session from a Spark context.
+
+ >>> sc = spark.sparkContext
>>> spark = SparkSession(sc)
- >>> allTypes = sc.parallelize([Row(i=1, s="string", d=1.0, l=1,
- ... b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1),
- ... time=datetime(2014, 8, 1, 14, 1, 5))])
- >>> df = allTypes.toDF()
- >>> df.createOrReplaceTempView("allTypes")
- >>> spark.sql('select i+1, d+1, not b, list[1], dict["s"], time, row.a '
- ... 'from allTypes where b and i > 0').collect()
- [Row((i + 1)=2, (d + 1)=2.0, (NOT b)=False, list[1]=2, \
- dict[s]=0, time=datetime.datetime(2014, 8, 1, 14, 1, 5), a=1)]
- >>> df.rdd.map(lambda x: (x.i, x.s, x.d, x.l, x.b, x.time, x.row.a, x.list)).collect()
- [(1, 'string', 1.0, 1, True, datetime.datetime(2014, 8, 1, 14, 1, 5), 1, [1, 2, 3])]
"""
class Builder:
@@ -217,19 +217,22 @@ class SparkSession(SparkConversionMixin):
conf : :class:`SparkConf`, optional
an instance of :class:`SparkConf`
+ Returns
+ -------
+ :class:`SparkSession.Builder`
+
Examples
--------
- For an existing SparkConf, use `conf` parameter.
+ For an existing :class:`SparkConf`, use the `conf` parameter.
>>> from pyspark.conf import SparkConf
>>> SparkSession.builder.config(conf=SparkConf())
- <pyspark.sql.session...
+ <pyspark.sql.session.SparkSession.Builder...
For a (key, value) pair, you can omit parameter names.
>>> SparkSession.builder.config("spark.some.config.option", "some-value")
- <pyspark.sql.session...
-
+ <pyspark.sql.session.SparkSession.Builder...
"""
with self._lock:
if conf is None:
@@ -250,6 +253,15 @@ class SparkSession(SparkConversionMixin):
----------
master : str
a url for spark master
+
+ Returns
+ -------
+ :class:`SparkSession.Builder`
+
+ Examples
+ --------
+ >>> SparkSession.builder.master("local")
+ <pyspark.sql.session.SparkSession.Builder...
"""
return self.config("spark.master", master)
@@ -264,13 +276,32 @@ class SparkSession(SparkConversionMixin):
----------
name : str
an application name
+
+ Returns
+ -------
+ :class:`SparkSession.Builder`
+
+ Examples
+ --------
+ >>> SparkSession.builder.appName("My app")
+ <pyspark.sql.session.SparkSession.Builder...
"""
return self.config("spark.app.name", name)
- @since(2.0)
def enableHiveSupport(self) -> "SparkSession.Builder":
"""Enables Hive support, including connectivity to a persistent Hive metastore, support
for Hive SerDes, and Hive user-defined functions.
+
+ .. versionadded:: 2.0.0
+
+ Returns
+ -------
+ :class:`SparkSession.Builder`
+
+ Examples
+ --------
+ >>> SparkSession.builder.enableHiveSupport()
+ <pyspark.sql.session.SparkSession.Builder...
"""
return self.config("spark.sql.catalogImplementation", "hive")
@@ -280,6 +311,11 @@ class SparkSession(SparkConversionMixin):
.. versionadded:: 2.0.0
+
+ Returns
+ -------
+ :class:`SparkSession`
+
Examples
--------
This method first checks whether there is a valid global default SparkSession, and if
@@ -401,12 +437,23 @@ class SparkSession(SparkConversionMixin):
"""Accessor for the JVM SQL-specific configurations"""
return self._jsparkSession.sessionState().conf()
- @since(2.0)
def newSession(self) -> "SparkSession":
"""
Returns a new :class:`SparkSession` as new session, that has separate SQLConf,
registered temporary views and UDFs, but shared :class:`SparkContext` and
table cache.
+
+ .. versionadded:: 2.0.0
+
+ Returns
+ -------
+ :class:`SparkSession`
+ a new Spark session with separate SQLConf, registered temporary views and UDFs
+
+ Examples
+ --------
+ >>> spark.newSession()
+ <...SparkSession object ...>
"""
return self.__class__(self._sc, self._jsparkSession.newSession())
@@ -425,11 +472,13 @@ class SparkSession(SparkConversionMixin):
Examples
--------
>>> s = SparkSession.getActiveSession()
- >>> l = [('Alice', 1)]
- >>> rdd = s.sparkContext.parallelize(l)
- >>> df = s.createDataFrame(rdd, ['name', 'age'])
- >>> df.select("age").collect()
- [Row(age=1)]
+ >>> df = s.createDataFrame([('Alice', 1)], ['name', 'age'])
+ >>> df.select("age").show()
+ +---+
+ |age|
+ +---+
+ | 1|
+ +---+
"""
from pyspark import SparkContext
@@ -444,20 +493,49 @@ class SparkSession(SparkConversionMixin):
else:
return None
- @property # type: ignore[misc]
- @since(2.0)
+ @property
def sparkContext(self) -> SparkContext:
- """Returns the underlying :class:`SparkContext`."""
+ """
+ Returns the underlying :class:`SparkContext`.
+
+ .. versionadded:: 2.0.0
+
+ Returns
+ -------
+ :class:`SparkContext`
+
+ Examples
+ --------
+ >>> spark.sparkContext
+ <SparkContext master=... appName=...>
+
+ Create an RDD from the Spark context.
+
+ >>> rdd = spark.sparkContext.parallelize([1, 2, 3])
+ >>> rdd.collect()
+ [1, 2, 3]
+ """
return self._sc
- @property # type: ignore[misc]
- @since(2.0)
+ @property
def version(self) -> str:
- """The version of Spark on which this application is running."""
+ """
+ The version of Spark on which this application is running.
+
+ .. versionadded:: 2.0.0
+
+ Returns
+ -------
+ str
+ the version of Spark as a string.
+
+ Examples
+ --------
+ >>> _ = spark.version
+ """
return self._jsparkSession.version()
- @property # type: ignore[misc]
- @since(2.0)
+ @property
def conf(self) -> RuntimeConfig:
"""Runtime configuration interface for Spark.
@@ -465,9 +543,22 @@ class SparkSession(SparkConversionMixin):
configurations that are relevant to Spark SQL. When getting the value of a config,
this defaults to the value set in the underlying :class:`SparkContext`, if any.
+ .. versionadded:: 2.0.0
+
Returns
-------
:class:`pyspark.sql.conf.RuntimeConfig`
+
+ Examples
+ --------
+ >>> spark.conf
+ <pyspark.sql.conf.RuntimeConfig object ...>
+
+ Set a runtime configuration for the session.
+
+ >>> spark.conf.set("key", "value")
+ >>> spark.conf.get("key")
+ 'value'
"""
if not hasattr(self, "_conf"):
self._conf = RuntimeConfig(self._jsparkSession.conf())
@@ -483,6 +574,18 @@ class SparkSession(SparkConversionMixin):
Returns
-------
:class:`Catalog`
+
+ Examples
+ --------
+ >>> spark.catalog
+ <pyspark.sql.catalog.Catalog object ...>
+
+ Create a temp view, list the tables, and drop the view.
+
+ >>> spark.range(1).createTempView("test_view")
+ >>> spark.catalog.listTables()
+ [Table(name='test_view', catalog=None, namespace=[], description=None, ...
+ >>> _ = spark.catalog.dropTempView("test_view")
"""
from pyspark.sql.catalog import Catalog
@@ -499,6 +602,21 @@ class SparkSession(SparkConversionMixin):
Returns
-------
:class:`UDFRegistration`
+
+ Examples
+ --------
+ >>> spark.udf
+ <pyspark.sql.udf.UDFRegistration object ...>
+
+ Register a Python UDF, and use it in SQL.
+
+ >>> strlen = spark.udf.register("strlen", lambda x: len(x))
+ >>> spark.sql("SELECT strlen('test')").show()
+ +------------+
+ |strlen(test)|
+ +------------+
+ | 4|
+ +------------+
"""
from pyspark.sql.udf import UDFRegistration
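The example above relies on `register`'s default string return type; an explicit return type can be passed as the third argument (a sketch; `strlen2` is a made-up name):

>>> _ = spark.udf.register("strlen2", lambda x: len(x), "int")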
@@ -535,13 +653,25 @@ class SparkSession(SparkConversionMixin):
Examples
--------
- >>> spark.range(1, 7, 2).collect()
- [Row(id=1), Row(id=3), Row(id=5)]
+ >>> spark.range(1, 7, 2).show()
+ +---+
+ | id|
+ +---+
+ | 1|
+ | 3|
+ | 5|
+ +---+
If only one argument is specified, it will be used as the end value.
- >>> spark.range(3).collect()
- [Row(id=0), Row(id=1), Row(id=2)]
+ >>> spark.range(3).show()
+ +---+
+ | id|
+ +---+
+ | 0|
+ | 1|
+ | 2|
+ +---+
"""
if numPartitions is None:
numPartitions = self._sc.defaultParallelism
@@ -848,27 +978,8 @@ class SparkSession(SparkConversionMixin):
Creates a :class:`DataFrame` from an :class:`RDD`, a list, a :class:`pandas.DataFrame`
or a :class:`numpy.ndarray`.
- When ``schema`` is a list of column names, the type of each column
- will be inferred from ``data``.
-
- When ``schema`` is ``None``, it will try to infer the schema (column names and types)
- from ``data``, which should be an RDD of either :class:`Row`,
- :class:`namedtuple`, or :class:`dict`.
-
- When ``schema`` is :class:`pyspark.sql.types.DataType` or a datatype string, it must match
- the real data, or an exception will be thrown at runtime. If the given schema is not
- :class:`pyspark.sql.types.StructType`, it will be wrapped into a
- :class:`pyspark.sql.types.StructType` as its only field, and the field name will be "value".
- Each record will also be wrapped into a tuple, which can be converted to row later.
-
- If schema inference is needed, ``samplingRatio`` is used to determined the ratio of
- rows used for schema inference. The first row will be used if ``samplingRatio`` is ``None``.
-
.. versionadded:: 2.0.0
- .. versionchanged:: 2.1.0
- Added verifySchema.
-
Parameters
----------
data : :class:`RDD` or iterable
@@ -880,38 +991,62 @@ class SparkSession(SparkConversionMixin):
column names, default is None. The data type string format equals to
:class:`pyspark.sql.types.DataType.simpleString`, except that top level struct type can
omit the ``struct<>``.
+
+ When ``schema`` is a list of column names, the type of each column
+ will be inferred from ``data``.
+
+ When ``schema`` is ``None``, it will try to infer the schema (column names and types)
+ from ``data``, which should be an RDD of either :class:`Row`,
+ :class:`namedtuple`, or :class:`dict`.
+
+ When ``schema`` is :class:`pyspark.sql.types.DataType` or a datatype string, it must
+ match the real data, or an exception will be thrown at runtime. If the given schema is
+ not :class:`pyspark.sql.types.StructType`, it will be wrapped into a
+ :class:`pyspark.sql.types.StructType` as its only field, and the field name will be
+ "value". Each record will also be wrapped into a tuple, which can be converted to row
+ later.
samplingRatio : float, optional
- the sample ratio of rows used for inferring
+ the sample ratio of rows used for schema inference. The first few rows will be used
+ if ``samplingRatio`` is ``None``.
verifySchema : bool, optional
verify data types of every row against schema. Enabled by default.
+ .. versionadded:: 2.1.0
+
Returns
-------
:class:`DataFrame`
Notes
-----
- Usage with spark.sql.execution.arrow.pyspark.enabled=True is experimental.
+ Usage with ``spark.sql.execution.arrow.pyspark.enabled=True`` is experimental.
Examples
--------
- >>> l = [('Alice', 1)]
- >>> spark.createDataFrame(l).collect()
+ Create a DataFrame from a list of tuples.
+
+ >>> spark.createDataFrame([('Alice', 1)]).collect()
[Row(_1='Alice', _2=1)]
- >>> spark.createDataFrame(l, ['name', 'age']).collect()
+ >>> spark.createDataFrame([('Alice', 1)], ['name', 'age']).collect()
[Row(name='Alice', age=1)]
+ Create a DataFrame from a list of dictionaries.
+
>>> d = [{'name': 'Alice', 'age': 1}]
>>> spark.createDataFrame(d).collect()
[Row(age=1, name='Alice')]
- >>> rdd = sc.parallelize(l)
+ Create a DataFrame from an RDD.
+
+ >>> rdd = spark.sparkContext.parallelize([('Alice', 1)])
>>> spark.createDataFrame(rdd).collect()
[Row(_1='Alice', _2=1)]
>>> df = spark.createDataFrame(rdd, ['name', 'age'])
>>> df.collect()
[Row(name='Alice', age=1)]
+ Create a DataFrame from Row instances.
+
>>> from pyspark.sql import Row
>>> Person = Row('name', 'age')
>>> person = rdd.map(lambda r: Person(*r))
@@ -919,6 +1054,8 @@ class SparkSession(SparkConversionMixin):
>>> df2.collect()
[Row(name='Alice', age=1)]
+ Create a DataFrame with the explicit schema specified.
+
>>> from pyspark.sql.types import *
>>> schema = StructType([
... StructField("name", StringType(), True),
@@ -927,16 +1064,23 @@ class SparkSession(SparkConversionMixin):
>>> df3.collect()
[Row(name='Alice', age=1)]
+ Create a DataFrame from a pandas DataFrame.
+
>>> spark.createDataFrame(df.toPandas()).collect() # doctest: +SKIP
[Row(name='Alice', age=1)]
>>> spark.createDataFrame(pandas.DataFrame([[1, 2]])).collect() # doctest: +SKIP
[Row(0=1, 1=2)]
+ Create a DataFrame from an RDD with a schema in a DDL-formatted string.
+
>>> spark.createDataFrame(rdd, "a: string, b: int").collect()
[Row(a='Alice', b=1)]
>>> rdd = rdd.map(lambda row: row[1])
>>> spark.createDataFrame(rdd, "int").collect()
[Row(value=1)]
+
+ When the type does not match the data, an exception is thrown at runtime.
+
>>> spark.createDataFrame(rdd, "boolean").collect() # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
@@ -1143,16 +1287,28 @@ class SparkSession(SparkConversionMixin):
.. versionadded:: 2.0.0
+ Parameters
+ ----------
+ tableName : str
+ the table name to retrieve.
+
Returns
-------
:class:`DataFrame`
Examples
--------
- >>> df.createOrReplaceTempView("table1")
- >>> df2 = spark.table("table1")
- >>> sorted(df.collect()) == sorted(df2.collect())
- True
+ >>> spark.range(5).createOrReplaceTempView("table1")
+ >>> spark.table("table1").sort("id").show()
+ +---+
+ | id|
+ +---+
+ | 0|
+ | 1|
+ | 2|
+ | 3|
+ | 4|
+ +---+
"""
return DataFrame(self._jsparkSession.table(tableName), self)
@@ -1167,6 +1323,28 @@ class SparkSession(SparkConversionMixin):
Returns
-------
:class:`DataFrameReader`
+
+ Examples
+ --------
+ >>> spark.read
+ <pyspark.sql.readwriter.DataFrameReader object ...>
+
+ Write a DataFrame into a JSON file and read it back.
+
+ >>> import tempfile
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... # Write a DataFrame into a JSON file
+ ... spark.createDataFrame(
+ ... [{"age": 100, "name": "Hyukjin Kwon"}]
+ ... ).write.mode("overwrite").format("json").save(d)
+ ...
+ ... # Read the JSON file as a DataFrame.
+ ... spark.read.format('json').load(d).show()
+ +---+------------+
+ |age| name|
+ +---+------------+
+ |100|Hyukjin Kwon|
+ +---+------------+
"""
return DataFrameReader(self)
@@ -1185,6 +1363,22 @@ class SparkSession(SparkConversionMixin):
Returns
-------
:class:`DataStreamReader`
+
+ Examples
+ --------
+ >>> spark.readStream
+ <pyspark.sql.streaming.readwriter.DataStreamReader object ...>
+
+ The example below uses the Rate source, which generates rows continuously.
+ After that, we compute the value modulo 3 and write the stream out to the console.
+ The streaming query is stopped after 3 seconds.
+
+ >>> import time
+ >>> df = spark.readStream.format("rate").load()
+ >>> df = df.selectExpr("value % 3 as v")
+ >>> q = df.writeStream.format("console").start()
+ >>> time.sleep(3)
+ >>> q.stop()
"""
return DataStreamReader(self)
@@ -1202,14 +1396,35 @@ class SparkSession(SparkConversionMixin):
Returns
-------
:class:`StreamingQueryManager`
+
+ Examples
+ --------
+ >>> spark.streams
+ <pyspark.sql.streaming.query.StreamingQueryManager object ...>
+
+ Get the list of active streaming queries.
+
+ >>> sq = spark.readStream.format(
+ ... "rate").load().writeStream.format('memory').queryName('this_query').start()
+ >>> sqm = spark.streams
+ >>> [q.name for q in sqm.active]
+ ['this_query']
+ >>> sq.stop()
"""
from pyspark.sql.streaming import StreamingQueryManager
return StreamingQueryManager(self._jsparkSession.streams())
- @since(2.0)
def stop(self) -> None:
- """Stop the underlying :class:`SparkContext`."""
+ """
+ Stop the underlying :class:`SparkContext`.
+
+ .. versionadded:: 2.0.0
+
+ Examples
+ --------
+ >>> spark.stop() # doctest: +SKIP
+ """
from pyspark.sql.context import SQLContext
self._sc.stop()
@@ -1221,14 +1436,28 @@ class SparkSession(SparkConversionMixin):
SparkSession._activeSession = None
SQLContext._instantiatedContext = None
- @since(2.0)
def __enter__(self) -> "SparkSession":
"""
Enable 'with SparkSession.builder.(...).getOrCreate() as session: app' syntax.
+
+ .. versionadded:: 2.0.0
+
+ Examples
+ --------
+ >>> with SparkSession.builder.master("local").getOrCreate() as session:
+ ... session.range(5).show() # doctest: +SKIP
+ +---+
+ | id|
+ +---+
+ | 0|
+ | 1|
+ | 2|
+ | 3|
+ | 4|
+ +---+
"""
return self
- @since(2.0)
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
@@ -1239,6 +1468,22 @@ class SparkSession(SparkConversionMixin):
Enable 'with SparkSession.builder.(...).getOrCreate() as session: app' syntax.
Specifically stop the SparkSession on exit of the with block.
+
+ .. versionadded:: 2.0.0
+
+ Examples
+ --------
+ >>> with SparkSession.builder.master("local").getOrCreate() as session:
+ ... session.range(5).show() # doctest: +SKIP
+ +---+
+ | id|
+ +---+
+ | 0|
+ | 1|
+ | 2|
+ | 3|
+ | 4|
+ +---+
"""
self.stop()
@@ -1246,30 +1491,20 @@ class SparkSession(SparkConversionMixin):
def _test() -> None:
import os
import doctest
- from pyspark.context import SparkContext
- from pyspark.sql import Row
import pyspark.sql.session
os.chdir(os.environ["SPARK_HOME"])
globs = pyspark.sql.session.__dict__.copy()
- sc = SparkContext("local[4]", "PythonTest")
- globs["sc"] = sc
- globs["spark"] = SparkSession(sc)
- globs["rdd"] = rdd = sc.parallelize(
- [
- Row(field1=1, field2="row1"),
- Row(field1=2, field2="row2"),
- Row(field1=3, field2="row3"),
- ]
+ globs["spark"] = (
+ SparkSession.builder.master("local[4]").appName("sql.session tests").getOrCreate()
)
- globs["df"] = rdd.toDF()
(failure_count, test_count) = doctest.testmod(
pyspark.sql.session,
globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
)
- globs["sc"].stop()
+ globs["spark"].stop()
if failure_count:
sys.exit(-1)