You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/02/21 09:30:55 UTC

[spark] branch branch-3.1 updated: [SPARK-34384][CORE] Add missing docs for ResourceProfile APIs

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 02be408  [SPARK-34384][CORE] Add missing docs for ResourceProfile APIs
02be408 is described below

commit 02be4088b7b58a1fd270b212f3d97cea8561d173
Author: yi.wu <yi...@databricks.com>
AuthorDate: Sun Feb 21 18:29:44 2021 +0900

    [SPARK-34384][CORE] Add missing docs for ResourceProfile APIs
    
    ### What changes were proposed in this pull request?
    
    This PR adds missing docs for ResourceProfile related APIs. Besides, it includes a few minor changes on API:
    
    * ResourceProfileBuilder.build -> ResourceProfileBuilder.build()
    * Provides a Java-friendly API: `allSupportedExecutorResources` now returns an `Array[String]`
    * Makes `ResourceAllocator` `private[spark]` since it was mistakenly exposed previously
    
    ### Why are the changes needed?
    
    Add missing API docs
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, as Apache Spark 3.1 hasn't been officially released yet.
    
    ### How was this patch tested?
    
    Updated unit tests due to the signature change of `build()`.
    
    Closes #31496 from Ngone51/resource-profile-api-cleanup.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit 546d2eb5d46813a14c7bd30113fb6bb038cdd2fc)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../spark/resource/ExecutorResourceRequest.scala   |  4 +--
 .../spark/resource/ExecutorResourceRequests.scala  | 12 +++++++
 .../apache/spark/resource/ResourceAllocator.scala  |  2 +-
 .../apache/spark/resource/ResourceProfile.scala    | 37 ++++++++++++++++++++--
 .../spark/resource/ResourceProfileBuilder.scala    | 20 +++++++++---
 .../spark/resource/TaskResourceRequest.scala       | 10 ++++--
 .../spark/resource/TaskResourceRequests.scala      | 13 +++++++-
 .../features/BasicExecutorFeatureStepSuite.scala   |  4 +--
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   |  2 +-
 9 files changed, 88 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
index 3e3db7e..76af41a 100644
--- a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
@@ -20,7 +20,7 @@ package org.apache.spark.resource
 import org.apache.spark.annotation.{Evolving, Since}
 
 /**
- * An Executor resource request. This is used in conjunction with the ResourceProfile to
+ * An Executor resource request. This is used in conjunction with the [[ResourceProfile]] to
  * programmatically specify the resources needed for an RDD that will be applied at the
  * stage level.
  *
@@ -39,7 +39,7 @@ import org.apache.spark.annotation.{Evolving, Since}
  *
  * See the configuration and cluster specific docs for more details.
  *
- * Use ExecutorResourceRequests class as a convenience API.
+ * Use [[ExecutorResourceRequests]] class as a convenience API.
  *
  * @param resourceName Name of the resource
  * @param amount Amount requesting
diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
index 654afa0..b6992f4 100644
--- a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
@@ -37,12 +37,19 @@ class ExecutorResourceRequests() extends Serializable {
 
   private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
 
+  /**
+   * Returns all the resource requests for the executor.
+   */
   def requests: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap
 
+  /**
+   * (Java-specific) Returns all the resource requests for the executor.
+   */
   def requestsJMap: JMap[String, ExecutorResourceRequest] = requests.asJava
 
   /**
    * Specify heap memory. The value specified will be converted to MiB.
+   * This is a convenient API to add [[ExecutorResourceRequest]] for "memory" resource.
    *
    * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
    *               Default unit is MiB if not specified.
@@ -57,6 +64,7 @@ class ExecutorResourceRequests() extends Serializable {
   /**
    * Specify off heap memory. The value specified will be converted to MiB.
    * This value only take effect when MEMORY_OFFHEAP_ENABLED is true.
+   * This is a convenient API to add [[ExecutorResourceRequest]] for "offHeap" resource.
    *
    * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
    *               Default unit is MiB if not specified.
@@ -70,6 +78,7 @@ class ExecutorResourceRequests() extends Serializable {
 
   /**
    * Specify overhead memory. The value specified will be converted to MiB.
+   * This is a convenient API to add [[ExecutorResourceRequest]] for "memoryOverhead" resource.
    *
    * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
    *               Default unit is MiB if not specified.
@@ -83,6 +92,7 @@ class ExecutorResourceRequests() extends Serializable {
 
   /**
    * Specify pyspark memory. The value specified will be converted to MiB.
+   * This is a convenient API to add [[ExecutorResourceRequest]] for "pyspark.memory" resource.
    *
    * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
    *               Default unit is MiB if not specified.
@@ -96,6 +106,7 @@ class ExecutorResourceRequests() extends Serializable {
 
   /**
    * Specify number of cores per Executor.
+   * This is a convenient API to add [[ExecutorResourceRequest]] for "cores" resource.
    *
    * @param amount Number of cores to allocate per Executor.
    */
