Posted to commits@spark.apache.org by gu...@apache.org on 2021/02/21 09:30:55 UTC
[spark] branch branch-3.1 updated: [SPARK-34384][CORE] Add missing docs for ResourceProfile APIs
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 02be408 [SPARK-34384][CORE] Add missing docs for ResourceProfile APIs
02be408 is described below
commit 02be4088b7b58a1fd270b212f3d97cea8561d173
Author: yi.wu <yi...@databricks.com>
AuthorDate: Sun Feb 21 18:29:44 2021 +0900
[SPARK-34384][CORE] Add missing docs for ResourceProfile APIs
### What changes were proposed in this pull request?
This PR adds missing docs for ResourceProfile-related APIs. In addition, it includes a few minor API changes (see the sketch after this list):
* `ResourceProfileBuilder.build` -> `ResourceProfileBuilder.build()`
* Provides the Java-specific API `allSupportedExecutorResourcesJList`
* Makes `ResourceAllocator` `private[spark]` since it was mistakenly exposed previously
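For context, a minimal sketch of how the builder API reads after these changes (illustrative values, mirroring the test updates later in this diff):

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Build executor- and task-level requests, then combine them into a profile.
val ereqs = new ExecutorResourceRequests().cores(4).memory("2g")
val treqs = new TaskResourceRequests().cpus(2)

// build() is now declared with parentheses, so callers invoke it as a method call.
val rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build()
```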
### Why are the changes needed?
Add missing API docs
### Does this PR introduce _any_ user-facing change?
No, as Apache Spark 3.1 hasn't been officially released yet.
### How was this patch tested?
Updated unit tests due to the signature change of `build()`.
Closes #31496 from Ngone51/resource-profile-api-cleanup.
Authored-by: yi.wu <yi...@databricks.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit 546d2eb5d46813a14c7bd30113fb6bb038cdd2fc)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../spark/resource/ExecutorResourceRequest.scala | 4 +--
.../spark/resource/ExecutorResourceRequests.scala | 12 +++++++
.../apache/spark/resource/ResourceAllocator.scala | 2 +-
.../apache/spark/resource/ResourceProfile.scala | 37 ++++++++++++++++++++--
.../spark/resource/ResourceProfileBuilder.scala | 20 +++++++++---
.../spark/resource/TaskResourceRequest.scala | 10 ++++--
.../spark/resource/TaskResourceRequests.scala | 13 +++++++-
.../features/BasicExecutorFeatureStepSuite.scala | 4 +--
.../cluster/k8s/ExecutorPodsAllocatorSuite.scala | 2 +-
9 files changed, 88 insertions(+), 16 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
index 3e3db7e..76af41a 100644
--- a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
@@ -20,7 +20,7 @@ package org.apache.spark.resource
import org.apache.spark.annotation.{Evolving, Since}
/**
- * An Executor resource request. This is used in conjunction with the ResourceProfile to
+ * An Executor resource request. This is used in conjunction with the [[ResourceProfile]] to
* programmatically specify the resources needed for an RDD that will be applied at the
* stage level.
*
@@ -39,7 +39,7 @@ import org.apache.spark.annotation.{Evolving, Since}
*
* See the configuration and cluster specific docs for more details.
*
- * Use ExecutorResourceRequests class as a convenience API.
+ * Use [[ExecutorResourceRequests]] class as a convenience API.
*
* @param resourceName Name of the resource
* @param amount Amount requesting
diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
index 654afa0..b6992f4 100644
--- a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
@@ -37,12 +37,19 @@ class ExecutorResourceRequests() extends Serializable {
private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
+ /**
+ * Returns all the resource requests for the executor.
+ */
def requests: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap
+ /**
+ * (Java-specific) Returns all the resource requests for the executor.
+ */
def requestsJMap: JMap[String, ExecutorResourceRequest] = requests.asJava
/**
* Specify heap memory. The value specified will be converted to MiB.
+ * This is a convenient API to add [[ExecutorResourceRequest]] for "memory" resource.
*
* @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
* Default unit is MiB if not specified.
@@ -57,6 +64,7 @@ class ExecutorResourceRequests() extends Serializable {
/**
* Specify off heap memory. The value specified will be converted to MiB.
* This value only takes effect when MEMORY_OFFHEAP_ENABLED is true.
+ * This is a convenient API to add [[ExecutorResourceRequest]] for "offHeap" resource.
*
* @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
* Default unit is MiB if not specified.
@@ -70,6 +78,7 @@ class ExecutorResourceRequests() extends Serializable {
/**
* Specify overhead memory. The value specified will be converted to MiB.
+ * This is a convenient API to add [[ExecutorResourceRequest]] for "memoryOverhead" resource.
*
* @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
* Default unit is MiB if not specified.
@@ -83,6 +92,7 @@ class ExecutorResourceRequests() extends Serializable {
/**
* Specify pyspark memory. The value specified will be converted to MiB.
+ * This is a convenient API to add [[ExecutorResourceRequest]] for "pyspark.memory" resource.
*
* @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
* Default unit is MiB if not specified.
@@ -96,6 +106,7 @@ class ExecutorResourceRequests() extends Serializable {
/**
* Specify number of cores per Executor.
+ * This is a convenient API to add [[ExecutorResourceRequest]] for "cores" resource.
*
* @param amount Number of cores to allocate per Executor.
*/
@@ -111,6 +122,7 @@ class ExecutorResourceRequests() extends Serializable {
* like GPUs are gpu (spark configs spark.executor.resource.gpu.*). If you pass in a resource
* that the cluster manager doesn't support the result is undefined, it may error or may just
* be ignored.
+ * This is a convenient API to add [[ExecutorResourceRequest]] for custom resources.
*
* @param resourceName Name of the resource.
* @param amount amount of that resource per executor to use.
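The convenience methods documented above all return the receiver, so they chain; a short sketch (values mirror the K8s test changes later in this diff):

```scala
import org.apache.spark.resource.ExecutorResourceRequests

val ereqs = new ExecutorResourceRequests()
  .memory("2g")           // heap memory, stored under the "memory" resource
  .memoryOverhead("1g")   // "memoryOverhead" resource
  .pysparkMemory("3g")    // "pyspark.memory" resource
  .cores(4)               // "cores" resource
  .resource("gpu", 2, "/path/getGpusResources.sh", "nvidia.com") // custom resource

val scalaView = ereqs.requests      // Map[String, ExecutorResourceRequest]
val javaView  = ereqs.requestsJMap  // (Java-specific) java.util.Map view
```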
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
index 22d10a9..7605e8c 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
@@ -25,7 +25,7 @@ import org.apache.spark.SparkException
* Trait used to help executor/worker allocate resources.
* Please note that this is intended to be used in a single thread.
*/
-trait ResourceAllocator {
+private[spark] trait ResourceAllocator {
protected def resourceName: String
protected def resourceAddresses: Seq[String]
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index ac7e8e8..1ebd8bd 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -35,7 +35,13 @@ import org.apache.spark.util.Utils
* Resource profile to associate with an RDD. A ResourceProfile allows the user to
* specify executor and task requirements for an RDD that will get applied during a
* stage. This allows the user to change the resource requirements between stages.
- * This is meant to be immutable so user can't change it after building.
+ * This is meant to be immutable so the user can't change it after building. Users
+ * should use [[ResourceProfileBuilder]] to build it.
+ *
+ * @param executorResources Resource requests for executors. Mapped from the resource
+ * name (e.g., cores, memory, CPU) to its specific request.
+ * @param taskResources Resource requests for tasks. Mapped from the resource
+ * name (e.g., cores, memory, CPU) to its specific request.
*/
@Evolving
@Since("3.1.0")
@@ -53,6 +59,9 @@ class ResourceProfile(
private var _maxTasksPerExecutor: Option[Int] = None
private var _coresLimitKnown: Boolean = false
+ /**
+ * A unique id of this ResourceProfile
+ */
def id: Int = _id
/**
@@ -242,17 +251,39 @@ class ResourceProfile(
object ResourceProfile extends Logging {
// task resources
+ /**
+ * built-in task resource: cpus
+ */
val CPUS = "cpus"
// Executor resources
// Make sure to add any new executor resource to allSupportedExecutorResources below
+ /**
+ * built-in executor resource: cores
+ */
val CORES = "cores"
+ /**
+ * built-in executor resource: memory
+ */
val MEMORY = "memory"
+ /**
+ * built-in executor resource: offHeap
+ */
val OFFHEAP_MEM = "offHeap"
+ /**
+ * built-in executor resource: memoryOverhead
+ */
val OVERHEAD_MEM = "memoryOverhead"
+ /**
+ * built-in executor resource: pyspark.memory
+ */
val PYSPARK_MEM = "pyspark.memory"
- // all supported spark executor resources (minus the custom resources like GPUs/FPGAs)
- val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM, OFFHEAP_MEM)
+ /**
+ * Return all supported Spark built-in executor resources, custom resources like GPUs/FPGAs
+ * are excluded.
+ */
+ def allSupportedExecutorResources: Array[String] =
+ Array(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM, OFFHEAP_MEM)
val UNKNOWN_RESOURCE_PROFILE_ID = -1
val DEFAULT_RESOURCE_PROFILE_ID = 0
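A small usage sketch of the constants and the new method above (assuming the public visibility shown here):

```scala
import org.apache.spark.resource.ResourceProfile

// All built-in executor resource names, excluding custom ones like GPUs/FPGAs.
// Formerly a Seq-typed val; now a def returning a fresh Array on each call.
val builtIns: Array[String] = ResourceProfile.allSupportedExecutorResources
builtIns.foreach(println) // cores, memory, memoryOverhead, pyspark.memory, offHeap
```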
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
index 29a117b..f6b30d3 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
@@ -26,9 +26,9 @@ import org.apache.spark.annotation.{Evolving, Since}
/**
- * Resource profile builder to build a Resource profile to associate with an RDD.
- * A ResourceProfile allows the user to specify executor and task requirements for an RDD
- * that will get applied during a stage. This allows the user to change the resource
+ * Resource profile builder to build a [[ResourceProfile]] to associate with an RDD.
+ * A [[ResourceProfile]] allows the user to specify executor and task resource requirements
+ * for an RDD that will get applied during a stage. This allows the user to change the resource
* requirements between stages.
*
*/
@@ -36,7 +36,9 @@ import org.apache.spark.annotation.{Evolving, Since}
@Since("3.1.0")
class ResourceProfileBuilder() {
+ // Task resource requests specified by users, mapped from resource name to the request.
private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
+ // Executor resource requests specified by users, mapped from resource name to the request.
private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
def taskResources: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap
@@ -54,11 +56,21 @@ class ResourceProfileBuilder() {
_executorResources.asScala.asJava
}
+ /**
+ * Add executor resource requests
+ * @param requests The detailed executor resource requests, see [[ExecutorResourceRequests]]
+ * @return This ResourceProfileBuilder
+ */
def require(requests: ExecutorResourceRequests): this.type = {
_executorResources.putAll(requests.requests.asJava)
this
}
+ /**
+ * Add task resource requests
+ * @param requests The detailed task resource requests, see [[TaskResourceRequests]]
+ * @return This ResourceProfileBuilder
+ */
def require(requests: TaskResourceRequests): this.type = {
_taskResources.putAll(requests.requests.asJava)
this
@@ -80,7 +92,7 @@ class ResourceProfileBuilder() {
s"task resources: ${_taskResources.asScala.map(pair => s"${pair._1}=${pair._2.toString()}")}"
}
- def build: ResourceProfile = {
+ def build(): ResourceProfile = {
new ResourceProfile(executorResources, taskResources)
}
}
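Because both maps above are keyed by resource name, a later require() carrying the same resource name replaces the earlier entry via putAll; a sketch:

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder}

val rpb = new ResourceProfileBuilder()
rpb.require(new ExecutorResourceRequests().cores(2))
rpb.require(new ExecutorResourceRequests().cores(4)) // replaces the "cores" entry

val rp = rpb.build() // note the parentheses added by this commit
```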
diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
index 12ef342..cbd5780 100644
--- a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
+++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
@@ -20,11 +20,17 @@ package org.apache.spark.resource
import org.apache.spark.annotation.{Evolving, Since}
/**
- * A task resource request. This is used in conjunction with the ResourceProfile to
+ * A task resource request. This is used in conjunction with the [[ResourceProfile]] to
* programmatically specify the resources needed for an RDD that will be applied at the
* stage level.
*
- * Use TaskResourceRequests class as a convenience API.
+ * Use [[TaskResourceRequests]] class as a convenience API.
+ *
+ * @param resourceName Resource name
+ * @param amount Amount requesting as a Double to support fractional resource requests.
+ * Valid values are less than or equal to 0.5 or whole numbers. This essentially
+ * lets you configure X number of tasks to run on a single resource,
+ * i.e., amount equals 0.5 translates into 2 tasks per resource address.
*/
@Evolving
@Since("3.1.0")
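A brief sketch of the fractional-amount rule documented above (constructor parameters as described in the class doc):

```scala
import org.apache.spark.resource.TaskResourceRequest

// amount = 0.5 lets two tasks share one address of the resource;
// whole numbers mean each task needs that many addresses to itself.
val sharedGpu    = new TaskResourceRequest("gpu", 0.5)
val exclusiveGpu = new TaskResourceRequest("gpu", 1.0)
```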
diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
index b4e70b3..1d5fc73 100644
--- a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
+++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
@@ -36,12 +36,19 @@ class TaskResourceRequests() extends Serializable {
private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
+ /**
+ * Returns all the resource requests for the task.
+ */
def requests: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap
+ /**
+ * (Java-specific) Returns all the resource requests for the task.
+ */
def requestsJMap: JMap[String, TaskResourceRequest] = requests.asJava
/**
* Specify number of cpus per Task.
+ * This is a convenient API to add [[TaskResourceRequest]] for cpus.
*
* @param amount Number of cpus to allocate per Task.
*/
@@ -52,7 +59,8 @@ class TaskResourceRequests() extends Serializable {
}
/**
- * Amount of a particular custom resource(GPU, FPGA, etc) to use.
+ * Amount of a particular custom resource (GPU, FPGA, etc.) to use.
+ * This is a convenient API to add [[TaskResourceRequest]] for custom resources.
*
* @param resourceName Name of the resource.
* @param amount Amount requesting as a Double to support fractional resource requests.
@@ -66,6 +74,9 @@ class TaskResourceRequests() extends Serializable {
this
}
+ /**
+ * Add a certain [[TaskResourceRequest]] to the request set.
+ */
def addRequest(treq: TaskResourceRequest): this.type = {
_taskResources.put(treq.resourceName, treq)
this
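Putting the task-side methods together, a short sketch (values illustrative): cpus() and resource() are conveniences, while addRequest() accepts a pre-built request:

```scala
import org.apache.spark.resource.{TaskResourceRequest, TaskResourceRequests}

val treqs = new TaskResourceRequests()
  .cpus(2)              // convenience for the "cpus" resource
  .resource("gpu", 1.0) // convenience for a custom resource
  .addRequest(new TaskResourceRequest("fpga", 0.5)) // pre-built request

val javaView = treqs.requestsJMap // (Java-specific) view of the same requests
```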
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 44e8e57..66ece81 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -290,7 +290,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
ereq.cores(4).memory("2g").memoryOverhead("1g").pysparkMemory("3g")
treq.cpus(2)
rpb.require(ereq).require(treq)
- val rp = rpb.build
+ val rp = rpb.build()
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf), rp)
val executor = step.configurePod(SparkPod.initialPod())
@@ -307,7 +307,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
ereq.cores(2).resource("gpu", 2, "/path/getGpusResources.sh", "nvidia.com")
treq.cpus(1)
rpb.require(ereq).require(treq)
- val rp = rpb.build
+ val rp = rpb.build()
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf), rp)
val executor = step.configurePod(SparkPod.initialPod())
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index c7e2f1a..349bbcd 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -325,7 +325,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
ereq.cores(4).memory("2g")
treq.cpus(2)
rpb.require(ereq).require(treq)
- val rp = rpb.build
+ val rp = rpb.build()
// Target 1 executor for default profile, 2 for other profile,
// make sure it's requested, even with an empty initial snapshot.