Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/02/18 01:31:20 UTC

[GitHub] [spark] holdenk commented on a change in pull request #31496: [SPARK-34384][CORE] API cleanup for ResourceProfile

holdenk commented on a change in pull request #31496:
URL: https://github.com/apache/spark/pull/31496#discussion_r578061391



##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
##########
@@ -17,71 +17,67 @@
 
 package org.apache.spark.resource
 
-import java.util.{Map => JMap}
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.{Evolving, Since}
+import org.apache.spark.util.Utils
 
 
 /**
- * Resource profile builder to build a Resource profile to associate with an RDD.
- * A ResourceProfile allows the user to specify executor and task requirements for an RDD
- * that will get applied during a stage. This allows the user to change the resource
+ * Resource profile builder to build a [[ResourceProfile]] to associate with an RDD.
+ * A [[ResourceProfile]] allows the user to specify executor and task resource requirements
+ * for an RDD that will get applied during a stage. This allows the user to change the resource
  * requirements between stages.
  *
  */
 @Evolving
 @Since("3.1.0")
 class ResourceProfileBuilder() {
 
+  // Task resource requests specified by users, mapped from resource name to the request.
   private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
+  // Executor resource requests specified by users, mapped from resource name to the request.
   private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
 
-  def taskResources: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap
-  def executorResources: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap
-
-  /**
-   * (Java-specific) gets a Java Map of resources to TaskResourceRequest
-   */
-  def taskResourcesJMap: JMap[String, TaskResourceRequest] = _taskResources.asScala.asJava
-
   /**
-   * (Java-specific) gets a Java Map of resources to ExecutorResourceRequest
+   * Add executor resource requests
+   * @param requests The detailed executor resource requests, see [[ExecutorResourceRequests]]
+   * @return this.type
    */
-  def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = {
-    _executorResources.asScala.asJava
-  }
-
-  def require(requests: ExecutorResourceRequests): this.type = {
+  def executorRequire(requests: ExecutorResourceRequests): this.type = {
     _executorResources.putAll(requests.requests.asJava)
     this
   }
 
-  def require(requests: TaskResourceRequests): this.type = {
+  /**
+   * Add task resource requests
+   * @param requests The detailed task resource requests, see [[TaskResourceRequests]]
+   * @return this.type
+   */
+  def taskRequire(requests: TaskResourceRequests): this.type = {
     _taskResources.putAll(requests.requests.asJava)
     this
   }
 
-  def clearExecutorResourceRequests(): this.type = {
-    _executorResources.clear()
-    this
-  }
-
-  def clearTaskResourceRequests(): this.type = {
-    _taskResources.clear()
-    this
-  }
-
   override def toString(): String = {
     "Profile executor resources: " +
       s"${_executorResources.asScala.map(pair => s"${pair._1}=${pair._2.toString()}")}, " +
       s"task resources: ${_taskResources.asScala.map(pair => s"${pair._1}=${pair._2.toString()}")}"
   }
 
-  def build: ResourceProfile = {
-    new ResourceProfile(executorResources, taskResources)
+  def build(): ResourceProfile = {
+    if (!Utils.isTesting) {
+      if (_taskResources.isEmpty) {
+        throw new SparkException("Empty task resource request.")
+      }
+      if (_executorResources.isEmpty) {
+        throw new SparkException("Empty executor resource request.")
+      }

Review comment:
       This seems like a new constraint; can you explain why?
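
       For context, a minimal sketch of what the new validation means for callers, using the renamed executorRequire/taskRequire from this diff (the resource amounts are made up for illustration):

           import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

           // Both request maps must be non-empty before build() succeeds outside of tests.
           val execReqs = new ExecutorResourceRequests().cores(4).memory("8g")
           val taskReqs = new TaskResourceRequests().cpus(2)

           val profile = new ResourceProfileBuilder()
             .executorRequire(execReqs)
             .taskRequire(taskReqs)
             .build()

           // Whereas new ResourceProfileBuilder().build() would now throw
           // SparkException("Empty task resource request.") in a non-test run.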

##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
##########
@@ -25,7 +25,7 @@ import org.apache.spark.SparkException
  * Trait used to help executor/worker allocate resources.
  * Please note that this is intended to be used in a single thread.
  */
-trait ResourceAllocator {
+private[spark] trait ResourceAllocator {

Review comment:
       Would this impact folks working on schedulers outside of org.apache.spark?
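
       For example, a hypothetical external cluster manager doing something like the following (package and class names invented; the three abstract members are assumed to match the current trait) would stop compiling once the trait is private[spark]:

           // Hypothetical third-party scheduler code living outside org.apache.spark.
           package com.example.scheduler

           import org.apache.spark.resource.ResourceAllocator

           // Resolves against the public trait today; fails to compile
           // once ResourceAllocator becomes private[spark].
           class GpuWorkerAllocator extends ResourceAllocator {
             override protected def resourceName: String = "gpu"
             override protected def resourceAddresses: Seq[String] = Seq("0", "1")
             override protected def slotsPerAddress: Int = 1
           }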

##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
##########
@@ -17,71 +17,67 @@
 
 package org.apache.spark.resource
 
-import java.util.{Map => JMap}
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.{Evolving, Since}
+import org.apache.spark.util.Utils
 
 
 /**
- * Resource profile builder to build a Resource profile to associate with an RDD.
- * A ResourceProfile allows the user to specify executor and task requirements for an RDD
- * that will get applied during a stage. This allows the user to change the resource
+ * Resource profile builder to build a [[ResourceProfile]] to associate with an RDD.
+ * A [[ResourceProfile]] allows the user to specify executor and task resource requirements
+ * for an RDD that will get applied during a stage. This allows the user to change the resource
  * requirements between stages.
  *
  */
 @Evolving
 @Since("3.1.0")
 class ResourceProfileBuilder() {
 
+  // Task resource requests specified by users, mapped from resource name to the request.
   private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
+  // Executor resource requests specified by users, mapped from resource name to the request.
   private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
 
-  def taskResources: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap
-  def executorResources: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap
-
-  /**
-   * (Java-specific) gets a Java Map of resources to TaskResourceRequest
-   */
-  def taskResourcesJMap: JMap[String, TaskResourceRequest] = _taskResources.asScala.asJava
-
   /**
-   * (Java-specific) gets a Java Map of resources to ExecutorResourceRequest
+   * Add executor resource requests
+   * @param requests The detailed executor resource requests, see [[ExecutorResourceRequests]]
+   * @return this.type
    */
-  def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = {
-    _executorResources.asScala.asJava
-  }
-
-  def require(requests: ExecutorResourceRequests): this.type = {
+  def executorRequire(requests: ExecutorResourceRequests): this.type = {
     _executorResources.putAll(requests.requests.asJava)
     this
   }
 
-  def require(requests: TaskResourceRequests): this.type = {
+  /**
+   * Add task resource requests
+   * @param requests The detailed task resource requests, see [[TaskResourceRequests]]
+   * @return this.type
+   */
+  def taskRequire(requests: TaskResourceRequests): this.type = {

Review comment:
       We might need to mention this in the migration guide then, yeah? Personally, I'd rather deprecate the old ones and point to the new ones than delete the API, but I understand it's evolving.
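
       A sketch of the deprecate-and-forward route suggested here, inside ResourceProfileBuilder (the deprecation version string is a placeholder):

           // Keep the old overloads, forward to the new names, and emit a
           // deprecation warning instead of breaking existing callers.
           @deprecated("Use executorRequire(requests) instead", "3.2.0")
           def require(requests: ExecutorResourceRequests): this.type = executorRequire(requests)

           @deprecated("Use taskRequire(requests) instead", "3.2.0")
           def require(requests: TaskResourceRequests): this.type = taskRequire(requests)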

##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
##########
@@ -17,71 +17,67 @@
 
 package org.apache.spark.resource
 
-import java.util.{Map => JMap}
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.{Evolving, Since}
+import org.apache.spark.util.Utils
 
 
 /**
- * Resource profile builder to build a Resource profile to associate with an RDD.
- * A ResourceProfile allows the user to specify executor and task requirements for an RDD
- * that will get applied during a stage. This allows the user to change the resource
+ * Resource profile builder to build a [[ResourceProfile]] to associate with an RDD.
+ * A [[ResourceProfile]] allows the user to specify executor and task resource requirements
+ * for an RDD that will get applied during a stage. This allows the user to change the resource
  * requirements between stages.
  *
  */
 @Evolving
 @Since("3.1.0")
 class ResourceProfileBuilder() {
 
+  // Task resource requests specified by users, mapped from resource name to the request.
   private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
+  // Executor resource requests specified by users, mapped from resource name to the request.
   private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
 
-  def taskResources: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap
-  def executorResources: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap
-
-  /**
-   * (Java-specific) gets a Java Map of resources to TaskResourceRequest
-   */
-  def taskResourcesJMap: JMap[String, TaskResourceRequest] = _taskResources.asScala.asJava
-
   /**
-   * (Java-specific) gets a Java Map of resources to ExecutorResourceRequest
+   * Add executor resource requests
+   * @param requests The detailed executor resource requests, see [[ExecutorResourceRequests]]
+   * @return this.type
    */
-  def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = {
-    _executorResources.asScala.asJava
-  }
-
-  def require(requests: ExecutorResourceRequests): this.type = {
+  def executorRequire(requests: ExecutorResourceRequests): this.type = {
     _executorResources.putAll(requests.requests.asJava)
     this
   }
 
-  def require(requests: TaskResourceRequests): this.type = {
+  /**
+   * Add task resource requests
+   * @param requests The detailed task resource requests, see [[TaskResourceRequests]]
+   * @return this.type
+   */
+  def taskRequire(requests: TaskResourceRequests): this.type = {
     _taskResources.putAll(requests.requests.asJava)
     this
   }
 
-  def clearExecutorResourceRequests(): this.type = {
-    _executorResources.clear()
-    this
-  }
-
-  def clearTaskResourceRequests(): this.type = {
-    _taskResources.clear()
-    this
-  }

Review comment:
       Clear could be useful in a builder, no?
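
       For example, reusing one builder across stages (using the new executorRequire/taskRequire names, and assuming the clear methods were kept; resource names and amounts are illustrative):

           val builder = new ResourceProfileBuilder()
             .executorRequire(new ExecutorResourceRequests().cores(8).memory("16g"))

           // Stage 1 wants a GPU per task.
           val stage1Profile = builder
             .taskRequire(new TaskResourceRequests().cpus(1).resource("gpu", 1))
             .build()

           // Stage 2 wants CPUs only. Without clearTaskResourceRequests() the
           // earlier "gpu" entry would still be sitting in the builder's map,
           // since taskRequire only putAll()s new entries on top of it.
           val stage2Profile = builder
             .clearTaskResourceRequests()
             .taskRequire(new TaskResourceRequests().cpus(4))
             .build()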




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


