You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2022/08/23 23:37:42 UTC
[spark] branch master updated: [SPARK-40191][PYTHON][CORE][DOCS] Make pyspark.resource examples self-contained
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5ae24203187 [SPARK-40191][PYTHON][CORE][DOCS] Make pyspark.resource examples self-contained
5ae24203187 is described below
commit 5ae242031872e5e8ca8e6353e33666a31ccdf407
Author: Hyukjin Kwon <gu...@gmail.com>
AuthorDate: Tue Aug 23 16:37:13 2022 -0700
[SPARK-40191][PYTHON][CORE][DOCS] Make pyspark.resource examples self-contained
### What changes were proposed in this pull request?
This PR proposes to add a working example `pyspark.resource.ResourceProfile`
In addition, this PR adds return and parameter descriptions with fixing a typo in Scaladoc side.
### Why are the changes needed?
To make the documentation more readable and able to copy and paste directly in PySpark shell.
### Does this PR introduce _any_ user-facing change?
Yes, it changes the documentation
### How was this patch tested?
Manually ran each doctests. CI also runs this.
Closes #37627 from HyukjinKwon/SPARK-40191.
Lead-authored-by: Hyukjin Kwon <gu...@gmail.com>
Co-authored-by: Hyukjin Kwon <gu...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../spark/resource/ExecutorResourceRequests.scala | 2 +-
dev/sparktestsupport/modules.py | 2 +
python/pyspark/resource/information.py | 18 +-
python/pyspark/resource/profile.py | 131 +++++++++++--
python/pyspark/resource/requests.py | 202 ++++++++++++++++++++-
5 files changed, 335 insertions(+), 20 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
index b6992f4f883..28ff79ce1f4 100644
--- a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
@@ -38,7 +38,7 @@ class ExecutorResourceRequests() extends Serializable {
private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
/**
- * Returns all the resource requests for the task.
+ * Returns all the resource requests for the executor.
*/
def requests: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index a4531cf5157..2b9d5269379 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -478,6 +478,8 @@ pyspark_resource = Module(
dependencies=[pyspark_core],
source_file_regexes=["python/pyspark/resource"],
python_test_goals=[
+ # doctests
+ "pyspark.resource.profile",
# unittests
"pyspark.resource.tests.test_resources",
],
diff --git a/python/pyspark/resource/information.py b/python/pyspark/resource/information.py
index bcd78ebdc18..92cfc5a6e8b 100644
--- a/python/pyspark/resource/information.py
+++ b/python/pyspark/resource/information.py
@@ -33,11 +33,15 @@ class ResourceInformation:
name : str
the name of the resource
addresses : list
- an array of strings describing the addresses of the resource
+ a list of strings describing the addresses of the resource
Notes
-----
This API is evolving.
+
+ See Also
+ --------
+ :class:`pyspark.resource.ResourceProfile`
"""
def __init__(self, name: str, addresses: List[str]):
@@ -46,8 +50,20 @@ class ResourceInformation:
@property
def name(self) -> str:
+ """
+ Returns
+ -------
+ str
+ the name of the resource
+ """
return self._name
@property
def addresses(self) -> List[str]:
+ """
+ Returns
+ -------
+ list
+ a list of strings describing the addresses of the resource
+ """
return self._addresses
diff --git a/python/pyspark/resource/profile.py b/python/pyspark/resource/profile.py
index 37e8ee85ea2..0b2de444832 100644
--- a/python/pyspark/resource/profile.py
+++ b/python/pyspark/resource/profile.py
@@ -39,6 +39,44 @@ class ResourceProfile:
Notes
-----
This API is evolving.
+
+ Examples
+ --------
+ Create Executor resource requests.
+
+ >>> executor_requests = (
+ ... ExecutorResourceRequests()
+ ... .cores(2)
+ ... .memory("6g")
+ ... .memoryOverhead("1g")
+ ... .pysparkMemory("2g")
+ ... .offheapMemory("3g")
+ ... .resource("gpu", 2, "testGpus", "nvidia.com")
+ ... )
+
+ Create task resource requests.
+
+ >>> task_requests = TaskResourceRequests().cpus(2).resource("gpu", 2)
+
+ Create a resource profile.
+
+ >>> builder = ResourceProfileBuilder()
+ >>> resource_profile = builder.require(executor_requests).require(task_requests).build
+
+ Create an RDD with the resource profile.
+
+ >>> rdd = sc.parallelize(range(10)).withResources(resource_profile)
+ >>> rdd.getResourceProfile()
+ <pyspark.resource.profile.ResourceProfile object ...>
+ >>> rdd.getResourceProfile().taskResources
+ {'cpus': <...TaskResourceRequest...>, 'gpu': <...TaskResourceRequest...>}
+ >>> rdd.getResourceProfile().executorResources
+ {'gpu': <...ExecutorResourceRequest...>,
+ 'cores': <...ExecutorResourceRequest...>,
+ 'offHeap': <...ExecutorResourceRequest...>,
+ 'memoryOverhead': <...ExecutorResourceRequest...>,
+ 'pyspark.memory': <...ExecutorResourceRequest...>,
+ 'memory': <...ExecutorResourceRequest...>}
"""
@overload
@@ -69,6 +107,13 @@ class ResourceProfile:
@property
def id(self) -> int:
+ """
+ Returns
+ -------
+ int
+ A unique id of this :class:`ResourceProfile`
+ """
+
if self._java_resource_profile is not None:
return self._java_resource_profile.id()
else:
@@ -79,6 +124,13 @@ class ResourceProfile:
@property
def taskResources(self) -> Dict[str, TaskResourceRequest]:
+ """
+ Returns
+ -------
+ dict
+ a dictionary of resources to :class:`TaskResourceRequest`
+ """
+
if self._java_resource_profile is not None:
taskRes = self._java_resource_profile.taskResourcesJMap()
result = {}
@@ -90,6 +142,12 @@ class ResourceProfile:
@property
def executorResources(self) -> Dict[str, ExecutorResourceRequest]:
+ """
+ Returns
+ -------
+ dict
+ a dictionary of resources to :class:`ExecutorResourceRequest`
+ """
if self._java_resource_profile is not None:
execRes = self._java_resource_profile.executorResourcesJMap()
result = {}
@@ -112,6 +170,10 @@ class ResourceProfileBuilder:
.. versionadded:: 3.1.0
+ See Also
+ --------
+ :class:`pyspark.resource.ResourceProfile`
+
Notes
-----
This API is evolving.
@@ -130,12 +192,26 @@ class ResourceProfileBuilder:
else:
self._jvm = None
self._java_resource_profile_builder = None
- self._executor_resource_requests: Optional[Dict[str, ExecutorResourceRequest]] = {}
- self._task_resource_requests: Optional[Dict[str, TaskResourceRequest]] = {}
+ self._executor_resource_requests: Dict[str, ExecutorResourceRequest] = {}
+ self._task_resource_requests: Dict[str, TaskResourceRequest] = {}
def require(
- self, resourceRequest: Union[ExecutorResourceRequest, TaskResourceRequests]
+ self, resourceRequest: Union[ExecutorResourceRequests, TaskResourceRequests]
) -> "ResourceProfileBuilder":
+ """
+ Add executor or task resource requests.
+
+ Parameters
+ ----------
+ resourceRequest : :class:`ExecutorResourceRequests` or :class:`TaskResourceRequests`
+ The detailed executor or task resource requests, see :class:`ExecutorResourceRequests`
+
+ Returns
+ -------
+ :class:`ResourceProfileBuilder`
+ This resource profile builder, to allow chaining of requests
+ """
+
if isinstance(resourceRequest, TaskResourceRequests):
if self._java_resource_profile_builder is not None:
if resourceRequest._java_task_resource_requests is not None:
@@ -148,25 +224,19 @@ class ResourceProfileBuilder:
taskReqs._java_task_resource_requests
)
else:
- self._task_resource_requests.update( # type: ignore[union-attr]
- resourceRequest.requests
- )
+ self._task_resource_requests.update(resourceRequest.requests)
else:
if self._java_resource_profile_builder is not None:
- r = resourceRequest._java_executor_resource_requests # type: ignore[attr-defined]
+ r = resourceRequest._java_executor_resource_requests
if r is not None:
self._java_resource_profile_builder.require(r)
else:
- execReqs = ExecutorResourceRequests(
- self._jvm, resourceRequest.requests # type: ignore[attr-defined]
- )
+ execReqs = ExecutorResourceRequests(self._jvm, resourceRequest.requests)
self._java_resource_profile_builder.require(
execReqs._java_executor_resource_requests
)
else:
- self._executor_resource_requests.update( # type: ignore[union-attr]
- resourceRequest.requests # type: ignore[attr-defined]
- )
+ self._executor_resource_requests.update(resourceRequest.requests)
return self
def clearExecutorResourceRequests(self) -> None:
@@ -182,7 +252,13 @@ class ResourceProfileBuilder:
self._task_resource_requests = {}
@property
- def taskResources(self) -> Optional[Dict[str, TaskResourceRequest]]:
+ def taskResources(self) -> Dict[str, TaskResourceRequest]:
+ """
+ Returns
+ -------
+ dict
+ a dictionary of resources to :class:`TaskResourceRequest`
+ """
if self._java_resource_profile_builder is not None:
taskRes = self._java_resource_profile_builder.taskResourcesJMap()
result = {}
@@ -193,7 +269,13 @@ class ResourceProfileBuilder:
return self._task_resource_requests
@property
- def executorResources(self) -> Optional[Dict[str, ExecutorResourceRequest]]:
+ def executorResources(self) -> Dict[str, ExecutorResourceRequest]:
+ """
+ Returns
+ -------
+ dict
+ a dictionary of resources to :class:`ExecutorResourceRequest`
+ """
if self._java_resource_profile_builder is not None:
result = {}
execRes = self._java_resource_profile_builder.executorResourcesJMap()
@@ -214,3 +296,22 @@ class ResourceProfileBuilder:
return ResourceProfile(
_exec_req=self._executor_resource_requests, _task_req=self._task_resource_requests
)
+
+
+def _test() -> None:
+ import doctest
+ import sys
+ from pyspark import SparkContext
+
+ globs = globals().copy()
+ globs["sc"] = SparkContext("local[4]", "profile tests")
+ (failure_count, test_count) = doctest.testmod(
+ globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE
+ )
+ globs["sc"].stop()
+ if failure_count:
+ sys.exit(-1)
+
+
+if __name__ == "__main__":
+ _test()
diff --git a/python/pyspark/resource/requests.py b/python/pyspark/resource/requests.py
index 0999e4e4aeb..043124e6959 100644
--- a/python/pyspark/resource/requests.py
+++ b/python/pyspark/resource/requests.py
@@ -42,7 +42,7 @@ class ExecutorResourceRequest:
See the configuration and cluster specific docs for more details.
- Use :py:class:`pyspark.ExecutorResourceRequests` class as a convenience API.
+ Use :class:`pyspark.ExecutorResourceRequests` class as a convenience API.
.. versionadded:: 3.1.0
@@ -60,6 +60,10 @@ class ExecutorResourceRequest:
vendor : str, optional
Vendor, required for some cluster managers
+ See Also
+ --------
+ :class:`pyspark.resource.ResourceProfile`
+
Notes
-----
This API is evolving.
@@ -79,18 +83,42 @@ class ExecutorResourceRequest:
@property
def resourceName(self) -> str:
+ """
+ Returns
+ -------
+ str
+ Name of the resource
+ """
return self._name
@property
def amount(self) -> int:
+ """
+ Returns
+ -------
+ int
+ Amount requesting
+ """
return self._amount
@property
def discoveryScript(self) -> str:
+ """
+ Returns
+ -------
+ str
+ Optional script used to discover the resource
+ """
return self._discovery_script
@property
def vendor(self) -> str:
+ """
+ Returns
+ -------
+ str
+ Vendor, required for some cluster managers
+ """
return self._vendor
@@ -103,6 +131,10 @@ class ExecutorResourceRequests:
.. versionadded:: 3.1.0
+ See Also
+ --------
+ :class:`pyspark.resource.ResourceProfile`
+
Notes
-----
This API is evolving.
@@ -157,6 +189,20 @@ class ExecutorResourceRequests:
self._executor_resources: Dict[str, ExecutorResourceRequest] = {}
def memory(self, amount: str) -> "ExecutorResourceRequests":
+ """
+ Specify heap memory. The value specified will be converted to MiB.
+ This is a convenient API to add :class:`ExecutorResourceRequest` for "memory" resource.
+
+ Parameters
+ ----------
+ amount : str
+ Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+ Default unit is MiB if not specified.
+
+ Returns
+ -------
+ :class:`ExecutorResourceRequests`
+ """
if self._java_executor_resource_requests is not None:
self._java_executor_resource_requests.memory(amount)
else:
@@ -166,6 +212,21 @@ class ExecutorResourceRequests:
return self
def memoryOverhead(self, amount: str) -> "ExecutorResourceRequests":
+ """
+ Specify overhead memory. The value specified will be converted to MiB.
+ This is a convenient API to add :class:`ExecutorResourceRequest` for "memoryOverhead"
+ resource.
+
+ Parameters
+ ----------
+ amount : str
+ Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+ Default unit is MiB if not specified.
+
+ Returns
+ -------
+ :class:`ExecutorResourceRequests`
+ """
if self._java_executor_resource_requests is not None:
self._java_executor_resource_requests.memoryOverhead(amount)
else:
@@ -175,6 +236,21 @@ class ExecutorResourceRequests:
return self
def pysparkMemory(self, amount: str) -> "ExecutorResourceRequests":
+ """
+ Specify pyspark memory. The value specified will be converted to MiB.
+ This is a convenient API to add :class:`ExecutorResourceRequest` for "pyspark.memory"
+ resource.
+
+ Parameters
+ ----------
+ amount : str
+ Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+ Default unit is MiB if not specified.
+
+ Returns
+ -------
+ :class:`ExecutorResourceRequests`
+ """
if self._java_executor_resource_requests is not None:
self._java_executor_resource_requests.pysparkMemory(amount)
else:
@@ -184,6 +260,22 @@ class ExecutorResourceRequests:
return self
def offheapMemory(self, amount: str) -> "ExecutorResourceRequests":
+ """
+ Specify off heap memory. The value specified will be converted to MiB.
+ This value only take effect when MEMORY_OFFHEAP_ENABLED is true.
+ This is a convenient API to add :class:`ExecutorResourceRequest` for "offHeap"
+ resource.
+
+ Parameters
+ ----------
+ amount : str
+ Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+ Default unit is MiB if not specified.
+
+ Returns
+ -------
+ :class:`ExecutorResourceRequests`
+ """
if self._java_executor_resource_requests is not None:
self._java_executor_resource_requests.offHeapMemory(amount)
else:
@@ -193,6 +285,19 @@ class ExecutorResourceRequests:
return self
def cores(self, amount: int) -> "ExecutorResourceRequests":
+ """
+ Specify number of cores per Executor.
+ This is a convenient API to add :class:`ExecutorResourceRequest` for "cores" resource.
+
+ Parameters
+ ----------
+ amount : int
+ Number of cores to allocate per Executor.
+
+ Returns
+ -------
+ :class:`ExecutorResourceRequests`
+ """
if self._java_executor_resource_requests is not None:
self._java_executor_resource_requests.cores(amount)
else:
@@ -206,6 +311,32 @@ class ExecutorResourceRequests:
discoveryScript: str = "",
vendor: str = "",
) -> "ExecutorResourceRequests":
+ """
+ Amount of a particular custom resource(GPU, FPGA, etc) to use. The resource names supported
+ correspond to the regular Spark configs with the prefix removed. For instance, resources
+ like GPUs are gpu (spark configs `spark.executor.resource.gpu.*`). If you pass in a resource
+ that the cluster manager doesn't support the result is undefined, it may error or may just
+ be ignored.
+ This is a convenient API to add :class:`ExecutorResourceRequest` for custom resources.
+
+ Parameters
+ ----------
+ resourceName : str
+ Name of the resource.
+ amount : str
+ amount of that resource per executor to use.
+ discoveryScript : str, optional
+ Optional script used to discover the resources. This is required on
+ some cluster managers that don't tell Spark the addresses of
+ the resources allocated. The script runs on Executors startup to
+ discover the addresses of the resources available.
+ vendor : str
+ Optional vendor, required for some cluster managers
+
+ Returns
+ -------
+ :class:`ExecutorResourceRequests`
+ """
if self._java_executor_resource_requests is not None:
self._java_executor_resource_requests.resource(
resourceName, amount, discoveryScript, vendor
@@ -218,6 +349,12 @@ class ExecutorResourceRequests:
@property
def requests(self) -> Dict[str, ExecutorResourceRequest]:
+ """
+ Returns
+ -------
+ dict
+ Returns all the resource requests for the executor.
+ """
if self._java_executor_resource_requests is not None:
result = {}
execRes = self._java_executor_resource_requests.requestsJMap()
@@ -235,7 +372,7 @@ class TaskResourceRequest:
A task resource request. This is used in conjunction with the
:class:`pyspark.resource.ResourceProfile` to programmatically specify the resources
needed for an RDD that will be applied at the stage level. The amount is specified
- as a Double to allow for saying you want more than 1 task per resource. Valid values
+ as a float to allow for saying you want more than 1 task per resource. Valid values
are less than or equal to 0.5 or whole numbers.
Use :class:`pyspark.resource.TaskResourceRequests` class as a convenience API.
@@ -245,10 +382,16 @@ class TaskResourceRequest:
Name of the resource
amount : float
Amount requesting as a float to support fractional resource requests.
- Valid values are less than or equal to 0.5 or whole numbers.
+ Valid values are less than or equal to 0.5 or whole numbers. This essentially
+ lets you configure X number of tasks to run on a single resource,
+ ie amount equals 0.5 translates into 2 tasks per resource address.
.. versionadded:: 3.1.0
+ See Also
+ --------
+ :class:`pyspark.resource.ResourceProfile`
+
Notes
-----
This API is evolving.
@@ -260,10 +403,22 @@ class TaskResourceRequest:
@property
def resourceName(self) -> str:
+ """
+ Returns
+ -------
+ str
+ Name of the resource.
+ """
return self._name
@property
def amount(self) -> float:
+ """
+ Returns
+ -------
+ float
+ Amount requesting as a float to support fractional resource requests.
+ """
return self._amount
@@ -276,6 +431,10 @@ class TaskResourceRequests:
.. versionadded:: 3.1.0
+ See Also
+ --------
+ :class:`pyspark.resource.ResourceProfile`
+
Notes
-----
This API is evolving.
@@ -318,6 +477,19 @@ class TaskResourceRequests:
self._task_resources: Dict[str, TaskResourceRequest] = {}
def cpus(self, amount: int) -> "TaskResourceRequests":
+ """
+ Specify number of cpus per Task.
+ This is a convenient API to add :class:`TaskResourceRequest` for cpus.
+
+ Parameters
+ ----------
+ amount : int
+ Number of cpus to allocate per Task.
+
+ Returns
+ -------
+ :class:`TaskResourceRequests`
+ """
if self._java_task_resource_requests is not None:
self._java_task_resource_requests.cpus(amount)
else:
@@ -325,6 +497,24 @@ class TaskResourceRequests:
return self
def resource(self, resourceName: str, amount: float) -> "TaskResourceRequests":
+ """
+ Amount of a particular custom resource(GPU, FPGA, etc) to use.
+ This is a convenient API to add :class:`TaskResourceRequest` for custom resources.
+
+ Parameters
+ ----------
+ resourceName : str
+ Name of the resource.
+ amount : float
+ Amount requesting as a float to support fractional resource requests.
+ Valid values are less than or equal to 0.5 or whole numbers. This essentially
+ lets you configure X number of tasks to run on a single resource,
+ ie amount equals 0.5 translates into 2 tasks per resource address.
+
+ Returns
+ -------
+ :class:`TaskResourceRequests`
+ """
if self._java_task_resource_requests is not None:
self._java_task_resource_requests.resource(resourceName, float(amount))
else:
@@ -333,6 +523,12 @@ class TaskResourceRequests:
@property
def requests(self) -> Dict[str, TaskResourceRequest]:
+ """
+ Returns
+ -------
+ dict
+ Returns all the resource requests for the task.
+ """
if self._java_task_resource_requests is not None:
result = {}
taskRes = self._java_task_resource_requests.requestsJMap()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org