Posted to commits@spark.apache.org by gu...@apache.org on 2023/06/23 06:37:49 UTC
[spark] branch master updated: [SPARK-44135][PYTHON][CONNECT][DOCS] Document Spark Connect only API in PySpark
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8eda2d8472c [SPARK-44135][PYTHON][CONNECT][DOCS] Document Spark Connect only API in PySpark
8eda2d8472c is described below
commit 8eda2d8472c4ab054952879ee9408ec4dc6f0191
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Fri Jun 23 15:37:38 2023 +0900
[SPARK-44135][PYTHON][CONNECT][DOCS] Document Spark Connect only API in PySpark
### What changes were proposed in this pull request?
This PR proposes to document:
- `SparkSession.builder.create` (SPARK-43509)
- `SparkSession.addArtifact` (SPARK-43612)
- `SparkSession.addArtifacts` (SPARK-43612)
- `SparkSession.copyFromLocalToFs` (SPARK-43790)
- `SparkSession.client` (SPARK-41255)
It adds these methods to the regular `SparkSession` so that type hints resolve correctly, but each method raises a runtime error when the session is not a Spark Connect session, as sketched below.
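For example (a minimal sketch, not part of this PR; the `sc://` endpoint and the artifact name are assumptions for illustration):

```python
from pyspark.sql import SparkSession

# Classic (non-Connect) session: the method is declared, so type
# checkers and IDEs resolve it, but calling it raises a RuntimeError.
classic = SparkSession.builder.master("local[1]").getOrCreate()
try:
    classic.addArtifact("dep.zip", archive=True)
except RuntimeError as e:
    print(e)  # "... only supported with Spark Connect ..."
classic.stop()

# Spark Connect session: the same call is forwarded to the Connect
# client. Assumes a Connect server is reachable at this endpoint;
# in practice, run the two halves in separate processes.
remote = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
remote.addArtifact("dep.zip", archive=True)
```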
### Why are the changes needed?
In order to document Spark Connect client features.
### Does this PR introduce _any_ user-facing change?
Yes, users will be able to see the new API in the documentation.
### How was this patch tested?
I manually built the Python API reference documentation and checked the rendered pages as shown below:
![Screenshot 2023-06-23 at 1 22 00 PM](https://github.com/apache/spark/assets/6477701/5900c95c-60bc-43b1-96fe-9662065221b6)
![Screenshot 2023-06-23 at 1 23 04 PM](https://github.com/apache/spark/assets/6477701/14e41af7-c33d-49a5-900f-0614c7dff609)
![Screenshot 2023-06-23 at 1 23 08 PM](https://github.com/apache/spark/assets/6477701/98ee97b2-3bb6-42e8-acb1-e67bea95c748)
![Screenshot 2023-06-23 at 1 23 11 PM](https://github.com/apache/spark/assets/6477701/410ce18f-cce6-405a-8594-bc29b50c4aae)
![Screenshot 2023-06-23 at 1 23 14 PM](https://github.com/apache/spark/assets/6477701/6e25e9fb-96a2-4a11-93de-aacd9d792106)
![Screenshot 2023-06-23 at 1 23 17 PM](https://github.com/apache/spark/assets/6477701/1151fae6-fef0-4d51-875a-4ae94f0230d7)
Closes #41708 from HyukjinKwon/SPARK-44135.
Authored-by: Hyukjin Kwon <gu...@apache.org>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../source/reference/pyspark.sql/spark_session.rst | 16 +-
python/pyspark/ml/connect/io_utils.py | 2 +-
python/pyspark/sql/connect/session.py | 56 +------
python/pyspark/sql/session.py | 161 ++++++++++++++++-----
4 files changed, 146 insertions(+), 89 deletions(-)
diff --git a/python/docs/source/reference/pyspark.sql/spark_session.rst b/python/docs/source/reference/pyspark.sql/spark_session.rst
index edd5e746161..9867a9cd121 100644
--- a/python/docs/source/reference/pyspark.sql/spark_session.rst
+++ b/python/docs/source/reference/pyspark.sql/spark_session.rst
@@ -21,9 +21,6 @@ Spark Session
=============
.. currentmodule:: pyspark.sql
-.. autosummary::
- :toctree: api/
-
The entry point to programming Spark with the Dataset and DataFrame API.
To create a Spark session, you should use ``SparkSession.builder`` attribute.
See also :class:`SparkSession`.
@@ -53,3 +50,16 @@ See also :class:`SparkSession`.
SparkSession.udf
SparkSession.udtf
SparkSession.version
+
+
+Spark Connect Only
+------------------
+
+.. autosummary::
+ :toctree: api/
+
+ SparkSession.builder.create
+ SparkSession.addArtifact
+ SparkSession.addArtifacts
+ SparkSession.copyFromLocalToFs
+ SparkSession.client
diff --git a/python/pyspark/ml/connect/io_utils.py b/python/pyspark/ml/connect/io_utils.py
index c701736712f..7c3025849da 100644
--- a/python/pyspark/ml/connect/io_utils.py
+++ b/python/pyspark/ml/connect/io_utils.py
@@ -36,7 +36,7 @@ _META_DATA_FILE_NAME = "metadata.json"
def _copy_file_from_local_to_fs(local_path: str, dest_path: str) -> None:
session = _get_active_session(is_remote())
if is_remote():
- session.copyFromLocalToFs(local_path, dest_path) # type: ignore[attr-defined]
+ session.copyFromLocalToFs(local_path, dest_path)
else:
jvm = session.sparkContext._gateway.jvm # type: ignore[union-attr]
jvm.org.apache.spark.ml.python.MLUtil.copyFileFromLocalToFs(local_path, dest_path)
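Declaring `copyFromLocalToFs` on the plain `SparkSession` is what makes the `type: ignore[attr-defined]` above unnecessary: MyPy can now resolve the attribute on either session type. A usage sketch (the endpoint and paths are hypothetical, and a Spark Connect session is required):

```python
from pyspark.sql.connect.session import SparkSession

# Hypothetical Connect endpoint, for illustration only.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# dest_path must be a scheme-less absolute path on the cluster's
# default (cloud) filesystem; a URL with a scheme raises ValueError.
spark.copyFromLocalToFs("/tmp/metadata.json", "/models/v1/metadata.json")
```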
diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index ba0d8caaeca..365829ff7bc 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -612,48 +612,21 @@ class SparkSession:
assert result is not None
return result
- # SparkConnect-specific API
@property
def client(self) -> "SparkConnectClient":
- """
- Gives access to the Spark Connect client. In normal cases this is not necessary to be used
- and only relevant for testing.
-
- .. versionadded:: 3.4.0
-
- Returns
- -------
- :class:`SparkConnectClient`
- """
return self._client
+ client.__doc__ = PySparkSession.client.__doc__
+
def addArtifacts(
self, *path: str, pyfile: bool = False, archive: bool = False, file: bool = False
) -> None:
- """
- Add artifact(s) to the client session. Currently only local files are supported.
-
- .. versionadded:: 3.5.0
-
- Parameters
- ----------
- *path : tuple of str
- Artifact's URIs to add.
- pyfile : bool
- Whether to add them as Python dependencies such as .py, .egg, .zip or .jar files.
- The pyfiles are directly inserted into the path when executing Python functions
- in executors.
- archive : bool
- Whether to add them as archives such as .zip, .jar, .tar.gz, .tgz, or .tar files.
- The archives are unpacked on the executor side automatically.
- file : bool
- Add a file to be downloaded with this Spark job on every node.
- The ``path`` passed can only be a local file for now.
- """
if sum([file, pyfile, archive]) > 1:
raise ValueError("'pyfile', 'archive' and/or 'file' cannot be True together.")
self._client.add_artifacts(*path, pyfile=pyfile, archive=archive, file=file)
+ addArtifacts.__doc__ = PySparkSession.addArtifacts.__doc__
+
addArtifact = addArtifacts
def _cache_local_relation(self, local_relation: LocalRelation) -> str:
@@ -664,25 +637,6 @@ class SparkSession:
return self._client.cache_artifact(serialized)
def copyFromLocalToFs(self, local_path: str, dest_path: str) -> None:
- """
- Copy file from local to cloud storage file system.
- If the file already exits in destination path, old file is overwritten.
-
- Parameters
- ----------
- local_path: str
- Path to a local file. Directories are not supported.
- The path can be either an absolute path or a relative path.
-
- dest_path: str
- The cloud storage path to the destination the file will
- be copied to.
- The path must be an an absolute path.
-
- Notes
- -----
- This API is a developer API.
- """
if urllib.parse.urlparse(dest_path).scheme:
raise ValueError(
"`spark_session.copyFromLocalToFs` API only allows `dest_path` to be a path "
@@ -691,6 +645,8 @@ class SparkSession:
)
self._client.copy_from_local_to_fs(local_path, dest_path)
+ copyFromLocalToFs.__doc__ = PySparkSession.copyFromLocalToFs.__doc__
+
@staticmethod
def _start_connect_server(master: str, opts: Dict[str, Any]) -> None:
"""
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 47b73700f0c..00a0047dfd1 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -74,6 +74,10 @@ if TYPE_CHECKING:
from pyspark.sql.udf import UDFRegistration
from pyspark.sql.udtf import UDTFRegistration
+ # Running MyPy type checks always requires pandas and other
+ # dependencies, so importing here is fine.
+ from pyspark.sql.connect.client import SparkConnectClient
+
__all__ = ["SparkSession"]
@@ -405,41 +409,6 @@ class SparkSession(SparkConversionMixin):
"""
return self.config("spark.sql.catalogImplementation", "hive")
- def create(self) -> "SparkSession":
- """Creates a new SparkSession. Can only be used in the context of Spark Connect
- and will throw an exception otherwise.
-
- .. versionadded:: 3.5.0
-
- Returns
- -------
- :class:`SparkSession`
- """
- opts = dict(self._options)
- if "SPARK_REMOTE" in os.environ or "spark.remote" in opts:
- from pyspark.sql.connect.session import SparkSession as RemoteSparkSession
-
- # Validate that no incompatible configuration options are selected.
- self._validate_startup_urls()
-
- url = opts.get("spark.remote", os.environ.get("SPARK_REMOTE"))
- if url.startswith("local"):
- raise RuntimeError(
- "Creating new SparkSessions with `local` "
- "connection string is not supported."
- )
-
- # Mark this Spark Session as Spark Connect. This prevents that local PySpark is
- # used in conjunction with Spark Connect mode.
- os.environ["SPARK_CONNECT_MODE_ENABLED"] = "1"
- opts["spark.remote"] = url
- return RemoteSparkSession.builder.config(map=opts).create()
- else:
- raise RuntimeError(
- "SparkSession.builder.create() can only be used with Spark Connect; "
- "however, spark.remote was not found."
- )
-
def getOrCreate(self) -> "SparkSession":
"""Gets an existing :class:`SparkSession` or, if there is no existing one, creates a
new one based on the options set in this builder.
@@ -530,6 +499,42 @@ class SparkSession(SparkConversionMixin):
).applyModifiableSettings(session._jsparkSession, self._options)
return session
+ # SparkConnect-specific API
+ def create(self) -> "SparkSession":
+ """Creates a new SparkSession. Can only be used in the context of Spark Connect
+ and will throw an exception otherwise.
+
+ .. versionadded:: 3.5.0
+
+ Returns
+ -------
+ :class:`SparkSession`
+ """
+ opts = dict(self._options)
+ if "SPARK_REMOTE" in os.environ or "spark.remote" in opts:
+ from pyspark.sql.connect.session import SparkSession as RemoteSparkSession
+
+ # Validate that no incompatible configuration options are selected.
+ self._validate_startup_urls()
+
+ url = opts.get("spark.remote", os.environ.get("SPARK_REMOTE"))
+ if url.startswith("local"):
+ raise RuntimeError(
+ "Creating new SparkSessions with `local` "
+ "connection string is not supported."
+ )
+
+ # Mark this Spark Session as Spark Connect. This prevents local PySpark
+ # from being used in conjunction with Spark Connect mode.
+ os.environ["SPARK_CONNECT_MODE_ENABLED"] = "1"
+ opts["spark.remote"] = url
+ return RemoteSparkSession.builder.config(map=opts).create()
+ else:
+ raise RuntimeError(
+ "SparkSession.builder.create() can only be used with Spark Connect; "
+ "however, spark.remote was not found."
+ )
+
# TODO(SPARK-38912): Replace @classproperty with @classmethod + @property once support for
# Python 3.8 is dropped.
#
@@ -1767,6 +1772,92 @@ class SparkSession(SparkConversionMixin):
"""
self.stop()
+ # SparkConnect-specific API
+ @property
+ def client(self) -> "SparkConnectClient":
+ """
+ Gives access to the Spark Connect client. In normal cases this is not
+ needed and is only relevant for testing.
+
+ .. versionadded:: 3.4.0
+
+ Returns
+ -------
+ :class:`SparkConnectClient`
+
+ Notes
+ -----
+ This API is dedicated to the Spark Connect client only. With a regular
+ Spark session, it throws an exception.
+ """
+ raise RuntimeError(
+ "SparkSession.client is only supported with Spark Connect; "
+ "however, the current Spark session does not use Spark Connect."
+ )
+
+ def addArtifacts(
+ self, *path: str, pyfile: bool = False, archive: bool = False, file: bool = False
+ ) -> None:
+ """
+ Add artifact(s) to the client session. Currently only local files are supported.
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ ----------
+ *path : tuple of str
+ Artifact's URIs to add.
+ pyfile : bool
+ Whether to add them as Python dependencies such as .py, .egg, .zip or .jar files.
+ The pyfiles are directly inserted into the path when executing Python functions
+ in executors.
+ archive : bool
+ Whether to add them as archives such as .zip, .jar, .tar.gz, .tgz, or .tar files.
+ The archives are unpacked on the executor side automatically.
+ file : bool
+ Add a file to be downloaded with this Spark job on every node.
+ The ``path`` passed can only be a local file for now.
+
+ Notes
+ -----
+ This API is dedicated to the Spark Connect client only. With a regular
+ Spark session, it throws an exception.
+ """
+ raise RuntimeError(
+ "SparkSession.addArtifact(s) is only supported with Spark Connect; "
+ "however, the current Spark session does not use Spark Connect."
+ )
+
+ addArtifact = addArtifacts
+
+ def copyFromLocalToFs(self, local_path: str, dest_path: str) -> None:
+ """
+ Copy a file from the local filesystem to a cloud storage file system.
+ If the file already exists in the destination path, the old file is overwritten.
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ ----------
+ local_path: str
+ Path to a local file. Directories are not supported.
+ The path can be either an absolute path or a relative path.
+ dest_path: str
+ The cloud storage path to the destination the file will
+ be copied to.
+ The path must be an absolute path.
+
+ Notes
+ -----
+ This API is a developer API.
+ Also, this API is dedicated to the Spark Connect client only. With a regular
+ Spark session, it throws an exception.
+ """
+ raise RuntimeError(
+ "SparkSession.copyFromLocalToFs is only supported with Spark Connect; "
+ "however, the current Spark session does not use Spark Connect."
+ )
+
def _test() -> None:
import os
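As a closing usage sketch of the relocated `SparkSession.builder.create()` (the endpoint is an assumption; as the code above shows, `local` connection strings are rejected and `spark.remote` or `SPARK_REMOTE` must be set):

```python
import os

from pyspark.sql import SparkSession

# create() requires a Spark Connect endpoint; this URL is illustrative.
os.environ["SPARK_REMOTE"] = "sc://localhost:15002"

# Unlike getOrCreate(), create() always builds a new session, so two
# calls yield two distinct Connect sessions against the same server.
first = SparkSession.builder.create()
second = SparkSession.builder.create()
assert first is not second
```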