Posted to commits@spark.apache.org by do...@apache.org on 2020/05/20 00:10:46 UTC
[spark] branch master updated: [SPARK-31748][PYTHON] Document resource module in PySpark doc and rename/move classes
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6fb22aa [SPARK-31748][PYTHON] Document resource module in PySpark doc and rename/move classes
6fb22aa is described below
commit 6fb22aa42db421c3b06bba3feeff7a04963fc365
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Tue May 19 17:09:37 2020 -0700
[SPARK-31748][PYTHON] Document resource module in PySpark doc and rename/move classes
### What changes were proposed in this pull request?
This PR is a followup to SPARK-29641 and SPARK-28234. It proposes to:
1. Document the new `pyspark.resource` module introduced at https://github.com/apache/spark/commit/95aec091e4d8a45e648ce84d32d912f585eeb151 in the PySpark API docs.
2. Move the classes into fewer, simpler modules (a short import sketch follows this list).
Before:
```
pyspark
├── resource
│   ├── executorrequests.py
│   │   ├── class ExecutorResourceRequest
│   │   └── class ExecutorResourceRequests
│   ├── taskrequests.py
│   │   ├── class TaskResourceRequest
│   │   └── class TaskResourceRequests
│   ├── resourceprofilebuilder.py
│   │   └── class ResourceProfileBuilder
│   └── resourceprofile.py
│       └── class ResourceProfile
└── resourceinformation.py
    └── class ResourceInformation
```
After:
```
pyspark
└── resource
    ├── requests.py
    │   ├── class ExecutorResourceRequest
    │   ├── class ExecutorResourceRequests
    │   ├── class TaskResourceRequest
    │   └── class TaskResourceRequests
    ├── profile.py
    │   ├── class ResourceProfileBuilder
    │   └── class ResourceProfile
    └── information.py
        └── class ResourceInformation
```
3. Minor docstring fixes, e.g.:
```diff
- param name the name of the resource
- param addresses an array of strings describing the addresses of the resource
+ :param name: the name of the resource
+ :param addresses: an array of strings describing the addresses of the resource
+
+ .. versionadded:: 3.0.0
```
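For illustration, a minimal sketch of what user imports look like under the consolidated layout from item 2. This is not part of the patch itself; it assumes a build of master that includes this commit, and relies on the package root re-exporting the classes as shown in the `__init__.py` hunk below:
```python
# All five public classes are reachable from the pyspark.resource package
# root, which re-exports them, or from the two consolidated modules directly.
from pyspark.resource import ResourceInformation, ResourceProfile, ResourceProfileBuilder
from pyspark.resource.requests import (
    ExecutorResourceRequest,
    ExecutorResourceRequests,
    TaskResourceRequest,
    TaskResourceRequests,
)
```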
### Why are the changes needed?
To document the APIs, and to consolidate the Python classes into fewer, simpler modules.
### Does this PR introduce _any_ user-facing change?
No, the changes are in unreleased branches.
### How was this patch tested?
Manually tested via:
```bash
cd python
./run-tests --python-executables=python3 --modules=pyspark-core
./run-tests --python-executables=python3 --modules=pyspark-resource
```
Closes #28569 from HyukjinKwon/SPARK-28234-SPARK-29641-followup.
Authored-by: HyukjinKwon <gu...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
python/docs/index.rst | 1 +
python/docs/pyspark.resource.rst | 11 +++
python/docs/pyspark.rst | 1 +
python/docs/pyspark.sql.rst | 4 +-
python/pyspark/__init__.py | 2 +-
python/pyspark/context.py | 2 +-
python/pyspark/rdd.py | 5 +-
python/pyspark/resource/__init__.py | 11 ++-
.../information.py} | 6 +-
.../{resourceprofilebuilder.py => profile.py} | 59 +++++++++++-
.../resource/{executorrequests.py => requests.py} | 87 +++++++++++++++++-
python/pyspark/resource/resourceprofile.py | 72 ---------------
python/pyspark/resource/taskrequests.py | 102 ---------------------
python/pyspark/worker.py | 2 +-
14 files changed, 171 insertions(+), 194 deletions(-)
diff --git a/python/docs/index.rst b/python/docs/index.rst
index 0e7b623..6e05926 100644
--- a/python/docs/index.rst
+++ b/python/docs/index.rst
@@ -16,6 +16,7 @@ Contents:
pyspark.streaming
pyspark.ml
pyspark.mllib
+ pyspark.resource
Core classes:
diff --git a/python/docs/pyspark.resource.rst b/python/docs/pyspark.resource.rst
new file mode 100644
index 0000000..7f3a79b
--- /dev/null
+++ b/python/docs/pyspark.resource.rst
@@ -0,0 +1,11 @@
+pyspark.resource module
+=======================
+
+Module Contents
+---------------
+
+.. automodule:: pyspark.resource
+ :members:
+ :undoc-members:
+ :inherited-members:
+
diff --git a/python/docs/pyspark.rst b/python/docs/pyspark.rst
index 0df12c4..402d6ce 100644
--- a/python/docs/pyspark.rst
+++ b/python/docs/pyspark.rst
@@ -11,6 +11,7 @@ Subpackages
pyspark.streaming
pyspark.ml
pyspark.mllib
+ pyspark.resource
Contents
--------
diff --git a/python/docs/pyspark.sql.rst b/python/docs/pyspark.sql.rst
index b69562e..406ada7 100644
--- a/python/docs/pyspark.sql.rst
+++ b/python/docs/pyspark.sql.rst
@@ -1,8 +1,8 @@
pyspark.sql module
==================
-Module Context
---------------
+Module Contents
+---------------
.. automodule:: pyspark.sql
:members:
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 70c0b27..2fb6f50 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -54,7 +54,7 @@ from pyspark.files import SparkFiles
from pyspark.storagelevel import StorageLevel
from pyspark.accumulators import Accumulator, AccumulatorParam
from pyspark.broadcast import Broadcast
-from pyspark.resourceinformation import ResourceInformation
+from pyspark.resource.information import ResourceInformation
from pyspark.serializers import MarshalSerializer, PickleSerializer
from pyspark.status import *
from pyspark.taskcontext import TaskContext, BarrierTaskContext, BarrierTaskInfo
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 6cc343e..4f29f2f 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -35,7 +35,7 @@ from pyspark.java_gateway import launch_gateway, local_connect_and_auth
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
PairDeserializer, AutoBatchedSerializer, NoOpSerializer, ChunkedStream
from pyspark.storagelevel import StorageLevel
-from pyspark.resourceinformation import ResourceInformation
+from pyspark.resource.information import ResourceInformation
from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
from pyspark.traceback_utils import CallSite, first_spark_call
from pyspark.status import StatusTracker
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index d0ac000..db0c197 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -47,9 +47,8 @@ from pyspark.join import python_join, python_left_outer_join, \
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler
from pyspark.storagelevel import StorageLevel
-from pyspark.resource.executorrequests import ExecutorResourceRequests
-from pyspark.resource.resourceprofile import ResourceProfile
-from pyspark.resource.taskrequests import TaskResourceRequests
+from pyspark.resource.requests import ExecutorResourceRequests, TaskResourceRequests
+from pyspark.resource.profile import ResourceProfile
from pyspark.resultiterable import ResultIterable
from pyspark.shuffle import Aggregator, ExternalMerger, \
get_used_memory, ExternalSorter, ExternalGroupBy
diff --git a/python/pyspark/resource/__init__.py b/python/pyspark/resource/__init__.py
index 89070ec..b5f4c4a 100644
--- a/python/pyspark/resource/__init__.py
+++ b/python/pyspark/resource/__init__.py
@@ -18,12 +18,13 @@
"""
APIs to let users manipulate resource requirements.
"""
-from pyspark.resource.executorrequests import ExecutorResourceRequest, ExecutorResourceRequests
-from pyspark.resource.taskrequests import TaskResourceRequest, TaskResourceRequests
-from pyspark.resource.resourceprofilebuilder import ResourceProfileBuilder
-from pyspark.resource.resourceprofile import ResourceProfile
+from pyspark.resource.information import ResourceInformation
+from pyspark.resource.requests import TaskResourceRequest, TaskResourceRequests, \
+ ExecutorResourceRequest, ExecutorResourceRequests
+from pyspark.resource.profile import ResourceProfile, ResourceProfileBuilder
__all__ = [
"TaskResourceRequest", "TaskResourceRequests", "ExecutorResourceRequest",
- "ExecutorResourceRequests", "ResourceProfile", "ResourceProfileBuilder",
+ "ExecutorResourceRequests", "ResourceProfile", "ResourceInformation",
+ "ResourceProfileBuilder",
]
diff --git a/python/pyspark/resourceinformation.py b/python/pyspark/resource/information.py
similarity index 89%
rename from python/pyspark/resourceinformation.py
rename to python/pyspark/resource/information.py
index aaed213..b0e41cc 100644
--- a/python/pyspark/resourceinformation.py
+++ b/python/pyspark/resource/information.py
@@ -26,8 +26,10 @@ class ResourceInformation(object):
One example is GPUs, where the addresses would be the indices of the GPUs
- @param name the name of the resource
- @param addresses an array of strings describing the addresses of the resource
+ :param name: the name of the resource
+ :param addresses: an array of strings describing the addresses of the resource
+
+ .. versionadded:: 3.0.0
"""
def __init__(self, name, addresses):
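As a hedged illustration of the class documented above: Spark normally constructs these objects from the output of a resource discovery script, but the constructor shown in this hunk can also be called directly. The `"gpu"` name and addresses below are hypothetical:
```python
from pyspark.resource import ResourceInformation

# A hypothetical GPU resource with two addresses; in practice Spark builds
# these from resource discovery rather than user code.
info = ResourceInformation(name="gpu", addresses=["0", "1"])
print(info.name)       # gpu
print(info.addresses)  # ['0', '1']
```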
diff --git a/python/pyspark/resource/resourceprofilebuilder.py b/python/pyspark/resource/profile.py
similarity index 69%
rename from python/pyspark/resource/resourceprofilebuilder.py
rename to python/pyspark/resource/profile.py
index 6765428..3f6ae1d 100644
--- a/python/pyspark/resource/resourceprofilebuilder.py
+++ b/python/pyspark/resource/profile.py
@@ -15,10 +15,61 @@
# limitations under the License.
#
-from pyspark.resource.executorrequests import ExecutorResourceRequest,\
- ExecutorResourceRequests
-from pyspark.resource.resourceprofile import ResourceProfile
-from pyspark.resource.taskrequests import TaskResourceRequest, TaskResourceRequests
+from pyspark.resource.requests import TaskResourceRequest, TaskResourceRequests, \
+ ExecutorResourceRequests, ExecutorResourceRequest
+
+
+class ResourceProfile(object):
+
+ """
+ .. note:: Evolving
+
+ Resource profile to associate with an RDD. A :class:`pyspark.resource.ResourceProfile`
+ allows the user to specify executor and task requirements for an RDD that will get
+ applied during a stage. This allows the user to change the resource requirements between
+ stages. This is meant to be immutable so the user cannot change it after building.
+
+ .. versionadded:: 3.1.0
+ """
+
+ def __init__(self, _java_resource_profile=None, _exec_req={}, _task_req={}):
+ if _java_resource_profile is not None:
+ self._java_resource_profile = _java_resource_profile
+ else:
+ self._java_resource_profile = None
+ self._executor_resource_requests = _exec_req
+ self._task_resource_requests = _task_req
+
+ @property
+ def id(self):
+ if self._java_resource_profile is not None:
+ return self._java_resource_profile.id()
+ else:
+ raise RuntimeError("SparkContext must be created to get the id, get the id "
+ "after adding the ResourceProfile to an RDD")
+
+ @property
+ def taskResources(self):
+ if self._java_resource_profile is not None:
+ taskRes = self._java_resource_profile.taskResourcesJMap()
+ result = {}
+ for k, v in taskRes.items():
+ result[k] = TaskResourceRequest(v.resourceName(), v.amount())
+ return result
+ else:
+ return self._task_resource_requests
+
+ @property
+ def executorResources(self):
+ if self._java_resource_profile is not None:
+ execRes = self._java_resource_profile.executorResourcesJMap()
+ result = {}
+ for k, v in execRes.items():
+ result[k] = ExecutorResourceRequest(v.resourceName(), v.amount(),
+ v.discoveryScript(), v.vendor())
+ return result
+ else:
+ return self._executor_resource_requests
class ResourceProfileBuilder(object):
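To make the moved `ResourceProfile` concrete, a minimal usage sketch. It assumes a running SparkContext (so the JVM-backed request builders are available); the `ResourceProfileBuilder` API (`require`, and `build` as a property) is not shown in this hunk, so treat that as an assumption, and the `"gpu"` amounts are illustrative:
```python
from pyspark.resource import (
    ExecutorResourceRequests,
    TaskResourceRequests,
    ResourceProfileBuilder,
)

# Illustrative requirements: 2 cores and 1 GPU per executor, plus 1 CPU and
# a quarter of a GPU per task (fractional amounts let tasks share a resource).
ereqs = ExecutorResourceRequests().cores(2).resource("gpu", 1)
treqs = TaskResourceRequests().cpus(1).resource("gpu", 0.25)

# build is assumed to be a property, not a method, on ResourceProfileBuilder.
profile = ResourceProfileBuilder().require(ereqs).require(treqs).build

# Per the RuntimeError above, profile.id only becomes available after the
# profile has been attached to an RDD; taskResources can be read right away.
print(profile.taskResources)
```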
diff --git a/python/pyspark/resource/executorrequests.py b/python/pyspark/resource/requests.py
similarity index 70%
rename from python/pyspark/resource/executorrequests.py
rename to python/pyspark/resource/requests.py
index 91a195c..56ad6e8 100644
--- a/python/pyspark/resource/executorrequests.py
+++ b/python/pyspark/resource/requests.py
@@ -15,7 +15,6 @@
# limitations under the License.
#
-from pyspark.resource.taskrequests import TaskResourceRequest
from pyspark.util import _parse_memory
@@ -167,3 +166,89 @@ class ExecutorResourceRequests(object):
return result
else:
return self._executor_resources
+
+
+class TaskResourceRequest(object):
+ """
+ .. note:: Evolving
+
+ A task resource request. This is used in conjunction with the
+ :class:`pyspark.resource.ResourceProfile` to programmatically specify the resources
+ needed for an RDD that will be applied at the stage level. The amount is specified
+ as a Double to allow for saying you want more than 1 task per resource. Valid values
+ are less than or equal to 0.5 or whole numbers.
+ Use :class:`pyspark.resource.TaskResourceRequests` class as a convenience API.
+
+ :param resourceName: Name of the resource
+ :param amount: Amount requesting as a Double to support fractional resource requests.
+ Valid values are less than or equal to 0.5 or whole numbers.
+
+ .. versionadded:: 3.1.0
+ """
+ def __init__(self, resourceName, amount):
+ self._name = resourceName
+ self._amount = float(amount)
+
+ @property
+ def resourceName(self):
+ return self._name
+
+ @property
+ def amount(self):
+ return self._amount
+
+
+class TaskResourceRequests(object):
+
+ """
+ .. note:: Evolving
+
+ A set of task resource requests. This is used in conjunction with the
+ :class:`pyspark.resource.ResourceProfileBuilder` to programmatically specify the resources
+ needed for an RDD that will be applied at the stage level.
+
+ .. versionadded:: 3.1.0
+ """
+
+ _CPUS = "cpus"
+
+ def __init__(self, _jvm=None, _requests=None):
+ from pyspark import SparkContext
+ _jvm = _jvm or SparkContext._jvm
+ if _jvm is not None:
+ self._java_task_resource_requests = \
+ SparkContext._jvm.org.apache.spark.resource.TaskResourceRequests()
+ if _requests is not None:
+ for k, v in _requests.items():
+ if k == self._CPUS:
+ self._java_task_resource_requests.cpus(int(v.amount))
+ else:
+ self._java_task_resource_requests.resource(v.resourceName, v.amount)
+ else:
+ self._java_task_resource_requests = None
+ self._task_resources = {}
+
+ def cpus(self, amount):
+ if self._java_task_resource_requests is not None:
+ self._java_task_resource_requests.cpus(amount)
+ else:
+ self._task_resources[self._CPUS] = TaskResourceRequest(self._CPUS, amount)
+ return self
+
+ def resource(self, resourceName, amount):
+ if self._java_task_resource_requests is not None:
+ self._java_task_resource_requests.resource(resourceName, float(amount))
+ else:
+ self._task_resources[resourceName] = TaskResourceRequest(resourceName, amount)
+ return self
+
+ @property
+ def requests(self):
+ if self._java_task_resource_requests is not None:
+ result = {}
+ taskRes = self._java_task_resource_requests.requestsJMap()
+ for k, v in taskRes.items():
+ result[k] = TaskResourceRequest(v.resourceName(), v.amount())
+ return result
+ else:
+ return self._task_resources
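The `else` branches above give `TaskResourceRequests` a pure-Python fallback when no JVM is available. A small sketch of that path, assuming no SparkContext has been created in the process (so `SparkContext._jvm` is `None`):
```python
from pyspark.resource.requests import TaskResourceRequests

# With no JVM, cpus()/resource() store TaskResourceRequest objects in a local
# dict keyed by resource name, and the requests property returns that dict.
treqs = TaskResourceRequests().cpus(2).resource("gpu", 0.5)
for name, req in treqs.requests.items():
    print(name, req.amount)   # cpus 2.0, then gpu 0.5
```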
diff --git a/python/pyspark/resource/resourceprofile.py b/python/pyspark/resource/resourceprofile.py
deleted file mode 100644
index 59e9ccb..0000000
--- a/python/pyspark/resource/resourceprofile.py
+++ /dev/null
@@ -1,72 +0,0 @@
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from pyspark.resource.taskrequests import TaskResourceRequest
-from pyspark.resource.executorrequests import ExecutorResourceRequest
-
-
-class ResourceProfile(object):
-
- """
- .. note:: Evolving
-
- Resource profile to associate with an RDD. A :class:`pyspark.resource.ResourceProfile`
- allows the user to specify executor and task requirements for an RDD that will get
- applied during a stage. This allows the user to change the resource requirements between
- stages. This is meant to be immutable so user doesn't change it after building.
-
- .. versionadded:: 3.1.0
- """
-
- def __init__(self, _java_resource_profile=None, _exec_req={}, _task_req={}):
- if _java_resource_profile is not None:
- self._java_resource_profile = _java_resource_profile
- else:
- self._java_resource_profile = None
- self._executor_resource_requests = _exec_req
- self._task_resource_requests = _task_req
-
- @property
- def id(self):
- if self._java_resource_profile is not None:
- return self._java_resource_profile.id()
- else:
- raise RuntimeError("SparkContext must be created to get the id, get the id "
- "after adding the ResourceProfile to an RDD")
-
- @property
- def taskResources(self):
- if self._java_resource_profile is not None:
- taskRes = self._java_resource_profile.taskResourcesJMap()
- result = {}
- for k, v in taskRes.items():
- result[k] = TaskResourceRequest(v.resourceName(), v.amount())
- return result
- else:
- return self._task_resource_requests
-
- @property
- def executorResources(self):
- if self._java_resource_profile is not None:
- execRes = self._java_resource_profile.executorResourcesJMap()
- result = {}
- for k, v in execRes.items():
- result[k] = ExecutorResourceRequest(v.resourceName(), v.amount(),
- v.discoveryScript(), v.vendor())
- return result
- else:
- return self._executor_resource_requests
diff --git a/python/pyspark/resource/taskrequests.py b/python/pyspark/resource/taskrequests.py
deleted file mode 100644
index e8dca98..0000000
--- a/python/pyspark/resource/taskrequests.py
+++ /dev/null
@@ -1,102 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-
-class TaskResourceRequest(object):
- """
- .. note:: Evolving
-
- A task resource request. This is used in conjuntion with the
- :class:`pyspark.resource.ResourceProfile` to programmatically specify the resources
- needed for an RDD that will be applied at the stage level. The amount is specified
- as a Double to allow for saying you want more then 1 task per resource. Valid values
- are less than or equal to 0.5 or whole numbers.
- Use :class:`pyspark.resource.TaskResourceRequests` class as a convenience API.
-
- :param resourceName: Name of the resource
- :param amount: Amount requesting as a Double to support fractional resource requests.
- Valid values are less than or equal to 0.5 or whole numbers.
-
- .. versionadded:: 3.1.0
- """
- def __init__(self, resourceName, amount):
- self._name = resourceName
- self._amount = float(amount)
-
- @property
- def resourceName(self):
- return self._name
-
- @property
- def amount(self):
- return self._amount
-
-
-class TaskResourceRequests(object):
-
- """
- .. note:: Evolving
-
- A set of task resource requests. This is used in conjuntion with the
- :class:`pyspark.resource.ResourceProfileBuilder` to programmatically specify the resources
- needed for an RDD that will be applied at the stage level.
-
- .. versionadded:: 3.1.0
- """
-
- _CPUS = "cpus"
-
- def __init__(self, _jvm=None, _requests=None):
- from pyspark import SparkContext
- _jvm = _jvm or SparkContext._jvm
- if _jvm is not None:
- self._java_task_resource_requests = \
- SparkContext._jvm.org.apache.spark.resource.TaskResourceRequests()
- if _requests is not None:
- for k, v in _requests.items():
- if k == self._CPUS:
- self._java_task_resource_requests.cpus(int(v.amount))
- else:
- self._java_task_resource_requests.resource(v.resourceName, v.amount)
- else:
- self._java_task_resource_requests = None
- self._task_resources = {}
-
- def cpus(self, amount):
- if self._java_task_resource_requests is not None:
- self._java_task_resource_requests.cpus(amount)
- else:
- self._task_resources[self._CPUS] = TaskResourceRequest(self._CPUS, amount)
- return self
-
- def resource(self, resourceName, amount):
- if self._java_task_resource_requests is not None:
- self._java_task_resource_requests.resource(resourceName, float(amount))
- else:
- self._task_resources[resourceName] = TaskResourceRequest(resourceName, amount)
- return self
-
- @property
- def requests(self):
- if self._java_task_resource_requests is not None:
- result = {}
- taskRes = self._java_task_resource_requests.requestsJMap()
- for k, v in taskRes.items():
- result[k] = TaskResourceRequest(v.resourceName(), v.amount())
- return result
- else:
- return self._task_resources
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 988941e..5f4a8a2 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -36,7 +36,7 @@ from pyspark.broadcast import Broadcast, _broadcastRegistry
from pyspark.java_gateway import local_connect_and_auth
from pyspark.taskcontext import BarrierTaskContext, TaskContext
from pyspark.files import SparkFiles
-from pyspark.resourceinformation import ResourceInformation
+from pyspark.resource import ResourceInformation
from pyspark.rdd import PythonEvalType
from pyspark.serializers import write_with_length, write_int, read_long, read_bool, \
write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, \