@@ -111,6 +122,7 @@ class ExecutorResourceRequests() extends Serializable {
    *  like GPUs are gpu (spark configs spark.executor.resource.gpu.*). If you pass in a resource
    *  that the cluster manager doesn't support the result is undefined, it may error or may just
    *  be ignored.
+   *  This is a convenient API to add [[ExecutorResourceRequest]] for custom resources.
    *
    * @param resourceName Name of the resource.
    * @param amount amount of that resource per executor to use.
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
index 22d10a9..7605e8c 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
@@ -25,7 +25,7 @@ import org.apache.spark.SparkException
  * Trait used to help executor/worker allocate resources.
  * Please note that this is intended to be used in a single thread.
  */
-trait ResourceAllocator {
+private[spark] trait ResourceAllocator {
 
   protected def resourceName: String
   protected def resourceAddresses: Seq[String]
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index ac7e8e8..1ebd8bd 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -35,7 +35,13 @@ import org.apache.spark.util.Utils
  * Resource profile to associate with an RDD. A ResourceProfile allows the user to
  * specify executor and task requirements for an RDD that will get applied during a
  * stage. This allows the user to change the resource requirements between stages.
- * This is meant to be immutable so user can't change it after building.
+ * This is meant to be immutable so user can't change it after building. Users
+ * should use [[ResourceProfileBuilder]] to build it.
+ *
+ * @param executorResources Resource requests for executors. Mapped from the resource
+ *                          name (e.g., cores, memory, CPU) to its specific request.
+ * @param taskResources Resource requests for tasks. Mapped from the resource
+ *                      name (e.g., cores, memory, CPU) to its specific request.
  */
 @Evolving
 @Since("3.1.0")
@@ -53,6 +59,9 @@ class ResourceProfile(
   private var _maxTasksPerExecutor: Option[Int] = None
   private var _coresLimitKnown: Boolean = false
 
+  /**
+   * A unique id of this ResourceProfile
+   */
   def id: Int = _id
 
   /**
@@ -242,17 +251,39 @@ class ResourceProfile(
 
 object ResourceProfile extends Logging {
   // task resources
+  /**
+   * built-in task resource: cpus
+   */
   val CPUS = "cpus"
   // Executor resources
   // Make sure add new executor resource in below allSupportedExecutorResources
+  /**
+   * built-in executor resource: cores
+   */
   val CORES = "cores"
+  /**
+   * built-in executor resource: memory
+   */
   val MEMORY = "memory"
+  /**
+   * built-in executor resource: offHeap
+   */
   val OFFHEAP_MEM = "offHeap"
+  /**
+   * built-in executor resource: memoryOverhead
+   */
   val OVERHEAD_MEM = "memoryOverhead"
+  /**
+   * built-in executor resource: pyspark.memory
+   */
   val PYSPARK_MEM = "pyspark.memory"
 
-  // all supported spark executor resources (minus the custom resources like GPUs/FPGAs)
-  val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM, OFFHEAP_MEM)
+  /**
+   * Return all supported Spark built-in executor resources, custom resources like GPUs/FPGAs
+   * are excluded.
+   */
+  def allSupportedExecutorResources: Array[String] =
+    Array(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM, OFFHEAP_MEM)
 
   val UNKNOWN_RESOURCE_PROFILE_ID = -1
   val DEFAULT_RESOURCE_PROFILE_ID = 0
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
index 29a117b..f6b30d3 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
@@ -26,9 +26,9 @@ import org.apache.spark.annotation.{Evolving, Since}
 
 
 /**
- * Resource profile builder to build a Resource profile to associate with an RDD.
- * A ResourceProfile allows the user to specify executor and task requirements for an RDD
- * that will get applied during a stage. This allows the user to change the resource
+ * Resource profile builder to build a [[ResourceProfile]] to associate with an RDD.
+ * A [[ResourceProfile]] allows the user to specify executor and task resource requirements
+ * for an RDD that will get applied during a stage. This allows the user to change the resource
  * requirements between stages.
  *
  */
@@ -36,7 +36,9 @@ import org.apache.spark.annotation.{Evolving, Since}
 @Since("3.1.0")
 class ResourceProfileBuilder() {
 
+  // Task resource requests specified by users, mapped from resource name to the request.
   private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
+  // Executor resource requests specified by users, mapped from resource name to the request.
   private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
 
   def taskResources: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap
@@ -54,11 +56,21 @@ class ResourceProfileBuilder() {
     _executorResources.asScala.asJava
   }
 
+  /**
+   * Add executor resource requests
+   * @param requests The detailed executor resource requests, see [[ExecutorResourceRequests]]
+   * @return This ResourceProfileBuilder
+   */
   def require(requests: ExecutorResourceRequests): this.type = {
     _executorResources.putAll(requests.requests.asJava)
     this
   }
 
+  /**
+   * Add task resource requests
+   * @param requests The detailed task resource requests, see [[TaskResourceRequests]]
+   * @return This ResourceProfileBuilder
+   */
   def require(requests: TaskResourceRequests): this.type = {
     _taskResources.putAll(requests.requests.asJava)
     this
@@ -80,7 +92,7 @@ class ResourceProfileBuilder() {
       s"task resources: ${_taskResources.asScala.map(pair => s"${pair._1}=${pair._2.toString()}")}"
   }
 
-  def build: ResourceProfile = {
+  def build(): ResourceProfile = {
     new ResourceProfile(executorResources, taskResources)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
index 12ef342..cbd5780 100644
--- a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
+++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
@@ -20,11 +20,17 @@ package org.apache.spark.resource
 import org.apache.spark.annotation.{Evolving, Since}
 
 /**
- * A task resource request. This is used in conjunction with the ResourceProfile to
+ * A task resource request. This is used in conjunction with the [[ResourceProfile]] to
  * programmatically specify the resources needed for an RDD that will be applied at the
  * stage level.
  *
- * Use TaskResourceRequests class as a convenience API.
+ * Use [[TaskResourceRequests]] class as a convenience API.
+ *
+ * @param resourceName Resource name
+ * @param amount Amount requesting as a Double to support fractional resource requests.
+ *               Valid values are less than or equal to 0.5 or whole numbers. This essentially
+ *               lets you configure X number of tasks to run on a single resource,
+ *               i.e., an amount of 0.5 translates into 2 tasks per resource address.
  */
 @Evolving
 @Since("3.1.0")
diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
index b4e70b3..1d5fc73 100644
--- a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
+++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
@@ -36,12 +36,19 @@ class TaskResourceRequests() extends Serializable {
 
   private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
 
+  /**
+   * Returns all the resource requests for the task.
+   */
   def requests: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap
 
+  /**
+   * (Java-specific) Returns all the resource requests for the task.
+   */
   def requestsJMap: JMap[String, TaskResourceRequest] = requests.asJava
 
   /**
    * Specify number of cpus per Task.
+   * This is a convenient API to add [[TaskResourceRequest]] for cpus.
    *
    * @param amount Number of cpus to allocate per Task.
    */
@@ -52,7 +59,8 @@ class TaskResourceRequests() extends Serializable {
   }
 
   /**
-   *  Amount of a particular custom resource(GPU, FPGA, etc) to use.
+   * Amount of a particular custom resource(GPU, FPGA, etc) to use.
+   * This is a convenient API to add [[TaskResourceRequest]] for custom resources.
    *
    * @param resourceName Name of the resource.
    * @param amount Amount requesting as a Double to support fractional resource requests.
@@ -66,6 +74,9 @@ class TaskResourceRequests() extends Serializable {
     this
   }
 
+  /**
+   * Add a certain [[TaskResourceRequest]] to the request set.
+   */
   def addRequest(treq: TaskResourceRequest): this.type = {
     _taskResources.put(treq.resourceName, treq)
     this
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 44e8e57..66ece81 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -290,7 +290,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     ereq.cores(4).memory("2g").memoryOverhead("1g").pysparkMemory("3g")
     treq.cpus(2)
     rpb.require(ereq).require(treq)
-    val rp = rpb.build
+    val rp = rpb.build()
     val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf), rp)
     val executor = step.configurePod(SparkPod.initialPod())
 
@@ -307,7 +307,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     ereq.cores(2).resource("gpu", 2, "/path/getGpusResources.sh", "nvidia.com")
     treq.cpus(1)
     rpb.require(ereq).require(treq)
-    val rp = rpb.build
+    val rp = rpb.build()
     val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf), rp)
     val executor = step.configurePod(SparkPod.initialPod())
 
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index c7e2f1a..349bbcd 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -325,7 +325,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     ereq.cores(4).memory("2g")
     treq.cpus(2)
     rpb.require(ereq).require(treq)
-    val rp = rpb.build
+    val rp = rpb.build()
 
     // Target 1 executor for default profile, 2 for other profile,
     // make sure it's requested, even with an empty initial snapshot.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org