Posted to commits@spark.apache.org by do...@apache.org on 2022/08/23 23:37:42 UTC

[spark] branch master updated: [SPARK-40191][PYTHON][CORE][DOCS] Make pyspark.resource examples self-contained

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ae24203187 [SPARK-40191][PYTHON][CORE][DOCS] Make pyspark.resource examples self-contained
5ae24203187 is described below

commit 5ae242031872e5e8ca8e6353e33666a31ccdf407
Author: Hyukjin Kwon <gu...@gmail.com>
AuthorDate: Tue Aug 23 16:37:13 2022 -0700

    [SPARK-40191][PYTHON][CORE][DOCS] Make pyspark.resource examples self-contained
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to add a working example for `pyspark.resource.ResourceProfile`.
    
    In addition, this PR adds return and parameter descriptions, and fixes a typo on the Scaladoc side.
    
    ### Why are the changes needed?
    
    To make the documentation more readable and directly copy-pasteable into the PySpark shell.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it changes the documentation.
    
    ### How was this patch tested?
    
    Manually ran each doctest. CI also runs them.
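
    For reference, since the patch adds an `if __name__ == "__main__": _test()`
    guard to `python/pyspark/resource/profile.py`, the doctests can also be run
    directly (assuming a local PySpark build):

        python python/pyspark/resource/profile.py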
    
    Closes #37627 from HyukjinKwon/SPARK-40191.
    
    Lead-authored-by: Hyukjin Kwon <gu...@gmail.com>
    Co-authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/resource/ExecutorResourceRequests.scala  |   2 +-
 dev/sparktestsupport/modules.py                    |   2 +
 python/pyspark/resource/information.py             |  18 +-
 python/pyspark/resource/profile.py                 | 131 +++++++++++--
 python/pyspark/resource/requests.py                | 202 ++++++++++++++++++++-
 5 files changed, 335 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
index b6992f4f883..28ff79ce1f4 100644
--- a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
@@ -38,7 +38,7 @@ class ExecutorResourceRequests() extends Serializable {
   private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
 
   /**
-   * Returns all the resource requests for the task.
+   * Returns all the resource requests for the executor.
    */
   def requests: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap
 
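The corrected wording also matches the behavior visible from Python; a minimal
doctest-style sketch (output assumed from the pure-Python code path, i.e. with
no active SparkContext):

    >>> from pyspark.resource import ExecutorResourceRequests
    >>> reqs = ExecutorResourceRequests().cores(4).memory("2g").requests
    >>> sorted(reqs.keys())
    ['cores', 'memory']
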
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index a4531cf5157..2b9d5269379 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -478,6 +478,8 @@ pyspark_resource = Module(
     dependencies=[pyspark_core],
     source_file_regexes=["python/pyspark/resource"],
     python_test_goals=[
+        # doctests
+        "pyspark.resource.profile",
         # unittests
         "pyspark.resource.tests.test_resources",
     ],
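
Listing the module under `python_test_goals` makes the Python test runner
execute it as a test goal, which in turn fires the `_test()` doctest harness
added further below. A rough sketch of what that goal boils down to (an
assumption for illustration, not the actual test-runner code):

    import runpy

    # Running the module as __main__ triggers its
    # `if __name__ == "__main__": _test()` guard, which runs the doctests
    # via doctest.testmod().
    runpy.run_module("pyspark.resource.profile", run_name="__main__")
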
diff --git a/python/pyspark/resource/information.py b/python/pyspark/resource/information.py
index bcd78ebdc18..92cfc5a6e8b 100644
--- a/python/pyspark/resource/information.py
+++ b/python/pyspark/resource/information.py
@@ -33,11 +33,15 @@ class ResourceInformation:
     name : str
         the name of the resource
     addresses : list
-        an array of strings describing the addresses of the resource
+        a list of strings describing the addresses of the resource
 
     Notes
     -----
     This API is evolving.
+
+    See Also
+    --------
+    :class:`pyspark.resource.ResourceProfile`
     """
 
     def __init__(self, name: str, addresses: List[str]):
@@ -46,8 +50,20 @@ class ResourceInformation:
 
     @property
     def name(self) -> str:
+        """
+        Returns
+        -------
+        str
+            the name of the resource
+        """
         return self._name
 
     @property
     def addresses(self) -> List[str]:
+        """
+        Returns
+        -------
+        list
+            a list of strings describing the addresses of the resource
+        """
         return self._addresses
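
Since `ResourceInformation` is a plain value class, the clarified docstrings
can be checked without a cluster; a minimal sketch:

    >>> from pyspark.resource import ResourceInformation
    >>> info = ResourceInformation("gpu", ["0", "1"])
    >>> info.name
    'gpu'
    >>> info.addresses
    ['0', '1']
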
diff --git a/python/pyspark/resource/profile.py b/python/pyspark/resource/profile.py
index 37e8ee85ea2..0b2de444832 100644
--- a/python/pyspark/resource/profile.py
+++ b/python/pyspark/resource/profile.py
@@ -39,6 +39,44 @@ class ResourceProfile:
     Notes
     -----
     This API is evolving.
+
+    Examples
+    --------
+    Create Executor resource requests.
+
+    >>> executor_requests = (
+    ...     ExecutorResourceRequests()
+    ...     .cores(2)
+    ...     .memory("6g")
+    ...     .memoryOverhead("1g")
+    ...     .pysparkMemory("2g")
+    ...     .offheapMemory("3g")
+    ...     .resource("gpu", 2, "testGpus", "nvidia.com")
+    ... )
+
+    Create task resource requests.
+
+    >>> task_requests = TaskResourceRequests().cpus(2).resource("gpu", 2)
+
+    Create a resource profile.
+
+    >>> builder = ResourceProfileBuilder()
+    >>> resource_profile = builder.require(executor_requests).require(task_requests).build
+
+    Create an RDD with the resource profile.
+
+    >>> rdd = sc.parallelize(range(10)).withResources(resource_profile)
+    >>> rdd.getResourceProfile()
+    <pyspark.resource.profile.ResourceProfile object ...>
+    >>> rdd.getResourceProfile().taskResources
+    {'cpus': <...TaskResourceRequest...>, 'gpu': <...TaskResourceRequest...>}
+    >>> rdd.getResourceProfile().executorResources
+    {'gpu': <...ExecutorResourceRequest...>,
+     'cores': <...ExecutorResourceRequest...>,
+     'offHeap': <...ExecutorResourceRequest...>,
+     'memoryOverhead': <...ExecutorResourceRequest...>,
+     'pyspark.memory': <...ExecutorResourceRequest...>,
+     'memory': <...ExecutorResourceRequest...>}
     """
 
     @overload
@@ -69,6 +107,13 @@ class ResourceProfile:
 
     @property
     def id(self) -> int:
+        """
+        Returns
+        -------
+        int
+            A unique id of this :class:`ResourceProfile`
+        """
+
         if self._java_resource_profile is not None:
             return self._java_resource_profile.id()
         else:
@@ -79,6 +124,13 @@ class ResourceProfile:
 
     @property
     def taskResources(self) -> Dict[str, TaskResourceRequest]:
+        """
+        Returns
+        -------
+        dict
+            a dictionary of resources to :class:`TaskResourceRequest`
+        """
+
         if self._java_resource_profile is not None:
             taskRes = self._java_resource_profile.taskResourcesJMap()
             result = {}
@@ -90,6 +142,12 @@ class ResourceProfile:
 
     @property
     def executorResources(self) -> Dict[str, ExecutorResourceRequest]:
+        """
+        Returns
+        -------
+        dict
+            a dictionary of resources to :class:`ExecutorResourceRequest`
+        """
         if self._java_resource_profile is not None:
             execRes = self._java_resource_profile.executorResourcesJMap()
             result = {}
@@ -112,6 +170,10 @@ class ResourceProfileBuilder:
 
     .. versionadded:: 3.1.0
 
+    See Also
+    --------
+    :class:`pyspark.resource.ResourceProfile`
+
     Notes
     -----
     This API is evolving.
@@ -130,12 +192,26 @@ class ResourceProfileBuilder:
         else:
             self._jvm = None
             self._java_resource_profile_builder = None
-            self._executor_resource_requests: Optional[Dict[str, ExecutorResourceRequest]] = {}
-            self._task_resource_requests: Optional[Dict[str, TaskResourceRequest]] = {}
+            self._executor_resource_requests: Dict[str, ExecutorResourceRequest] = {}
+            self._task_resource_requests: Dict[str, TaskResourceRequest] = {}
 
     def require(
-        self, resourceRequest: Union[ExecutorResourceRequest, TaskResourceRequests]
+        self, resourceRequest: Union[ExecutorResourceRequests, TaskResourceRequests]
     ) -> "ResourceProfileBuilder":
+        """
+        Add executor or task resource requests.
+
+        Parameters
+        ----------
+        resourceRequest : :class:`ExecutorResourceRequests` or :class:`TaskResourceRequests`
+            The detailed executor or task resource requests
+
+        Returns
+        -------
+        :class:`ResourceProfileBuilder`
+            this builder, so that `require` calls can be chained
+        """
+
         if isinstance(resourceRequest, TaskResourceRequests):
             if self._java_resource_profile_builder is not None:
                 if resourceRequest._java_task_resource_requests is not None:
@@ -148,25 +224,19 @@ class ResourceProfileBuilder:
                         taskReqs._java_task_resource_requests
                     )
             else:
-                self._task_resource_requests.update(  # type: ignore[union-attr]
-                    resourceRequest.requests
-                )
+                self._task_resource_requests.update(resourceRequest.requests)
         else:
             if self._java_resource_profile_builder is not None:
-                r = resourceRequest._java_executor_resource_requests  # type: ignore[attr-defined]
+                r = resourceRequest._java_executor_resource_requests
                 if r is not None:
                     self._java_resource_profile_builder.require(r)
                 else:
-                    execReqs = ExecutorResourceRequests(
-                        self._jvm, resourceRequest.requests  # type: ignore[attr-defined]
-                    )
+                    execReqs = ExecutorResourceRequests(self._jvm, resourceRequest.requests)
                     self._java_resource_profile_builder.require(
                         execReqs._java_executor_resource_requests
                     )
             else:
-                self._executor_resource_requests.update(  # type: ignore[union-attr]
-                    resourceRequest.requests  # type: ignore[attr-defined]
-                )
+                self._executor_resource_requests.update(resourceRequest.requests)
         return self
 
     def clearExecutorResourceRequests(self) -> None:
@@ -182,7 +252,13 @@ class ResourceProfileBuilder:
             self._task_resource_requests = {}
 
     @property
-    def taskResources(self) -> Optional[Dict[str, TaskResourceRequest]]:
+    def taskResources(self) -> Dict[str, TaskResourceRequest]:
+        """
+        Returns
+        -------
+        dict
+            a dictionary of resources to :class:`TaskResourceRequest`
+        """
         if self._java_resource_profile_builder is not None:
             taskRes = self._java_resource_profile_builder.taskResourcesJMap()
             result = {}
@@ -193,7 +269,13 @@ class ResourceProfileBuilder:
             return self._task_resource_requests
 
     @property
-    def executorResources(self) -> Optional[Dict[str, ExecutorResourceRequest]]:
+    def executorResources(self) -> Dict[str, ExecutorResourceRequest]:
+        """
+        Returns
+        -------
+        dict
+            a dictionary of resources to :class:`ExecutorResourceRequest`
+        """
         if self._java_resource_profile_builder is not None:
             result = {}
             execRes = self._java_resource_profile_builder.executorResourcesJMap()
@@ -214,3 +296,22 @@ class ResourceProfileBuilder:
             return ResourceProfile(
                 _exec_req=self._executor_resource_requests, _task_req=self._task_resource_requests
             )
+
+
+def _test() -> None:
+    import doctest
+    import sys
+    from pyspark import SparkContext
+
+    globs = globals().copy()
+    globs["sc"] = SparkContext("local[4]", "profile tests")
+    (failure_count, test_count) = doctest.testmod(
+        globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE
+    )
+    globs["sc"].stop()
+    if failure_count:
+        sys.exit(-1)
+
+
+if __name__ == "__main__":
+    _test()
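
For readers who want to try the new example without a cluster: when no
SparkContext is active, the builder works purely on the Python side, so a
profile can be built and inspected locally. A minimal sketch (resource names
and amounts are illustrative):

    >>> from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests
    >>> task_requests = TaskResourceRequests().cpus(2).resource("gpu", 0.5)
    >>> profile = ResourceProfileBuilder().require(task_requests).build
    >>> sorted(profile.taskResources.keys())
    ['cpus', 'gpu']
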
diff --git a/python/pyspark/resource/requests.py b/python/pyspark/resource/requests.py
index 0999e4e4aeb..043124e6959 100644
--- a/python/pyspark/resource/requests.py
+++ b/python/pyspark/resource/requests.py
@@ -42,7 +42,7 @@ class ExecutorResourceRequest:
 
     See the configuration and cluster specific docs for more details.
 
-    Use :py:class:`pyspark.ExecutorResourceRequests` class as a convenience API.
+    Use :class:`pyspark.resource.ExecutorResourceRequests` class as a convenience API.
 
     .. versionadded:: 3.1.0
 
@@ -60,6 +60,10 @@ class ExecutorResourceRequest:
     vendor : str, optional
         Vendor, required for some cluster managers
 
+    See Also
+    --------
+    :class:`pyspark.resource.ResourceProfile`
+
     Notes
     -----
     This API is evolving.
@@ -79,18 +83,42 @@ class ExecutorResourceRequest:
 
     @property
     def resourceName(self) -> str:
+        """
+        Returns
+        -------
+        str
+            Name of the resource
+        """
         return self._name
 
     @property
     def amount(self) -> int:
+        """
+        Returns
+        -------
+        int
+            Amount of the resource being requested
+        """
         return self._amount
 
     @property
     def discoveryScript(self) -> str:
+        """
+        Returns
+        -------
+        str
+            Script used to discover the resource
+        """
         return self._discovery_script
 
     @property
     def vendor(self) -> str:
+        """
+        Returns
+        -------
+        str
+            Vendor, required for some cluster managers
+        """
         return self._vendor
 
 
@@ -103,6 +131,10 @@ class ExecutorResourceRequests:
 
     .. versionadded:: 3.1.0
 
+    See Also
+    --------
+    :class:`pyspark.resource.ResourceProfile`
+
     Notes
     -----
     This API is evolving.
@@ -157,6 +189,20 @@ class ExecutorResourceRequests:
             self._executor_resources: Dict[str, ExecutorResourceRequest] = {}
 
     def memory(self, amount: str) -> "ExecutorResourceRequests":
+        """
+        Specify heap memory. The value specified will be converted to MiB.
+        This is a convenient API to add :class:`ExecutorResourceRequest` for "memory" resource.
+
+        Parameters
+        ----------
+        amount : str
+            Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+            Default unit is MiB if not specified.
+
+        Returns
+        -------
+        :class:`ExecutorResourceRequests`
+        """
         if self._java_executor_resource_requests is not None:
             self._java_executor_resource_requests.memory(amount)
         else:
@@ -166,6 +212,21 @@ class ExecutorResourceRequests:
         return self
 
     def memoryOverhead(self, amount: str) -> "ExecutorResourceRequests":
+        """
+        Specify overhead memory. The value specified will be converted to MiB.
+        This is a convenient API to add :class:`ExecutorResourceRequest` for "memoryOverhead"
+        resource.
+
+        Parameters
+        ----------
+        amount : str
+            Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+            Default unit is MiB if not specified.
+
+        Returns
+        -------
+        :class:`ExecutorResourceRequests`
+        """
         if self._java_executor_resource_requests is not None:
             self._java_executor_resource_requests.memoryOverhead(amount)
         else:
@@ -175,6 +236,21 @@ class ExecutorResourceRequests:
         return self
 
     def pysparkMemory(self, amount: str) -> "ExecutorResourceRequests":
+        """
+        Specify pyspark memory. The value specified will be converted to MiB.
+        This is a convenient API to add :class:`ExecutorResourceRequest` for "pyspark.memory"
+        resource.
+
+        Parameters
+        ----------
+        amount : str
+            Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+            Default unit is MiB if not specified.
+
+        Returns
+        -------
+        :class:`ExecutorResourceRequests`
+        """
         if self._java_executor_resource_requests is not None:
             self._java_executor_resource_requests.pysparkMemory(amount)
         else:
@@ -184,6 +260,22 @@ class ExecutorResourceRequests:
         return self
 
     def offheapMemory(self, amount: str) -> "ExecutorResourceRequests":
+        """
+        Specify off-heap memory. The value specified will be converted to MiB.
+        This value only takes effect when MEMORY_OFFHEAP_ENABLED is true.
+        This is a convenient API to add :class:`ExecutorResourceRequest` for "offHeap"
+        resource.
+
+        Parameters
+        ----------
+        amount : str
+            Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+            Default unit is MiB if not specified.
+
+        Returns
+        -------
+        :class:`ExecutorResourceRequests`
+        """
         if self._java_executor_resource_requests is not None:
             self._java_executor_resource_requests.offHeapMemory(amount)
         else:
@@ -193,6 +285,19 @@ class ExecutorResourceRequests:
         return self
 
     def cores(self, amount: int) -> "ExecutorResourceRequests":
+        """
+        Specify number of cores per Executor.
+        This is a convenient API to add :class:`ExecutorResourceRequest` for "cores" resource.
+
+        Parameters
+        ----------
+        amount : int
+            Number of cores to allocate per Executor.
+
+        Returns
+        -------
+        :class:`ExecutorResourceRequests`
+        """
         if self._java_executor_resource_requests is not None:
             self._java_executor_resource_requests.cores(amount)
         else:
@@ -206,6 +311,32 @@ class ExecutorResourceRequests:
         discoveryScript: str = "",
         vendor: str = "",
     ) -> "ExecutorResourceRequests":
+        """
+        Amount of a particular custom resource (GPU, FPGA, etc.) to use. The resource names
+        supported correspond to the regular Spark configs with the prefix removed. For instance,
+        resources like GPUs are gpu (Spark configs `spark.executor.resource.gpu.*`). If you pass
+        in a resource that the cluster manager doesn't support, the result is undefined: it may
+        error or may just be ignored.
+        This is a convenient API to add :class:`ExecutorResourceRequest` for custom resources.
+
+        Parameters
+        ----------
+        resourceName : str
+            Name of the resource.
+        amount : int
+            Amount of that resource per executor to use.
+        discoveryScript : str, optional
+            Optional script used to discover the resources. This is required on
+            some cluster managers that don't tell Spark the addresses of
+            the resources allocated. The script runs on Executor startup to
+            discover the addresses of the resources available.
+        vendor : str
+            Optional vendor, required for some cluster managers
+
+        Returns
+        -------
+        :class:`ExecutorResourceRequests`
+        """
         if self._java_executor_resource_requests is not None:
             self._java_executor_resource_requests.resource(
                 resourceName, amount, discoveryScript, vendor
@@ -218,6 +349,12 @@ class ExecutorResourceRequests:
 
     @property
     def requests(self) -> Dict[str, ExecutorResourceRequest]:
+        """
+        Returns
+        -------
+        dict
+            Returns all the resource requests for the executor.
+        """
         if self._java_executor_resource_requests is not None:
             result = {}
             execRes = self._java_executor_resource_requests.requestsJMap()
@@ -235,7 +372,7 @@ class TaskResourceRequest:
     A task resource request. This is used in conjunction with the
     :class:`pyspark.resource.ResourceProfile` to programmatically specify the resources
     needed for an RDD that will be applied at the stage level. The amount is specified
-    as a Double to allow for saying you want more than 1 task per resource. Valid values
+    as a float to allow requesting more than 1 task per resource. Valid values
     are less than or equal to 0.5 or whole numbers.
     Use :class:`pyspark.resource.TaskResourceRequests` class as a convenience API.
 
@@ -245,10 +382,16 @@ class TaskResourceRequest:
         Name of the resource
     amount : float
         Amount requesting as a float to support fractional resource requests.
-        Valid values are less than or equal to 0.5 or whole numbers.
+        Valid values are less than or equal to 0.5 or whole numbers. This essentially
+        lets you configure how many tasks share a single resource address;
+        e.g., an amount of 0.5 translates into 2 tasks per resource address.
 
     .. versionadded:: 3.1.0
 
+    See Also
+    --------
+    :class:`pyspark.resource.ResourceProfile`
+
     Notes
     -----
     This API is evolving.
@@ -260,10 +403,22 @@ class TaskResourceRequest:
 
     @property
     def resourceName(self) -> str:
+        """
+        Returns
+        -------
+        str
+            Name of the resource.
+        """
         return self._name
 
     @property
     def amount(self) -> float:
+        """
+        Returns
+        -------
+        float
+            Amount requesting as a float to support fractional resource requests.
+        """
         return self._amount
 
 
@@ -276,6 +431,10 @@ class TaskResourceRequests:
 
     .. versionadded:: 3.1.0
 
+    See Also
+    --------
+    :class:`pyspark.resource.ResourceProfile`
+
     Notes
     -----
     This API is evolving.
@@ -318,6 +477,19 @@ class TaskResourceRequests:
             self._task_resources: Dict[str, TaskResourceRequest] = {}
 
     def cpus(self, amount: int) -> "TaskResourceRequests":
+        """
+        Specify the number of CPUs per task.
+        This is a convenient API to add :class:`TaskResourceRequest` for cpus.
+
+        Parameters
+        ----------
+        amount : int
+            Number of CPUs to allocate per task.
+
+        Returns
+        -------
+        :class:`TaskResourceRequests`
+        """
         if self._java_task_resource_requests is not None:
             self._java_task_resource_requests.cpus(amount)
         else:
@@ -325,6 +497,24 @@ class TaskResourceRequests:
         return self
 
     def resource(self, resourceName: str, amount: float) -> "TaskResourceRequests":
+        """
+        Amount of a particular custom resource (GPU, FPGA, etc.) to use.
+        This is a convenient API to add :class:`TaskResourceRequest` for custom resources.
+
+        Parameters
+        ----------
+        resourceName : str
+            Name of the resource.
+        amount : float
+            Amount requesting as a float to support fractional resource requests.
+            Valid values are less than or equal to 0.5 or whole numbers. This essentially
+            lets you configure how many tasks share a single resource address;
+            e.g., an amount of 0.5 translates into 2 tasks per resource address.
+
+        Returns
+        -------
+        :class:`TaskResourceRequests`
+        """
         if self._java_task_resource_requests is not None:
             self._java_task_resource_requests.resource(resourceName, float(amount))
         else:
@@ -333,6 +523,12 @@ class TaskResourceRequests:
 
     @property
     def requests(self) -> Dict[str, TaskResourceRequest]:
+        """
+        Returns
+        -------
+        dict
+            Returns all the resource requests for the task.
+        """
         if self._java_task_resource_requests is not None:
             result = {}
             taskRes = self._java_task_resource_requests.requestsJMap()


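The fractional-amount rule documented above can also be checked locally; a
minimal sketch (no SparkContext required, values illustrative):

    >>> from pyspark.resource import TaskResourceRequests
    >>> reqs = TaskResourceRequests().resource("gpu", 0.25).requests
    >>> reqs["gpu"].amount
    0.25
    >>> # 0.25 of an address per task means 4 tasks can share one address.
    >>> int(1 / reqs["gpu"].amount)
    4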