Posted to commits@spark.apache.org by do...@apache.org on 2020/05/20 00:10:46 UTC

[spark] branch master updated: [SPARK-31748][PYTHON] Document resource module in PySpark doc and rename/move classes

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fb22aa  [SPARK-31748][PYTHON] Document resource module in PySpark doc and rename/move classes
6fb22aa is described below

commit 6fb22aa42db421c3b06bba3feeff7a04963fc365
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Tue May 19 17:09:37 2020 -0700

    [SPARK-31748][PYTHON] Document resource module in PySpark doc and rename/move classes
    
    ### What changes were proposed in this pull request?
    
    This PR is a followup for SPARK-29641 and SPARK-28234. It proposes:
    
    1. Document the new `pyspark.resource` module, introduced at https://github.com/apache/spark/commit/95aec091e4d8a45e648ce84d32d912f585eeb151, in the PySpark API docs.
    
    2. Move classes into fewer and simpler modules; the before/after layout and a short import sketch follow below.
    
    Before:
    
    ```
    pyspark
    ├── resource
    │   ├── executorrequests.py
    │   │   ├── class ExecutorResourceRequest
    │   │   └── class ExecutorResourceRequests
    │   ├── taskrequests.py
    │   │   ├── class TaskResourceRequest
    │   │   └── class TaskResourceRequests
    │   ├── resourceprofilebuilder.py
    │   │   └── class ResourceProfileBuilder
    │   ├── resourceprofile.py
    │   │   └── class ResourceProfile
    └── resourceinformation
        └── class ResourceInformation
    ```
    
    After:
    
    ```
    pyspark
    └── resource
        ├── requests.py
        │   ├── class ExecutorResourceRequest
        │   ├── class ExecutorResourceRequests
        │   ├── class TaskResourceRequest
        │   └── class TaskResourceRequests
        ├── profile.py
        │   ├── class ResourceProfileBuilder
        │   └── class ResourceProfile
        └── information.py
            └── class ResourceInformation
    ```
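
    With the flattened layout, all of the public classes are importable straight from `pyspark.resource` (see the updated `__init__.py` in the diff below). A minimal, hedged sketch of how they combine, assuming the builder API from SPARK-29641 (`require` plus the `build` property); the `appName` and the requested amounts are arbitrary:

    ```python
    from pyspark import SparkContext
    from pyspark.resource import (ExecutorResourceRequests, TaskResourceRequests,
                                  ResourceProfileBuilder)

    sc = SparkContext(appName="resource-profile-sketch")

    # Describe what each executor and each task of a stage should get.
    ereqs = ExecutorResourceRequests().cores(2).memory("6g")
    treqs = TaskResourceRequests().cpus(2)

    # require() returns the builder; build is a property that yields an immutable ResourceProfile.
    rp = ResourceProfileBuilder().require(ereqs).require(treqs).build

    print(rp.executorResources)  # dict keyed by resource name, e.g. "cores", "memory"
    print(rp.taskResources)      # dict keyed by resource name, e.g. "cpus"
    ```

    The resulting profile is what the stage-level scheduling API from SPARK-29641 (`RDD.withResources`) accepts; the class names and builder chain above are taken from this diff, the rest is illustrative.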
    
    3. Minor docstring fixes, e.g.:
    
    ```diff
    -     param name the name of the resource
    -     param addresses an array of strings describing the addresses of the resource
    +     :param name: the name of the resource
    +     :param addresses: an array of strings describing the addresses of the resource
    +
    +     .. versionadded:: 3.0.0
    ```
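
    The `:param name:` / `.. versionadded::` form is the reST field syntax that Sphinx autodoc renders as a proper parameter list and a "New in version 3.0.0" note, rather than plain prose. A tiny, purely hypothetical docstring in the adopted style:

    ```python
    def describe_resource(name, addresses):
        """
        Hypothetical helper, shown only to illustrate the docstring convention.

        :param name: the name of the resource
        :param addresses: an array of strings describing the addresses of the resource

        .. versionadded:: 3.0.0
        """
    ```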
    
    ### Why are the changes needed?
    
    To document the APIs, and to consolidate the Python modules into fewer, simpler ones.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, the changes are in unreleased branches.
    
    ### How was this patch tested?
    
    Manually tested via:
    
    ```bash
    cd python
    ./run-tests --python-executables=python3 --modules=pyspark-core
    ./run-tests --python-executables=python3 --modules=pyspark-resource
    ```
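
    Since the change also adds a new Sphinx page (`python/docs/pyspark.resource.rst`), the API docs can be rebuilt locally as an extra sanity check; a hedged sketch, assuming the standard Sphinx `Makefile` under `python/docs` (not shown in this diff):

    ```bash
    cd python/docs
    make clean html   # output lands under _build/html; check the new pyspark.resource page
    ```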
    
    Closes #28569 from HyukjinKwon/SPARK-28234-SPARK-29641-followup.
    
    Authored-by: HyukjinKwon <gu...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 python/docs/index.rst                              |   1 +
 python/docs/pyspark.resource.rst                   |  11 +++
 python/docs/pyspark.rst                            |   1 +
 python/docs/pyspark.sql.rst                        |   4 +-
 python/pyspark/__init__.py                         |   2 +-
 python/pyspark/context.py                          |   2 +-
 python/pyspark/rdd.py                              |   5 +-
 python/pyspark/resource/__init__.py                |  11 ++-
 .../information.py}                                |   6 +-
 .../{resourceprofilebuilder.py => profile.py}      |  59 +++++++++++-
 .../resource/{executorrequests.py => requests.py}  |  87 +++++++++++++++++-
 python/pyspark/resource/resourceprofile.py         |  72 ---------------
 python/pyspark/resource/taskrequests.py            | 102 ---------------------
 python/pyspark/worker.py                           |   2 +-
 14 files changed, 171 insertions(+), 194 deletions(-)

diff --git a/python/docs/index.rst b/python/docs/index.rst
index 0e7b623..6e05926 100644
--- a/python/docs/index.rst
+++ b/python/docs/index.rst
@@ -16,6 +16,7 @@ Contents:
    pyspark.streaming
    pyspark.ml
    pyspark.mllib
+   pyspark.resource
 
 
 Core classes:
diff --git a/python/docs/pyspark.resource.rst b/python/docs/pyspark.resource.rst
new file mode 100644
index 0000000..7f3a79b
--- /dev/null
+++ b/python/docs/pyspark.resource.rst
@@ -0,0 +1,11 @@
+pyspark.resource module
+=======================
+
+Module Contents
+---------------
+
+.. automodule:: pyspark.resource
+    :members:
+    :undoc-members:
+    :inherited-members:
+
diff --git a/python/docs/pyspark.rst b/python/docs/pyspark.rst
index 0df12c4..402d6ce 100644
--- a/python/docs/pyspark.rst
+++ b/python/docs/pyspark.rst
@@ -11,6 +11,7 @@ Subpackages
     pyspark.streaming
     pyspark.ml
     pyspark.mllib
+    pyspark.resource
 
 Contents
 --------
diff --git a/python/docs/pyspark.sql.rst b/python/docs/pyspark.sql.rst
index b69562e..406ada7 100644
--- a/python/docs/pyspark.sql.rst
+++ b/python/docs/pyspark.sql.rst
@@ -1,8 +1,8 @@
 pyspark.sql module
 ==================
 
-Module Context
---------------
+Module Contents
+---------------
 
 .. automodule:: pyspark.sql
     :members:
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 70c0b27..2fb6f50 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -54,7 +54,7 @@ from pyspark.files import SparkFiles
 from pyspark.storagelevel import StorageLevel
 from pyspark.accumulators import Accumulator, AccumulatorParam
 from pyspark.broadcast import Broadcast
-from pyspark.resourceinformation import ResourceInformation
+from pyspark.resource.information import ResourceInformation
 from pyspark.serializers import MarshalSerializer, PickleSerializer
 from pyspark.status import *
 from pyspark.taskcontext import TaskContext, BarrierTaskContext, BarrierTaskInfo
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 6cc343e..4f29f2f 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -35,7 +35,7 @@ from pyspark.java_gateway import launch_gateway, local_connect_and_auth
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
     PairDeserializer, AutoBatchedSerializer, NoOpSerializer, ChunkedStream
 from pyspark.storagelevel import StorageLevel
-from pyspark.resourceinformation import ResourceInformation
+from pyspark.resource.information import ResourceInformation
 from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
 from pyspark.traceback_utils import CallSite, first_spark_call
 from pyspark.status import StatusTracker
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index d0ac000..db0c197 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -47,9 +47,8 @@ from pyspark.join import python_join, python_left_outer_join, \
 from pyspark.statcounter import StatCounter
 from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler
 from pyspark.storagelevel import StorageLevel
-from pyspark.resource.executorrequests import ExecutorResourceRequests
-from pyspark.resource.resourceprofile import ResourceProfile
-from pyspark.resource.taskrequests import TaskResourceRequests
+from pyspark.resource.requests import ExecutorResourceRequests, TaskResourceRequests
+from pyspark.resource.profile import ResourceProfile
 from pyspark.resultiterable import ResultIterable
 from pyspark.shuffle import Aggregator, ExternalMerger, \
     get_used_memory, ExternalSorter, ExternalGroupBy
diff --git a/python/pyspark/resource/__init__.py b/python/pyspark/resource/__init__.py
index 89070ec..b5f4c4a 100644
--- a/python/pyspark/resource/__init__.py
+++ b/python/pyspark/resource/__init__.py
@@ -18,12 +18,13 @@
 """
 APIs to let users manipulate resource requirements.
 """
-from pyspark.resource.executorrequests import ExecutorResourceRequest, ExecutorResourceRequests
-from pyspark.resource.taskrequests import TaskResourceRequest, TaskResourceRequests
-from pyspark.resource.resourceprofilebuilder import ResourceProfileBuilder
-from pyspark.resource.resourceprofile import ResourceProfile
+from pyspark.resource.information import ResourceInformation
+from pyspark.resource.requests import TaskResourceRequest, TaskResourceRequests, \
+    ExecutorResourceRequest, ExecutorResourceRequests
+from pyspark.resource.profile import ResourceProfile, ResourceProfileBuilder
 
 __all__ = [
     "TaskResourceRequest", "TaskResourceRequests", "ExecutorResourceRequest",
-    "ExecutorResourceRequests", "ResourceProfile", "ResourceProfileBuilder",
+    "ExecutorResourceRequests", "ResourceProfile", "ResourceInformation",
+    "ResourceProfileBuilder",
 ]
diff --git a/python/pyspark/resourceinformation.py b/python/pyspark/resource/information.py
similarity index 89%
rename from python/pyspark/resourceinformation.py
rename to python/pyspark/resource/information.py
index aaed213..b0e41cc 100644
--- a/python/pyspark/resourceinformation.py
+++ b/python/pyspark/resource/information.py
@@ -26,8 +26,10 @@ class ResourceInformation(object):
 
     One example is GPUs, where the addresses would be the indices of the GPUs
 
-    @param name the name of the resource
-    @param addresses an array of strings describing the addresses of the resource
+    :param name: the name of the resource
+    :param addresses: an array of strings describing the addresses of the resource
+
+    .. versionadded:: 3.0.0
     """
 
     def __init__(self, name, addresses):
diff --git a/python/pyspark/resource/resourceprofilebuilder.py b/python/pyspark/resource/profile.py
similarity index 69%
rename from python/pyspark/resource/resourceprofilebuilder.py
rename to python/pyspark/resource/profile.py
index 6765428..3f6ae1d 100644
--- a/python/pyspark/resource/resourceprofilebuilder.py
+++ b/python/pyspark/resource/profile.py
@@ -15,10 +15,61 @@
 # limitations under the License.
 #
 
-from pyspark.resource.executorrequests import ExecutorResourceRequest,\
-    ExecutorResourceRequests
-from pyspark.resource.resourceprofile import ResourceProfile
-from pyspark.resource.taskrequests import TaskResourceRequest, TaskResourceRequests
+from pyspark.resource.requests import TaskResourceRequest, TaskResourceRequests, \
+    ExecutorResourceRequests, ExecutorResourceRequest
+
+
+class ResourceProfile(object):
+
+    """
+    .. note:: Evolving
+
+    Resource profile to associate with an RDD. A :class:`pyspark.resource.ResourceProfile`
+    allows the user to specify executor and task requirements for an RDD that will get
+    applied during a stage. This allows the user to change the resource requirements between
+    stages. This is meant to be immutable so user cannot change it after building.
+
+    .. versionadded:: 3.1.0
+    """
+
+    def __init__(self, _java_resource_profile=None, _exec_req={}, _task_req={}):
+        if _java_resource_profile is not None:
+            self._java_resource_profile = _java_resource_profile
+        else:
+            self._java_resource_profile = None
+            self._executor_resource_requests = _exec_req
+            self._task_resource_requests = _task_req
+
+    @property
+    def id(self):
+        if self._java_resource_profile is not None:
+            return self._java_resource_profile.id()
+        else:
+            raise RuntimeError("SparkContext must be created to get the id, get the id "
+                               "after adding the ResourceProfile to an RDD")
+
+    @property
+    def taskResources(self):
+        if self._java_resource_profile is not None:
+            taskRes = self._java_resource_profile.taskResourcesJMap()
+            result = {}
+            for k, v in taskRes.items():
+                result[k] = TaskResourceRequest(v.resourceName(), v.amount())
+            return result
+        else:
+            return self._task_resource_requests
+
+    @property
+    def executorResources(self):
+        if self._java_resource_profile is not None:
+            execRes = self._java_resource_profile.executorResourcesJMap()
+            result = {}
+            for k, v in execRes.items():
+                result[k] = ExecutorResourceRequest(v.resourceName(), v.amount(),
+                                                    v.discoveryScript(), v.vendor())
+            return result
+        else:
+            return self._executor_resource_requests
 
 
 class ResourceProfileBuilder(object):
diff --git a/python/pyspark/resource/executorrequests.py b/python/pyspark/resource/requests.py
similarity index 70%
rename from python/pyspark/resource/executorrequests.py
rename to python/pyspark/resource/requests.py
index 91a195c..56ad6e8 100644
--- a/python/pyspark/resource/executorrequests.py
+++ b/python/pyspark/resource/requests.py
@@ -15,7 +15,6 @@
 # limitations under the License.
 #
 
-from pyspark.resource.taskrequests import TaskResourceRequest
 from pyspark.util import _parse_memory
 
 
@@ -167,3 +166,89 @@ class ExecutorResourceRequests(object):
             return result
         else:
             return self._executor_resources
+
+
+class TaskResourceRequest(object):
+    """
+    .. note:: Evolving
+
+    A task resource request. This is used in conjunction with the
+    :class:`pyspark.resource.ResourceProfile` to programmatically specify the resources
+    needed for an RDD that will be applied at the stage level. The amount is specified
+    as a Double to allow for saying you want more than 1 task per resource. Valid values
+    are less than or equal to 0.5 or whole numbers.
+    Use :class:`pyspark.resource.TaskResourceRequests` class as a convenience API.
+
+    :param resourceName: Name of the resource
+    :param amount: Amount requesting as a Double to support fractional resource requests.
+        Valid values are less than or equal to 0.5 or whole numbers.
+
+    .. versionadded:: 3.1.0
+    """
+    def __init__(self, resourceName, amount):
+        self._name = resourceName
+        self._amount = float(amount)
+
+    @property
+    def resourceName(self):
+        return self._name
+
+    @property
+    def amount(self):
+        return self._amount
+
+
+class TaskResourceRequests(object):
+
+    """
+    .. note:: Evolving
+
+    A set of task resource requests. This is used in conjunction with the
+    :class:`pyspark.resource.ResourceProfileBuilder` to programmatically specify the resources
+    needed for an RDD that will be applied at the stage level.
+
+    .. versionadded:: 3.1.0
+    """
+
+    _CPUS = "cpus"
+
+    def __init__(self, _jvm=None, _requests=None):
+        from pyspark import SparkContext
+        _jvm = _jvm or SparkContext._jvm
+        if _jvm is not None:
+            self._java_task_resource_requests = \
+                SparkContext._jvm.org.apache.spark.resource.TaskResourceRequests()
+            if _requests is not None:
+                for k, v in _requests.items():
+                    if k == self._CPUS:
+                        self._java_task_resource_requests.cpus(int(v.amount))
+                    else:
+                        self._java_task_resource_requests.resource(v.resourceName, v.amount)
+        else:
+            self._java_task_resource_requests = None
+            self._task_resources = {}
+
+    def cpus(self, amount):
+        if self._java_task_resource_requests is not None:
+            self._java_task_resource_requests.cpus(amount)
+        else:
+            self._task_resources[self._CPUS] = TaskResourceRequest(self._CPUS, amount)
+        return self
+
+    def resource(self, resourceName, amount):
+        if self._java_task_resource_requests is not None:
+            self._java_task_resource_requests.resource(resourceName, float(amount))
+        else:
+            self._task_resources[resourceName] = TaskResourceRequest(resourceName, amount)
+        return self
+
+    @property
+    def requests(self):
+        if self._java_task_resource_requests is not None:
+            result = {}
+            taskRes = self._java_task_resource_requests.requestsJMap()
+            for k, v in taskRes.items():
+                result[k] = TaskResourceRequest(v.resourceName(), v.amount())
+            return result
+        else:
+            return self._task_resources
diff --git a/python/pyspark/resource/resourceprofile.py b/python/pyspark/resource/resourceprofile.py
deleted file mode 100644
index 59e9ccb..0000000
--- a/python/pyspark/resource/resourceprofile.py
+++ /dev/null
@@ -1,72 +0,0 @@
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from pyspark.resource.taskrequests import TaskResourceRequest
-from pyspark.resource.executorrequests import ExecutorResourceRequest
-
-
-class ResourceProfile(object):
-
-    """
-    .. note:: Evolving
-
-    Resource profile to associate with an RDD. A :class:`pyspark.resource.ResourceProfile`
-    allows the user to specify executor and task requirements for an RDD that will get
-    applied during a stage. This allows the user to change the resource requirements between
-    stages. This is meant to be immutable so user doesn't change it after building.
-
-    .. versionadded:: 3.1.0
-    """
-
-    def __init__(self, _java_resource_profile=None, _exec_req={}, _task_req={}):
-        if _java_resource_profile is not None:
-            self._java_resource_profile = _java_resource_profile
-        else:
-            self._java_resource_profile = None
-            self._executor_resource_requests = _exec_req
-            self._task_resource_requests = _task_req
-
-    @property
-    def id(self):
-        if self._java_resource_profile is not None:
-            return self._java_resource_profile.id()
-        else:
-            raise RuntimeError("SparkContext must be created to get the id, get the id "
-                               "after adding the ResourceProfile to an RDD")
-
-    @property
-    def taskResources(self):
-        if self._java_resource_profile is not None:
-            taskRes = self._java_resource_profile.taskResourcesJMap()
-            result = {}
-            for k, v in taskRes.items():
-                result[k] = TaskResourceRequest(v.resourceName(), v.amount())
-            return result
-        else:
-            return self._task_resource_requests
-
-    @property
-    def executorResources(self):
-        if self._java_resource_profile is not None:
-            execRes = self._java_resource_profile.executorResourcesJMap()
-            result = {}
-            for k, v in execRes.items():
-                result[k] = ExecutorResourceRequest(v.resourceName(), v.amount(),
-                                                    v.discoveryScript(), v.vendor())
-            return result
-        else:
-            return self._executor_resource_requests
diff --git a/python/pyspark/resource/taskrequests.py b/python/pyspark/resource/taskrequests.py
deleted file mode 100644
index e8dca98..0000000
--- a/python/pyspark/resource/taskrequests.py
+++ /dev/null
@@ -1,102 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-
-class TaskResourceRequest(object):
-    """
-    .. note:: Evolving
-
-    A task resource request. This is used in conjuntion with the
-    :class:`pyspark.resource.ResourceProfile` to programmatically specify the resources
-    needed for an RDD that will be applied at the stage level. The amount is specified
-    as a Double to allow for saying you want more then 1 task per resource. Valid values
-    are less than or equal to 0.5 or whole numbers.
-    Use :class:`pyspark.resource.TaskResourceRequests` class as a convenience API.
-
-    :param resourceName: Name of the resource
-    :param amount: Amount requesting as a Double to support fractional resource requests.
-        Valid values are less than or equal to 0.5 or whole numbers.
-
-    .. versionadded:: 3.1.0
-    """
-    def __init__(self, resourceName, amount):
-        self._name = resourceName
-        self._amount = float(amount)
-
-    @property
-    def resourceName(self):
-        return self._name
-
-    @property
-    def amount(self):
-        return self._amount
-
-
-class TaskResourceRequests(object):
-
-    """
-    .. note:: Evolving
-
-    A set of task resource requests. This is used in conjuntion with the
-    :class:`pyspark.resource.ResourceProfileBuilder` to programmatically specify the resources
-    needed for an RDD that will be applied at the stage level.
-
-    .. versionadded:: 3.1.0
-    """
-
-    _CPUS = "cpus"
-
-    def __init__(self, _jvm=None, _requests=None):
-        from pyspark import SparkContext
-        _jvm = _jvm or SparkContext._jvm
-        if _jvm is not None:
-            self._java_task_resource_requests = \
-                SparkContext._jvm.org.apache.spark.resource.TaskResourceRequests()
-            if _requests is not None:
-                for k, v in _requests.items():
-                    if k == self._CPUS:
-                        self._java_task_resource_requests.cpus(int(v.amount))
-                    else:
-                        self._java_task_resource_requests.resource(v.resourceName, v.amount)
-        else:
-            self._java_task_resource_requests = None
-            self._task_resources = {}
-
-    def cpus(self, amount):
-        if self._java_task_resource_requests is not None:
-            self._java_task_resource_requests.cpus(amount)
-        else:
-            self._task_resources[self._CPUS] = TaskResourceRequest(self._CPUS, amount)
-        return self
-
-    def resource(self, resourceName, amount):
-        if self._java_task_resource_requests is not None:
-            self._java_task_resource_requests.resource(resourceName, float(amount))
-        else:
-            self._task_resources[resourceName] = TaskResourceRequest(resourceName, amount)
-        return self
-
-    @property
-    def requests(self):
-        if self._java_task_resource_requests is not None:
-            result = {}
-            taskRes = self._java_task_resource_requests.requestsJMap()
-            for k, v in taskRes.items():
-                result[k] = TaskResourceRequest(v.resourceName(), v.amount())
-            return result
-        else:
-            return self._task_resources
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 988941e..5f4a8a2 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -36,7 +36,7 @@ from pyspark.broadcast import Broadcast, _broadcastRegistry
 from pyspark.java_gateway import local_connect_and_auth
 from pyspark.taskcontext import BarrierTaskContext, TaskContext
 from pyspark.files import SparkFiles
-from pyspark.resourceinformation import ResourceInformation
+from pyspark.resource import ResourceInformation
 from pyspark.rdd import PythonEvalType
 from pyspark.serializers import write_with_length, write_int, read_long, read_bool, \
     write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, \


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org