Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/03/03 14:30:16 UTC

[GitHub] [spark] tgravescs opened a new pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

tgravescs opened a new pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773
 
 
   
   ### What changes were proposed in this pull request?
   
   These are the core scheduler changes to support stage level scheduling.
   
   The main changes here modify the DAGScheduler to look at the ResourceProfiles associated with an RDD and apply them inside the scheduler.
   Currently, if multiple RDDs in a stage have conflicting ResourceProfiles, we throw an error; the logic to allow merging them will come in SPARK-29153. I added the interfaces to RDD to set and get the ResourceProfile so that I could add unit tests for the scheduler. These are marked as private for now until we finish the feature and will be exposed in SPARK-29150. If you think this is confusing, I can remove them along with the tests and add them back later.
   I modified the task scheduler to only schedule on executors that exactly match the resource profile. It then checks those executors to make sure the currently available resources meet the task needs before assigning a task. As part of this I changed the way we do the custom resource assignment.
   Other changes include passing the cpus per task around so that we can properly account for them. Previously we just used the one global config, but now it can change based on the ResourceProfile.
   I removed the exceptions that required the cores to be the limiting resource. With this change, all the places I found that used executor cores / task cpus as slots have been updated to use the ResourceProfile logic and check which resource is limiting.
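   
   As a rough illustration of how this is meant to be used once we expose the user-facing API in SPARK-29150, attaching a stage-specific ResourceProfile to an RDD would look something like the sketch below. The builder and `withResources` names come from the stage level scheduling design rather than this PR, and `rdd` / `runTraining` are placeholders.
   
   ```scala
   import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}
   
   // Executor requirements for the stage: 4 cores and 1 GPU per executor.
   val execReqs = new ExecutorResourceRequests().cores(4).resource("gpu", 1)
   // Task requirements for the stage: 2 cpus and 1 GPU per task.
   val taskReqs = new TaskResourceRequests().cpus(2).resource("gpu", 1)
   
   val profile = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build
   
   // With this change the DAGScheduler picks the profile up from the stage's RDDs and
   // the task scheduler only places the stage's tasks on executors acquired with a
   // matching profile, checking the available resources before each assignment.
   val trained = rdd.withResources(profile).mapPartitions(runTraining)
   ```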
   
   ### Why are the changes needed?
   Stage level scheduling feature.
   
   ### Does this PR introduce any user-facing change?
   No
   
   ### How was this patch tested?
   
   unit tests and lots of manual testing

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601378565
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/24787/
   Test PASSed.



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r395110450
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 ##########
 @@ -433,14 +435,32 @@ private[spark] class DAGScheduler(
    * the check fails consecutively beyond a configured number for a job, then fail current job
    * submission.
    */
-  private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
+  private def checkBarrierStageWithNumSlots(rdd: RDD[_], rp: ResourceProfile): Unit = {
     val numPartitions = rdd.getNumPartitions
-    val maxNumConcurrentTasks = sc.maxNumConcurrentTasks
+    val maxNumConcurrentTasks = sc.maxNumConcurrentTasks(rp)
     if (rdd.isBarrier() && numPartitions > maxNumConcurrentTasks) {
       throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)
     }
   }
 
+  private[scheduler] def mergeResourceProfilesForStage(
+      stageResourceProfiles: HashSet[ResourceProfile]): ResourceProfile = {
+    logDebug("rdd profiles: " + stageResourceProfiles)
 
 Review comment:
   Are you just referring to the log message? I'll update the comment and use string interpolation here as well.
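   
   For context, the conflict handling behaves roughly like the sketch below (illustrative only, not the exact code in this PR; real merging is deferred to SPARK-29153):
   
   ```scala
   // Sketch: with more than one distinct ResourceProfile across the stage's RDDs we
   // fail fast; with exactly one we use it; with none we fall back to the supplied
   // default profile. Names here are illustrative.
   private def resolveResourceProfileForStage(
       stageResourceProfiles: Set[ResourceProfile],
       defaultProfile: ResourceProfile): ResourceProfile = {
     logDebug(s"RDD profiles for stage: $stageResourceProfiles")
     if (stageResourceProfiles.size > 1) {
       throw new IllegalArgumentException("Multiple ResourceProfiles specified in the " +
         s"RDDs for this stage, profiles: $stageResourceProfiles")
     }
     stageResourceProfiles.headOption.getOrElse(defaultProfile)
   }
   ```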



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597232307
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601392084
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/24788/
   Test PASSed.



[GitHub] [spark] mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389535224
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise returns
+   * the task resource assignments to give to the next task. Note that the assignments may
+   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
 
 Review comment:
   Thoughts on always keeping the cpu resource distinct from the other resources in the resource profile? Given the number of times this will need to run, it might be worth avoiding?
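   
   Concretely, the idea would be something like the sketch below (names are made up for illustration, not an API proposal): split the CPU request off from the other task resources once, when the profile is constructed, so the per-offer hot path never has to re-run `filterKeys`.
   
   ```scala
   // Illustrative only: resolve the CPU request and the custom (non-CPU) task
   // resources a single time per profile instead of once per WorkerOffer.
   case class TaskResourceRequestLike(resourceName: String, amount: Double)
   
   class SplitTaskResources(all: Map[String, TaskResourceRequestLike]) {
     val cpus: Option[TaskResourceRequestLike] = all.get("cpus")
     val custom: Map[String, TaskResourceRequestLike] = all - "cpus"
   }
   ```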



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r395105623
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
 ##########
 @@ -836,10 +840,10 @@ class TaskSetManagerSuite
     }
     // Offer resources for 4 tasks to start
     for ((exec, host) <- Seq(
-      "exec1" -> "host1",
-      "exec1" -> "host1",
-      "exec3" -> "host3",
-      "exec2" -> "host2")) {
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec3" -> "host3",
+        "exec2" -> "host2")) {
 
 Review comment:
   Weird, IntelliJ formatted them differently. I'll make them consistent.



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389902603
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise returns
+   * the task resource assignments to give to the next task. Note that the assignments may
+   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
+    val localTaskReqAssign = HashMap[String, ResourceInformation]()
+    if (tsResources.isEmpty) return Some(localTaskReqAssign.toMap)
+    // we go through all resources here so that we can make sure they match and also get what the
+    // assignments are for the next task
+    for ((rName, taskReqs) <- tsResources) {
+      val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName)
+      availWorkerResources.get(rName) match {
+        case Some(workerRes) =>
+          val workerAvail = availWorkerResources.get(rName).map(_.size).getOrElse(0)
+          if (workerAvail >= taskAmount) {
+            localTaskReqAssign.put(rName, new ResourceInformation(rName,
+              workerRes.take(taskAmount).toArray))
+          } else {
+            return None
+          }
+        case None => return None
+      }
+    }
+    Some(localTaskReqAssign.toMap)
+  }
+
+  // Use the resource that the resourceProfile has as the limiting resource to calculate the
+  // total number of slots available based on the current offers.
+  private def calculateAvailableSlots(
+      resourceProfileIds: Array[Int],
+      availableCpus: Array[Int],
+      availableResources: Array[Map[String, Buffer[String]]],
+      rpId: Int): Int = {
+    val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter { case (id, _) =>
+      (id == resourceProfile.id)
+    }
+    val coresKnown = resourceProfile.isCoresLimitKnown
+    var limitingResource = resourceProfile.limitingResource(sc.getConf)
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
 
 Review comment:
   So I actually already have ResourceProfile.getTaskCpus, but it returns an Option because it doesn't have the conf passed in to resolve the default task cpus. I was trying to avoid passing the conf in as much as possible, but I ended up having to do that in other functions, so let me take another look to see if I can make that cleaner.
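   
   For illustration, a helper along the lines being discussed might look like this (hypothetical name and shape; the final code may differ):
   
   ```scala
   import org.apache.spark.SparkConf
   import org.apache.spark.internal.config.CPUS_PER_TASK
   import org.apache.spark.resource.ResourceProfile
   
   // Resolve the task cpus for a profile, falling back to spark.task.cpus from
   // the conf when the profile does not override it.
   def taskCpusForProfile(rp: ResourceProfile, conf: SparkConf): Int =
     rp.getTaskCpus.getOrElse(conf.get(CPUS_PER_TASK))
   ```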



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597228281
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/24358/
   Test PASSed.



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r390073662
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise returns
+   * the task resource assignments to give to the next task. Note that the assignments may
+   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
+    val localTaskReqAssign = HashMap[String, ResourceInformation]()
+    if (tsResources.isEmpty) return Some(localTaskReqAssign.toMap)
+    // we go through all resources here so that we can make sure they match and also get what the
+    // assignments are for the next task
+    for ((rName, taskReqs) <- tsResources) {
+      val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName)
+      availWorkerResources.get(rName) match {
+        case Some(workerRes) =>
+          val workerAvail = availWorkerResources.get(rName).map(_.size).getOrElse(0)
+          if (workerAvail >= taskAmount) {
+            localTaskReqAssign.put(rName, new ResourceInformation(rName,
+              workerRes.take(taskAmount).toArray))
+          } else {
+            return None
+          }
+        case None => return None
+      }
+    }
+    Some(localTaskReqAssign.toMap)
+  }
+
+  // Use the resource that the resourceProfile has as the limiting resource to calculate the
+  // total number of slots available based on the current offers.
+  private def calculateAvailableSlots(
 
 Review comment:
   > unless it got checked in without me seeing it?
   
   Barrier mode doesn't support dynamic allocation yet.
   
   
   But sorry I don't quite understand your concern here.
   
   In my understanding, `calculateAvailableSlots` only gives the barrier stage a chance to try to launch all tasks at the same time. But it never guarantees that the barrier stage can be successfully launched even if there are enough slots (e.g. due to delay scheduling). So, I mean, whether we calculate the total or a partial sum of resource slots, it should make no difference for barrier stage scheduling.
   
   



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597232487
 
 
   Merged build finished. Test FAILed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597329062
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/24362/
   Test PASSed.



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601375409
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/120069/
   Test FAILed.



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597305667
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601378556
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r390332370
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ##########
 @@ -140,12 +141,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     override def receive: PartialFunction[Any, Unit] = {
-      case StatusUpdate(executorId, taskId, state, data, resources) =>
+      case StatusUpdate(executorId, taskId, state, data, taskCpus, resources) =>
         scheduler.statusUpdate(taskId, state, data.value)
         if (TaskState.isFinished(state)) {
           executorDataMap.get(executorId) match {
             case Some(executorInfo) =>
-              executorInfo.freeCores += scheduler.CPUS_PER_TASK
+              executorInfo.freeCores += taskCpus
 
 Review comment:
   Ah, yes, we can get it from there. We aren't using it for anything else at this point.
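   
   A hedged sketch of what that could look like (illustrative only; since tasks are now only placed on executors whose profile matches, the executor's profile id yields the same cpus-per-task as the task's):
   
   ```scala
   // Derive cpus-per-task from the executor's ResourceProfile (ExecutorData carries
   // a resourceProfileId with this change) instead of sending taskCpus in StatusUpdate.
   val rpId = executorInfo.resourceProfileId
   executorInfo.freeCores += scheduler.sc.resourceProfileManager.taskCpusForProfileId(rpId)
   ```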



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597228265
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597232315
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/24359/
   Test PASSed.



[GitHub] [spark] SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597328534
 
 
   **[Test build #119632 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/119632/testReport)** for PR 27773 at commit [`21bb8a8`](https://github.com/apache/spark/commit/21bb8a8a9856993e816b6d38c885dbcc77afa5fa).



[GitHub] [spark] mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389553206
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
 ##########
 @@ -162,7 +162,13 @@ private[spark] class LocalSchedulerBackend(
 
   override def applicationId(): String = appId
 
-  override def maxNumConcurrentTasks(): Int = totalCores / scheduler.CPUS_PER_TASK
+  // Doesn't support different ResourceProfiles yet
+  // so we expect all executors to be of same ResourceProfile
+  override def maxNumConcurrentTasks(rp: ResourceProfile): Int = {
+    val cpusPerTask = rp.taskResources.get(ResourceProfile.CPUS)
+      .map(_.amount.toInt).getOrElse(scheduler.CPUS_PER_TASK)
+    totalCores / cpusPerTask
 
 Review comment:
   Curious, in the future are we planning to support resource profiles for local mode? With a local gpu, for example?



[GitHub] [spark] mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r395977003
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise returns
+   * the task resource assignments to give to the next task. Note that the assignments may
+   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
 
 Review comment:
   That would be an API change for the resource profile?
   Do we want to do it before this gets released? (A subsequent PR is fine.)



[GitHub] [spark] Ngone51 commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-604416980
 
 
   LGTM.



[GitHub] [spark] tgravescs commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-604473053
 
 
   Thanks @mridulm @Ngone51, merged to the master branch.



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389523055
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ##########
 @@ -231,11 +232,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
           val resourcesInfo = resources.map { case (rName, info) =>
-            // tell the executor it can schedule resources up to numParts times,
+            // tell the executor it can schedule resources up to numSlotsPerAddress times,
             // as configured by the user, or set to 1 as that is the default (1 task/resource)
             val numParts = scheduler.sc.resourceProfileManager
               .resourceProfileFromId(resourceProfileId).getNumSlotsPerAddress(rName, conf)
-            (info.name, new ExecutorResourceInfo(info.name, info.addresses, numParts))
+            (info.name,
+             new ExecutorResourceInfo(info.name, info.addresses, numParts))
 
 Review comment:
   nit: this can still fit on one line without any change.



[GitHub] [spark] tgravescs commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-594169369
 
 
   @mridulm @squito, if either of you have time to review.



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-594088765
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/119233/
   Test PASSed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597329049
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597382267
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601378565
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/24787/
   Test PASSed.



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389778059
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
 ##########
 @@ -64,8 +64,10 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Loggin
     isSupported(rp)
     // force the computation of maxTasks and limitingResource now so we don't have cost later
     rp.limitingResource(sparkConf)
-    logInfo(s"Adding ResourceProfile id: ${rp.id}")
-    resourceProfileIdToResourceProfile.putIfAbsent(rp.id, rp)
+    val res = resourceProfileIdToResourceProfile.putIfAbsent(rp.id, rp)
+    if (res == null) {
+      logInfo(s"Added ResourceProfile id: ${rp.id}")
 
 Review comment:
   Currently I'm not removing the old resource profiles, as they could be used elsewhere and it's a bit hard to know when you are done using them. I don't expect people to use many of them (but as we know, lots of people use things in unexpected ways). I have a follow-up jira to figure out a way to remove unused ones: https://issues.apache.org/jira/browse/SPARK-30749



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597232307
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] Ngone51 commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-598592333
 
 
   cc @jiangxb1987 



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389500344
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
 ##########
 @@ -392,11 +392,13 @@ private[spark] object ResourceUtils extends Logging {
       s"${resourceRequest.id.resourceName}")
   }
 
-  def validateTaskCpusLargeEnough(execCores: Int, taskCpus: Int): Boolean = {
+  def validateTaskCpusLargeEnough(sparkConf: SparkConf, execCores: Int, taskCpus: Int): Boolean = {
     // Number of cores per executor must meet at least one task requirement.
-    if (execCores < taskCpus) {
-      throw new SparkException(s"The number of cores per executor (=$execCores) has to be >= " +
-        s"the number of cpus per task = $taskCpus.")
+    if (!sparkConf.get(TASKSET_MANAGER_SPECULATION_TESTING)) {
+      if (execCores < taskCpus) {
 
 Review comment:
   nit: 
   ```suggestion
       if (!sparkConf.get(TASKSET_MANAGER_SPECULATION_TESTING) && execCores < taskCpus)
   ```



[GitHub] [spark] SparkQA removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597231596
 
 
   **[Test build #119629 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/119629/testReport)** for PR 27773 at commit [`926b0e9`](https://github.com/apache/spark/commit/926b0e998c02626508a25de6a805a2bff3d113b6).



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389967592
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/internal/config/Tests.scala
 ##########
 @@ -62,4 +62,9 @@ private[spark] object Tests {
       .booleanConf
       .createWithDefault(false)
 
+  val TASKSET_MANAGER_SPECULATION_TESTING =
+    ConfigBuilder("spark.testing.taskSetManagerSpeculation")
 
 Review comment:
   I'll document it when I make the change discussed in the comment below.



[GitHub] [spark] SparkQA removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601374717
 
 
   **[Test build #120069 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120069/testReport)** for PR 27773 at commit [`545f5b2`](https://github.com/apache/spark/commit/545f5b205aa6ff3e822524e9b44b082b2bc7b762).



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389788613
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ##########
 @@ -606,10 +608,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   }
 
-  override def maxNumConcurrentTasks(): Int = synchronized {
-    executorDataMap.values.map { executor =>
-      executor.totalCores / scheduler.CPUS_PER_TASK
-    }.sum
+  override def maxNumConcurrentTasks(rp: ResourceProfile): Int = synchronized {
+    val cpusPerTask = rp.getTaskCpus.getOrElse(scheduler.CPUS_PER_TASK)
+    val executorsWithResourceProfile = executorDataMap.values.filter(_.resourceProfileId == rp.id)
 
 Review comment:
   I assume that could be the case, or that the executor count went to 0 because some died; the sum will just return 0. There is no change in logic for that. Is there something specific you are wondering about?
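   
   A minimal sketch of that behaviour, using a hypothetical, simplified stand-in for the executor bookkeeping (the names here are only for illustration): when no registered executor matches the profile, the filtered collection is empty and the sum is 0.
   
   ```
   case class ExecData(resourceProfileId: Int, totalCores: Int)
   
   val executorDataMap = Map.empty[String, ExecData]   // no executors registered (yet)
   val cpusPerTask = 2
   val rpId = 1
   
   val slots = executorDataMap.values
     .filter(_.resourceProfileId == rpId)   // nothing matches this profile
     .map(_.totalCores / cpusPerTask)
     .sum                                   // sum over an empty collection is 0
   
   assert(slots == 0)
   ```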



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597228265
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601375238
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] tgravescs commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-596804566
 
 
   Thanks @mridulm @Ngone51 for the reviews; I hope to have revisions up tomorrow, as I need to follow up on a few of your suggestions. I had a few questions in response to yours, so just let me know your thoughts on those.



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r390064076
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ##########
 @@ -606,10 +608,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   }
 
-  override def maxNumConcurrentTasks(): Int = synchronized {
-    executorDataMap.values.map { executor =>
-      executor.totalCores / scheduler.CPUS_PER_TASK
-    }.sum
+  override def maxNumConcurrentTasks(rp: ResourceProfile): Int = synchronized {
+    val cpusPerTask = rp.getTaskCpus.getOrElse(scheduler.CPUS_PER_TASK)
+    val executorsWithResourceProfile = executorDataMap.values.filter(_.resourceProfileId == rp.id)
 
 Review comment:
   Let's say a user has a barrier job like:
   
   ```
   rdd.withResources(rp).barrier().mapPartitions { part =>
     // do some barrier stuff
   }.collect()
   ```
   So, at the time we're in `createShuffleMapStage`/`mergeResourceProfilesForStage`/`checkBarrierStageWithNumSlots`, is it possible that some executors haven't launched yet?
   
   



[GitHub] [spark] mridulm edited a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
mridulm edited a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-602019491
 
 
   The changes look good to me; I will wait for @Ngone51 to finish his review as well.
   Thanks for working on this, Tom!



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r390304786
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise returns
+   * the task resource assignments to give to the next task. Note that the assignments maybe
+   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
+    val localTaskReqAssign = HashMap[String, ResourceInformation]()
+    if (tsResources.isEmpty) return Some(localTaskReqAssign.toMap)
+    // we go through all resources here so that we can make sure they match and also get what the
+    // assignments are for the next task
+    for ((rName, taskReqs) <- tsResources) {
+      val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName)
+      availWorkerResources.get(rName) match {
+        case Some(workerRes) =>
+          val workerAvail = availWorkerResources.get(rName).map(_.size).getOrElse(0)
+          if (workerAvail >= taskAmount) {
+            localTaskReqAssign.put(rName, new ResourceInformation(rName,
+              workerRes.take(taskAmount).toArray))
+          } else {
+            return None
+          }
+        case None => return None
+      }
+    }
+    Some(localTaskReqAssign.toMap)
+  }
+
+  // Use the resource that the resourceProfile has as the limiting resource to calculate the
+  // total number of slots available based on the current offers.
+  private def calculateAvailableSlots(
 
 Review comment:
   Barrier just needs to make sure there are X slots available; if there are not, it skips that round of scheduling.
   What I'm saying is that this is how the code did it before (calculate all slots). In the scenarios where people use barrier scheduling they can't use dynamic allocation, so I would expect the user to request X executors up front, and it is very unlikely the job would have X + Y executors. So even though calculateAvailableSlots adds up all slots, I wouldn't expect that to be more than X, meaning calculateAvailableSlots isn't wasting any time adding in extra slots, because they wouldn't be there.
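   
   A rough sketch of that check, with hypothetical helper and variable names (not the actual scheduler code): the barrier task set only launches when the computed slot count covers every task; otherwise the round is skipped and retried later.
   
   ```
   // Hypothetical sketch: a barrier stage only launches when all of its tasks fit at once.
   def canLaunchBarrier(numBarrierTasks: Int, availableSlots: Int): Boolean =
     availableSlots >= numBarrierTasks
   
   val availableSlots = 8    // e.g. what calculateAvailableSlots returned for this profile
   val numBarrierTasks = 10
   
   if (!canLaunchBarrier(numBarrierTasks, availableSlots)) {
     // skip this round of scheduling and try again on the next set of offers
   }
   ```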



[GitHub] [spark] mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389539950
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise returns
+   * the task resource assignments to give to the next task. Note that the assignments maybe
+   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
+    val localTaskReqAssign = HashMap[String, ResourceInformation]()
+    if (tsResources.isEmpty) return Some(localTaskReqAssign.toMap)
+    // we go through all resources here so that we can make sure they match and also get what the
+    // assignments are for the next task
+    for ((rName, taskReqs) <- tsResources) {
+      val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName)
+      availWorkerResources.get(rName) match {
+        case Some(workerRes) =>
+          val workerAvail = availWorkerResources.get(rName).map(_.size).getOrElse(0)
+          if (workerAvail >= taskAmount) {
+            localTaskReqAssign.put(rName, new ResourceInformation(rName,
+              workerRes.take(taskAmount).toArray))
+          } else {
+            return None
+          }
+        case None => return None
+      }
+    }
+    Some(localTaskReqAssign.toMap)
+  }
+
+  // Use the resource that the resourceProfile has as the limiting resource to calculate the
+  // total number of slots available based on the current offers.
+  private def calculateAvailableSlots(
+      resourceProfileIds: Array[Int],
+      availableCpus: Array[Int],
+      availableResources: Array[Map[String, Buffer[String]]],
+      rpId: Int): Int = {
+    val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter { case (id, _) =>
+      (id == resourceProfile.id)
+    }
+    val coresKnown = resourceProfile.isCoresLimitKnown
+    var limitingResource = resourceProfile.limitingResource(sc.getConf)
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
 
 Review comment:
   nit: Should we move `taskCpusForProfileId` into the resource profile (and call it `taskCpus` or some such)?
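   
   A sketch of roughly what that might look like, using a deliberately simplified, hypothetical profile class (the real `ResourceProfile` carries much more state):
   
   ```
   // Hypothetical, trimmed-down profile with the suggested accessor on it.
   class SimpleResourceProfile(taskResources: Map[String, Double], defaultCpusPerTask: Int) {
     // Equivalent of folding taskCpusForProfileId into the profile itself.
     def taskCpus: Int =
       taskResources.get("cpus").map(_.toInt).getOrElse(defaultCpusPerTask)
   }
   
   val rp = new SimpleResourceProfile(Map("cpus" -> 2.0, "gpu" -> 1.0), defaultCpusPerTask = 1)
   assert(rp.taskCpus == 2)
   ```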



[GitHub] [spark] SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601444176
 
 
   **[Test build #120071 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120071/testReport)** for PR 27773 at commit [`2a4a733`](https://github.com/apache/spark/commit/2a4a73385ee948d81e51091990a37ebbc8010f61).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r390364234
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 ##########
 @@ -529,6 +549,28 @@ private[spark] class DAGScheduler(
     parents
   }
 
+  private[scheduler] def getResourceProfilesForRDDsInStage(
 
 Review comment:
   Oh, I misunderstood one thing here; on second thought, never mind.



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597382274
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/119632/
   Test PASSed.



[GitHub] [spark] SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-594087462
 
 
   **[Test build #119233 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/119233/testReport)** for PR 27773 at commit [`71dd91d`](https://github.com/apache/spark/commit/71dd91dd3119a0daab368af2e2443a3fca1a460d).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.



[GitHub] [spark] mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389536086
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise returns
+   * the task resource assignments to give to the next task. Note that the assignments maybe
+   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
+    val localTaskReqAssign = HashMap[String, ResourceInformation]()
+    if (tsResources.isEmpty) return Some(localTaskReqAssign.toMap)
 
 Review comment:
   Swap these two lines and return an empty map if `tsResources.isEmpty`.
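   
   A small sketch of the reordering being suggested, with the resource bookkeeping simplified to a name -> addresses map (hypothetical types, not the actual method):
   
   ```
   import scala.collection.mutable.HashMap
   
   def assignResources(tsResources: Map[String, Int]): Option[Map[String, Seq[String]]] = {
     // Return the empty result first, before allocating the mutable HashMap.
     if (tsResources.isEmpty) return Some(Map.empty)
     val localTaskReqAssign = HashMap[String, Seq[String]]()
     // ... fill localTaskReqAssign for each requested resource ...
     Some(localTaskReqAssign.toMap)
   }
   ```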



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601378692
 
 
   Merged build finished. Test FAILed.



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389523380
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ##########
 @@ -96,6 +96,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   protected val executorsPendingDecommission = new HashSet[String]
 
   // A map of ResourceProfile id to map of hostname with its possible task number running on it
+  // A map to store hostname with its possible task number running on it
 
 Review comment:
   nit: redundant?



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597329049
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r393421362
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
 ##########
 @@ -184,7 +184,7 @@ private[spark] object TaskDescription {
     // Create a sub-buffer for the serialized task into its own buffer (to be deserialized later).
     val serializedTask = byteBuffer.slice()
 
-    new TaskDescription(taskId, attemptNumber, executorId, name, index, partitionId, taskFiles,
-      taskJars, properties, resources, serializedTask)
+    new TaskDescription(taskId, attemptNumber, executorId, name, index, partitionId,
+      taskFiles, taskJars, properties, resources, serializedTask)
 
 Review comment:
   Could you please revert this, since it makes no change?



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389782696
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
 ##########
 @@ -64,8 +64,10 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Loggin
     isSupported(rp)
     // force the computation of maxTasks and limitingResource now so we don't have cost later
     rp.limitingResource(sparkConf)
-    logInfo(s"Adding ResourceProfile id: ${rp.id}")
-    resourceProfileIdToResourceProfile.putIfAbsent(rp.id, rp)
+    val res = resourceProfileIdToResourceProfile.putIfAbsent(rp.id, rp)
+    if (res == null) {
+      logInfo(s"Added ResourceProfile id: ${rp.id}")
 
 Review comment:
   Also note that adding a ResourceProfile multiple times (rdd.withResources) doesn't leave old resource profiles around; it's when they actually create the ResourceProfile that it gets a separate id.
   This PR has RDD.withResources in now for testing purposes, so we can leave this conversation until the PR that opens it up, but I was trying to decide whether we only allow setting it once per RDD. I left it out here as I was still thinking about that.
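   
   A toy sketch of that id behaviour, using a hypothetical `ToyProfile` with an `AtomicInteger` counter rather than Spark's real classes: the id is assigned when the profile object is created, so re-attaching the same instance reuses one id, while building a second profile produces a new one.
   
   ```
   import java.util.concurrent.atomic.AtomicInteger
   
   object ToyProfile { private val nextId = new AtomicInteger(0) }
   class ToyProfile { val id: Int = ToyProfile.nextId.getAndIncrement() }
   
   val rp = new ToyProfile
   // Attaching the same instance to several RDDs (rdd.withResources(rp)) reuses this one id.
   val idUsedTwice = (rp.id, rp.id)
   
   // Creating a second profile object is what produces a distinct id.
   val rp2 = new ToyProfile
   assert(rp.id != rp2.id)
   ```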



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597228281
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/24358/
   Test PASSed.



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r390065713
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ##########
 @@ -140,12 +141,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     override def receive: PartialFunction[Any, Unit] = {
-      case StatusUpdate(executorId, taskId, state, data, resources) =>
+      case StatusUpdate(executorId, taskId, state, data, taskCpus, resources) =>
         scheduler.statusUpdate(taskId, state, data.value)
         if (TaskState.isFinished(state)) {
           executorDataMap.get(executorId) match {
             case Some(executorInfo) =>
-              executorInfo.freeCores += scheduler.CPUS_PER_TASK
+              executorInfo.freeCores += taskCpus
 
 Review comment:
   Oh, oops, I copied the example from your code without updating it and pointed you in another direction. Sorry about that.
   
   Actually, what I mean is, could we do this?
   
   `sc.resourceProfileManager.taskCpusForProfileId(executorInfo.resourceProfileId)`
   
   i.e. get the resourceProfileId from executorInfo instead of from the taskSet.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601444851
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/120071/
   Test PASSed.



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389789202
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
 ##########
 @@ -457,18 +466,8 @@ private[spark] class TaskSetManager(
         // val timeTaken = clock.getTime() - startTime
         val taskName = s"task ${info.id} in stage ${taskSet.id}"
         logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
-          s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")
-
-        val extraResources = sched.resourcesReqsPerTask.map { taskReq =>
-          val rName = taskReq.resourceName
-          val count = taskReq.amount
-          val rAddresses = availableResources.getOrElse(rName, Seq.empty)
-          assert(rAddresses.size >= count, s"Required $count $rName addresses, but only " +
-            s"${rAddresses.size} available.")
-          // We'll drop the allocated addresses later inside TaskSchedulerImpl.
-          val allocatedAddresses = rAddresses.take(count)
-          (rName, new ResourceInformation(rName, allocatedAddresses.toArray))
-        }.toMap
+          s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes) " +
+          s"cpus: $taskCpus, taskResourceAssignments ${taskResourceAssignments}")
 
 Review comment:
   Nope, I'll remove it.



[GitHub] [spark] mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389543998
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise returns
+   * the task resource assignments to give to the next task. Note that the assignments maybe
+   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
+    val localTaskReqAssign = HashMap[String, ResourceInformation]()
+    if (tsResources.isEmpty) return Some(localTaskReqAssign.toMap)
+    // we go through all resources here so that we can make sure they match and also get what the
+    // assignments are for the next task
+    for ((rName, taskReqs) <- tsResources) {
+      val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName)
+      availWorkerResources.get(rName) match {
+        case Some(workerRes) =>
+          val workerAvail = availWorkerResources.get(rName).map(_.size).getOrElse(0)
+          if (workerAvail >= taskAmount) {
+            localTaskReqAssign.put(rName, new ResourceInformation(rName,
+              workerRes.take(taskAmount).toArray))
+          } else {
+            return None
+          }
+        case None => return None
+      }
+    }
+    Some(localTaskReqAssign.toMap)
+  }
+
+  // Use the resource that the resourceProfile has as the limiting resource to calculate the
+  // total number of slots available based on the current offers.
+  private def calculateAvailableSlots(
+      resourceProfileIds: Array[Int],
+      availableCpus: Array[Int],
+      availableResources: Array[Map[String, Buffer[String]]],
+      rpId: Int): Int = {
+    val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter { case (id, _) =>
+      (id == resourceProfile.id)
+    }
+    val coresKnown = resourceProfile.isCoresLimitKnown
+    var limitingResource = resourceProfile.limitingResource(sc.getConf)
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+
+    offersForResourceProfile.map { case (o, index) =>
+      val numTasksPerExecCores = availableCpus(index) / taskCpus
+      // when executor cores config isn't set, we can't calculate the real limiting resource
+      // and number of tasks per executor ahead of time, so calculate it now.
+      if (!coresKnown) {
+        val numTasksPerExecCustomResource = resourceProfile.maxTasksPerExecutor(sc.getConf)
+        if (limitingResource.isEmpty ||
+          (limitingResource.nonEmpty && numTasksPerExecCores < numTasksPerExecCustomResource)) {
+          limitingResource = ResourceProfile.CPUS
 
 Review comment:
   I am a bit unclear on this.
   Even if the number of tasks based on a custom resource is greater than the number of tasks based on available cpu cores, the actual number of tasks which can be assigned based on that custom resource could be less than the number of tasks by core (for example, 3 of the 4 GPUs already in use while available cores == 2).
   Here, we would incorrectly assume the executor can schedule up to numTasksPerExecCores tasks while there might not be enough of the custom resource available, right? (The else block below does the right thing; I am not sure about this short-circuiting being done here.)
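   
   A worked example of that concern, with made-up numbers (1 CPU and 1 GPU per task, an executor with 2 free cores but only 1 of its 4 GPU addresses free): counting by cores alone over-estimates the slots, while the minimum across resources gives the real answer.
   
   ```
   // Hypothetical availability for one executor in this scheduling round.
   val availableCpus = 2
   val availableGpuAddresses = Seq("3")   // 3 of the 4 GPUs are already in use
   val taskCpus = 1
   val taskGpus = 1
   
   val slotsByCores = availableCpus / taskCpus                // 2
   val slotsByGpus  = availableGpuAddresses.size / taskGpus   // 1
   
   // Short-circuiting to the cpu count would report 2 slots, but only
   // min(2, 1) = 1 task can actually be scheduled on this executor right now.
   val actualSlots = math.min(slotsByCores, slotsByGpus)
   assert(actualSlots == 1)
   ```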



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-594088755
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601391433
 
 
   **[Test build #120071 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120071/testReport)** for PR 27773 at commit [`2a4a733`](https://github.com/apache/spark/commit/2a4a73385ee948d81e51091990a37ebbc8010f61).



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601378704
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/120070/
   Test FAILed.



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389535829
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
 ##########
 @@ -76,6 +76,21 @@ class ResourceProfile(
     taskResources.get(ResourceProfile.CPUS).map(_.amount.toInt)
   }
 
+  /*
+   * This function takes into account fractional amounts for the task resource requirement.
+   * Spark only supports fractional amounts < 1 to basically allow for multiple tasks
+   * to use the same resource.
 
 Review comment:
   IIUC, "same resource **address**" is better?
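   
   A tiny worked example of the fractional case being discussed (illustrative numbers only, not from the PR): an amount below 1 means several tasks share one address of the resource, so a task amount of 0.25 lets up to 4 tasks land on the same GPU address.
   
   ```
   // Illustrative only: 0.25 of a resource per task => up to 4 tasks per address.
   val taskAmount = 0.25
   val tasksPerAddress = if (taskAmount < 1.0) (1.0 / taskAmount).toInt else 1
   assert(tasksPerAddress == 4)
   ```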



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597232487
 
 
   Merged build finished. Test FAILed.



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389861623
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -340,39 +338,48 @@ private[spark] class TaskSchedulerImpl(
     for (i <- 0 until shuffledOffers.size) {
       val execId = shuffledOffers(i).executorId
       val host = shuffledOffers(i).host
-      if (availableCpus(i) >= CPUS_PER_TASK &&
-        resourcesMeetTaskRequirements(availableResources(i))) {
-        try {
-          for (task <- taskSet.resourceOffer(execId, host, maxLocality, availableResources(i))) {
-            tasks(i) += task
-            val tid = task.taskId
-            taskIdToTaskSetManager.put(tid, taskSet)
-            taskIdToExecutorId(tid) = execId
-            executorIdToRunningTaskIds(execId).add(tid)
-            availableCpus(i) -= CPUS_PER_TASK
-            assert(availableCpus(i) >= 0)
-            task.resources.foreach { case (rName, rInfo) =>
-              // Remove the first n elements from availableResources addresses, these removed
-              // addresses are the same as that we allocated in taskSet.resourceOffer() since it's
-              // synchronized. We don't remove the exact addresses allocated because the current
-              // approach produces the identical result with less time complexity.
-              availableResources(i).getOrElse(rName,
-                throw new SparkException(s"Try to acquire resource $rName that doesn't exist."))
-                .remove(0, rInfo.addresses.size)
-            }
-            // Only update hosts for a barrier task.
-            if (taskSet.isBarrier) {
-              // The executor address is expected to be non empty.
-              addressesWithDescs += (shuffledOffers(i).address.get -> task)
+      val taskSetRpID = taskSet.taskSet.resourceProfileId
+      // make the resource profile id a hard requirement for now - ie only put tasksets
+      // on executors where resource profile exactly matches.
+      if (taskSetRpID == shuffledOffers(i).resourceProfileId) {
+        val taskResAssignmentsOpt = resourcesMeetTaskRequirements(taskSet, availableCpus(i),
+          availableResources(i))
+        taskResAssignmentsOpt.foreach { taskResAssignments =>
+          try {
+            val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(taskSetRpID)
+            val taskDescOption = taskSet.resourceOffer(execId, host, maxLocality, taskCpus,
+              taskResAssignments)
+            for (task <- taskDescOption) {
+              tasks(i) += task
+              val tid = task.taskId
+              taskIdToTaskSetManager.put(tid, taskSet)
+              taskIdToExecutorId(tid) = execId
+              executorIdToRunningTaskIds(execId).add(tid)
+              availableCpus(i) -= task.cpus
+              assert(availableCpus(i) >= 0)
 
 Review comment:
   This was always the case; even when it was cpu-only, we left it in when adding the original resource scheduling code, as it doesn't hurt and is just a double check in case something really unexpected happens.
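   
   A minimal sketch of that sanity check (hedged; the variable names are stand-ins for the scheduler's per-offer bookkeeping):
   
   ```
   var availableCpus = 4
   val taskCpus = 2
   
   availableCpus -= taskCpus
   // Defensive check: this should never go negative unless the accounting broke elsewhere.
   assert(availableCpus >= 0)
   ```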



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601375253
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/24786/
   Test PASSed.



[GitHub] [spark] asfgit closed pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773
 
 
   



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389795523
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -429,10 +509,13 @@ private[spark] class TaskSchedulerImpl(
 
     val shuffledOffers = shuffleOffers(filteredOffers)
     // Build a list of tasks to assign to each worker.
+    // Note the size estimate here might be off with different ResourceProfiles but should be
+    // close estimate
     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
     val availableResources = shuffledOffers.map(_.resources).toArray
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
-    val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
+    val resourceProfileIds = shuffledOffers.map(o => o.resourceProfileId).toArray
+    val sortedTaskSets = rootPool.getSortedTaskSetQueue
 
 Review comment:
   That was a merge conflict that I didn't catch; thanks for pointing it out, I'll add it back.



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r390300887
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
 ##########
 @@ -162,7 +162,13 @@ private[spark] class LocalSchedulerBackend(
 
   override def applicationId(): String = appId
 
-  override def maxNumConcurrentTasks(): Int = totalCores / scheduler.CPUS_PER_TASK
+  // Doesn't support different ResourceProfiles yet
+  // so we expect all executors to be of same ResourceProfile
+  override def maxNumConcurrentTasks(rp: ResourceProfile): Int = {
+    val cpusPerTask = rp.taskResources.get(ResourceProfile.CPUS)
+      .map(_.amount.toInt).getOrElse(scheduler.CPUS_PER_TASK)
+    totalCores / cpusPerTask
 
 Review comment:
   Yeah, the main issue right now is that this is tied to dynamic allocation, so we would just need to figure out how we would want that to behave.
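   
   For reference, a quick sketch of the local-mode computation in the diff above, with made-up numbers (e.g. local[8] and 2 cpus per task):
   
   ```
   // Hypothetical local-mode figures: 8 total cores, 2 cpus per task => 4 concurrent tasks.
   val totalCores = 8
   val cpusPerTask = 2
   val maxConcurrentTasks = totalCores / cpusPerTask
   assert(maxConcurrentTasks == 4)
   ```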



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601375401
 
 
   Merged build finished. Test FAILed.



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389506902
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ##########
 @@ -606,10 +608,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   }
 
-  override def maxNumConcurrentTasks(): Int = synchronized {
-    executorDataMap.values.map { executor =>
-      executor.totalCores / scheduler.CPUS_PER_TASK
-    }.sum
+  override def maxNumConcurrentTasks(rp: ResourceProfile): Int = synchronized {
+    val cpusPerTask = rp.getTaskCpus.getOrElse(scheduler.CPUS_PER_TASK)
+    val executorsWithResourceProfile = executorDataMap.values.filter(_.resourceProfileId == rp.id)
 
 Review comment:
   Executors may not have launched yet at the time we call `maxNumConcurrentTasks` from `checkBarrierStageWithNumSlots`?
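
   (For context, the check being referenced runs at stage submission time and is shaped roughly like this - a paraphrase, not the exact DAGScheduler code - which is why the set of executors registered at that moment matters:)

       // paraphrased shape of checkBarrierStageWithNumSlots
       if (rdd.isBarrier() && rdd.getNumPartitions > sc.maxNumConcurrentTasks(rp)) {
         // fails the job: the barrier stage needs more slots than are currently available
         throw new BarrierJobSlotsNumberCheckFailed(rdd.getNumPartitions, sc.maxNumConcurrentTasks(rp))
       }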



[GitHub] [spark] mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389551461
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -429,10 +509,13 @@ private[spark] class TaskSchedulerImpl(
 
     val shuffledOffers = shuffleOffers(filteredOffers)
     // Build a list of tasks to assign to each worker.
+    // Note the size estimate here might be off with different ResourceProfiles but should be
+    // close estimate
     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
     val availableResources = shuffledOffers.map(_.resources).toArray
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
-    val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
+    val resourceProfileIds = shuffledOffers.map(o => o.resourceProfileId).toArray
+    val sortedTaskSets = rootPool.getSortedTaskSetQueue
 
 Review comment:
   Why was the `isZombie` check removed?



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389973937
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise returns
+   * the task resource assignments to give to the next task. Note that the assignments maybe
+   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
+    val localTaskReqAssign = HashMap[String, ResourceInformation]()
+    if (tsResources.isEmpty) return Some(localTaskReqAssign.toMap)
+    // we go through all resources here so that we can make sure they match and also get what the
+    // assignments are for the next task
+    for ((rName, taskReqs) <- tsResources) {
+      val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName)
+      availWorkerResources.get(rName) match {
+        case Some(workerRes) =>
+          val workerAvail = availWorkerResources.get(rName).map(_.size).getOrElse(0)
+          if (workerAvail >= taskAmount) {
+            localTaskReqAssign.put(rName, new ResourceInformation(rName,
+              workerRes.take(taskAmount).toArray))
+          } else {
+            return None
+          }
+        case None => return None
+      }
+    }
+    Some(localTaskReqAssign.toMap)
+  }
+
+  // Use the resource that the resourceProfile has as the limiting resource to calculate the
+  // total number of slots available based on the current offers.
+  private def calculateAvailableSlots(
 
 Review comment:
   we could. The code would become a loop (adding up as you go) rather than a map.sum(). Honestly I just did what was there before, and was thinking that barrier doesn't support dynamic allocation at this point (unless it got checked in without me seeing it?), so the number of executors is more likely to be less than what you need, not more.
   Do you see other cases?
   I guess when barrier scheduling starts to support dynamic allocation this could be more of an issue, e.g. if you run a large ETL stage and then a smaller one with ML.
   I'm fine either way; if you think it's an issue now I will change it, or we could wait until dynamic allocation is supported. What are your thoughts? (Note that I am changing that logic anyway, as it had a bug that Mridul found in his review.)
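
   A rough sketch of the early-exit loop variant being discussed (illustrative names only, simplified to cpus-only slots; the actual calculation would use whatever the profile's limiting resource is):

       def hasEnoughSlots(
           resourceProfileIds: Array[Int],
           availableCpus: Array[Int],
           rpId: Int,
           cpusPerTask: Int,
           targetSlots: Int): Boolean = {
         var slots = 0
         var i = 0
         // stop summing as soon as we know the target can be met
         while (i < resourceProfileIds.length && slots < targetSlots) {
           if (resourceProfileIds(i) == rpId) {
             slots += availableCpus(i) / cpusPerTask
           }
           i += 1
         }
         slots >= targetSlots
       }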



[GitHub] [spark] mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r390235561
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
 ##########
 @@ -162,7 +162,13 @@ private[spark] class LocalSchedulerBackend(
 
   override def applicationId(): String = appId
 
-  override def maxNumConcurrentTasks(): Int = totalCores / scheduler.CPUS_PER_TASK
+  // Doesn't support different ResourceProfiles yet
+  // so we expect all executors to be of same ResourceProfile
+  override def maxNumConcurrentTasks(rp: ResourceProfile): Int = {
+    val cpusPerTask = rp.taskResources.get(ResourceProfile.CPUS)
+      .map(_.amount.toInt).getOrElse(scheduler.CPUS_PER_TASK)
+    totalCores / cpusPerTask
 
 Review comment:
   If possible, feature parity between local mode and cluster mode, in whatever practical/constrained way, would be nice - it allows for quick prototyping/testing before launching on a cluster.
   Of course, this doesn't need to be coupled with this work - but particularly for rapid prototyping, local mode is invaluable.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-594088765
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/119233/
   Test PASSed.



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r390306835
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 ##########
 @@ -529,6 +549,28 @@ private[spark] class DAGScheduler(
     parents
   }
 
+  private[scheduler] def getResourceProfilesForRDDsInStage(
 
 Review comment:
   I'm not sure what you mean by this. ResourceProfiles don't track RDDs; RDDs track which ResourceProfile they have. The best you could do is check whether there is more than one ResourceProfile (beyond the default one). The only way to know whether an RDD has a profile is to look at it. I'm looking to see if I can combine them; it doesn't look as messy now, but let me finish coding it.
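
   As a rough illustration of what the check has to do (not the PR's exact code; `getResourceProfile` is assumed here to be the new private RDD accessor this PR adds, returning null when no profile was set): walk the stage's RDDs, collect their profiles, and a conflict only exists if more than one distinct profile shows up.

       import org.apache.spark.rdd.RDD
       import org.apache.spark.resource.ResourceProfile

       // RDDs know their profile; profiles do not know their RDDs, so each RDD must be visited
       def distinctProfiles(stageRdds: Seq[RDD[_]]): Set[ResourceProfile] =
         stageRdds.flatMap(rdd => Option(rdd.getResourceProfile())).toSet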



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601392084
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/24788/
   Test PASSed.



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389860299
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ##########
 @@ -96,6 +96,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   protected val executorsPendingDecommission = new HashSet[String]
 
   // A map of ResourceProfile id to map of hostname with its possible task number running on it
+  // A map to store hostname with its possible task number running on it
 
 Review comment:
   yes



[GitHub] [spark] mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389520542
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/internal/config/Tests.scala
 ##########
 @@ -62,4 +62,9 @@ private[spark] object Tests {
       .booleanConf
       .createWithDefault(false)
 
+  val TASKSET_MANAGER_SPECULATION_TESTING =
+    ConfigBuilder("spark.testing.taskSetManagerSpeculation")
 
 Review comment:
   Can we add some comment/details on what the config is for?
   I am unclear on the utility of this ...



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389547146
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise returns
+   * the task resource assignments to give to the next task. Note that the assignments maybe
+   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
+    val localTaskReqAssign = HashMap[String, ResourceInformation]()
+    if (tsResources.isEmpty) return Some(localTaskReqAssign.toMap)
+    // we go through all resources here so that we can make sure they match and also get what the
+    // assignments are for the next task
+    for ((rName, taskReqs) <- tsResources) {
+      val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName)
+      availWorkerResources.get(rName) match {
+        case Some(workerRes) =>
+          val workerAvail = availWorkerResources.get(rName).map(_.size).getOrElse(0)
+          if (workerAvail >= taskAmount) {
+            localTaskReqAssign.put(rName, new ResourceInformation(rName,
+              workerRes.take(taskAmount).toArray))
+          } else {
+            return None
+          }
+        case None => return None
+      }
+    }
+    Some(localTaskReqAssign.toMap)
+  }
+
+  // Use the resource that the resourceProfile has as the limiting resource to calculate the
+  // total number of slots available based on the current offers.
+  private def calculateAvailableSlots(
 
 Review comment:
   Maybe we can pass in the target number of slots and stop the calculation once we reach the target.



[GitHub] [spark] SparkQA removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601391433
 
 
   **[Test build #120071 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120071/testReport)** for PR 27773 at commit [`2a4a733`](https://github.com/apache/spark/commit/2a4a73385ee948d81e51091990a37ebbc8010f61).



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601444848
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597232498
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/119629/
   Test FAILed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601378556
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389802782
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise returns
+   * the task resource assignments to give to the next task. Note that the assignments maybe
+   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
 
 Review comment:
   so I had thought about that, and I think I even started to code it up, but then it made other things more difficult. Let me revisit it since that was quite a while ago and things may have changed.



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r393425213
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ##########
 @@ -606,10 +608,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   }
 
-  override def maxNumConcurrentTasks(): Int = synchronized {
-    executorDataMap.values.map { executor =>
-      executor.totalCores / scheduler.CPUS_PER_TASK
-    }.sum
+  override def maxNumConcurrentTasks(rp: ResourceProfile): Int = synchronized {
+    val cpusPerTask = rp.getTaskCpus.getOrElse(scheduler.CPUS_PER_TASK)
+    val executorsWithResourceProfile = executorDataMap.values.filter(_.resourceProfileId == rp.id)
 
 Review comment:
   Actually, I was not talking about error cases here; I'm worried about the normal procedure. And I realize now we already have a precaution for it, see: https://github.com/apache/spark/blob/b29829e2abdebdf6fa9dd0a4a4cf4c9d676ee82d/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L983-L1004



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389499819
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
 ##########
 @@ -64,8 +64,10 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Loggin
     isSupported(rp)
     // force the computation of maxTasks and limitingResource now so we don't have cost later
     rp.limitingResource(sparkConf)
-    logInfo(s"Adding ResourceProfile id: ${rp.id}")
-    resourceProfileIdToResourceProfile.putIfAbsent(rp.id, rp)
+    val res = resourceProfileIdToResourceProfile.putIfAbsent(rp.id, rp)
+    if (res == null) {
+      logInfo(s"Added ResourceProfile id: ${rp.id}")
 
 Review comment:
   Is there a way to clean up the old resource profile in case an RDD calls `withResources` multiple times?



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-593980120
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/23972/
   Test PASSed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-594088755
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r393420292
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 ##########
 @@ -466,8 +488,9 @@ private[spark] class DAGScheduler(
    * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
 
 Review comment:
   The comment is out of date.



[GitHub] [spark] mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389536968
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise returns
+   * the task resource assignments to give to the next task. Note that the assignments maybe
+   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
+    val localTaskReqAssign = HashMap[String, ResourceInformation]()
+    if (tsResources.isEmpty) return Some(localTaskReqAssign.toMap)
+    // we go through all resources here so that we can make sure they match and also get what the
+    // assignments are for the next task
+    for ((rName, taskReqs) <- tsResources) {
+      val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName)
+      availWorkerResources.get(rName) match {
+        case Some(workerRes) =>
+          val workerAvail = availWorkerResources.get(rName).map(_.size).getOrElse(0)
 
 Review comment:
   `availWorkerResources.get(rName)` -> `workerRes` ?
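
   i.e., something along these lines (a sketch of the suggested simplification, reusing the already-bound `workerRes` from the pattern match instead of a second map lookup):

       availWorkerResources.get(rName) match {
         case Some(workerRes) if workerRes.size >= taskAmount =>
           localTaskReqAssign.put(rName, new ResourceInformation(rName,
             workerRes.take(taskAmount).toArray))
         case _ => return None
       }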



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601375409
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/120069/
   Test FAILed.



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389775909
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/SparkContext.scala
 ##########
 @@ -1617,13 +1617,17 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
-   * Get the max number of tasks that can be concurrent launched currently.
+   * Get the max number of tasks that can be concurrent launched based on the ResourceProfile
+   * being used.
    * Note that please don't cache the value returned by this method, because the number can change
    * due to add/remove executors.
    *
+   * @param rp ResourceProfile which to use to calculate max concurrent tasks.
    * @return The max number of tasks that can be concurrent launched currently.
    */
-  private[spark] def maxNumConcurrentTasks(): Int = schedulerBackend.maxNumConcurrentTasks()
+  private[spark] def maxNumConcurrentTasks(rp: ResourceProfile): Int = {
 
 Review comment:
   it's a private function, and I think the caller should be aware of it, or at least have thought about it to make sure they aren't using another profile. The caller can simply get the default profile to pass in.
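
   For example, an internal caller that only cares about the default profile would do something like the following (a sketch; `getOrCreateDefaultProfile` is assumed here to be the existing helper on the ResourceProfile companion object):

       val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(sc.getConf)
       val maxTasks = sc.maxNumConcurrentTasks(defaultProfile)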



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-593980108
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r390233895
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ##########
 @@ -606,10 +608,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   }
 
-  override def maxNumConcurrentTasks(): Int = synchronized {
-    executorDataMap.values.map { executor =>
-      executor.totalCores / scheduler.CPUS_PER_TASK
-    }.sum
+  override def maxNumConcurrentTasks(rp: ResourceProfile): Int = synchronized {
+    val cpusPerTask = rp.getTaskCpus.getOrElse(scheduler.CPUS_PER_TASK)
+    val executorsWithResourceProfile = executorDataMap.values.filter(_.resourceProfileId == rp.id)
 
 Review comment:
   Yes, executors need not have launched yet, launched/running executors might have failed (after computing `rdd` in the example above), or the number of executors might be < the number of partitions in the rdd.
   `maxNumConcurrentTasks` will tell us how many (max) can be run based on the current state.
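
   In other words, the per-profile computation in this hunk ends up shaped roughly like this (the tail of the sum is cut off in the diff above, so the last two lines here are an assumption):

       val cpusPerTask = rp.getTaskCpus.getOrElse(scheduler.CPUS_PER_TASK)
       executorDataMap.values
         .filter(_.resourceProfileId == rp.id)
         .map(_.totalCores / cpusPerTask)
         .sum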



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597329062
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/24362/
   Test PASSed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601375401
 
 
   Merged build finished. Test FAILed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-593980120
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/23972/
   Test PASSed.



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601444851
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/120071/
   Test PASSed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597232498
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/119629/
   Test FAILed.



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389879223
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
 ##########
 @@ -392,11 +392,13 @@ private[spark] object ResourceUtils extends Logging {
       s"${resourceRequest.id.resourceName}")
   }
 
-  def validateTaskCpusLargeEnough(execCores: Int, taskCpus: Int): Boolean = {
+  def validateTaskCpusLargeEnough(sparkConf: SparkConf, execCores: Int, taskCpus: Int): Boolean = {
     // Number of cores per executor must meet at least one task requirement.
-    if (execCores < taskCpus) {
-      throw new SparkException(s"The number of cores per executor (=$execCores) has to be >= " +
-        s"the number of cpus per task = $taskCpus.")
+    if (!sparkConf.get(TASKSET_MANAGER_SPECULATION_TESTING)) {
 
 Review comment:
   so this conf is for testing speculation, specifically to test the case when we don't know what the executor cores setting is - e.g. in standalone mode:

    skip throwing exception when cores per task > cores per executor to emulate standalone mode

   I did this to emulate the standalone mode behavior, even though running in local mode, since we don't really have tests for full standalone mode. In standalone mode this function wouldn't even be called because we skip it.

   Let me see if there is a cleaner way to test this so as not to cause confusion.
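
   Something like the following is the kind of test setup this enables (hypothetical values; the conf name comes from the Tests.scala hunk in this PR):

       import org.apache.spark.SparkConf

       val conf = new SparkConf()
         .setMaster("local[1]")
         .set("spark.task.cpus", "2")
         // emulate standalone mode: skip the "task cpus <= executor cores" validation
         .set("spark.testing.taskSetManagerSpeculation", "true")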
   



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389989873
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 ##########
 @@ -529,6 +549,28 @@ private[spark] class DAGScheduler(
     parents
   }
 
+  private[scheduler] def getResourceProfilesForRDDsInStage(
 
 Review comment:
   I'll take another look. Originally, when I wrote it, I thought it would be more readable this way without much overhead, but let me revisit it.



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389531492
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -340,39 +338,48 @@ private[spark] class TaskSchedulerImpl(
     for (i <- 0 until shuffledOffers.size) {
       val execId = shuffledOffers(i).executorId
       val host = shuffledOffers(i).host
-      if (availableCpus(i) >= CPUS_PER_TASK &&
-        resourcesMeetTaskRequirements(availableResources(i))) {
-        try {
-          for (task <- taskSet.resourceOffer(execId, host, maxLocality, availableResources(i))) {
-            tasks(i) += task
-            val tid = task.taskId
-            taskIdToTaskSetManager.put(tid, taskSet)
-            taskIdToExecutorId(tid) = execId
-            executorIdToRunningTaskIds(execId).add(tid)
-            availableCpus(i) -= CPUS_PER_TASK
-            assert(availableCpus(i) >= 0)
-            task.resources.foreach { case (rName, rInfo) =>
-              // Remove the first n elements from availableResources addresses, these removed
-              // addresses are the same as that we allocated in taskSet.resourceOffer() since it's
-              // synchronized. We don't remove the exact addresses allocated because the current
-              // approach produces the identical result with less time complexity.
-              availableResources(i).getOrElse(rName,
-                throw new SparkException(s"Try to acquire resource $rName that doesn't exist."))
-                .remove(0, rInfo.addresses.size)
-            }
-            // Only update hosts for a barrier task.
-            if (taskSet.isBarrier) {
-              // The executor address is expected to be non empty.
-              addressesWithDescs += (shuffledOffers(i).address.get -> task)
+      val taskSetRpID = taskSet.taskSet.resourceProfileId
+      // make the resource profile id a hard requirement for now - ie only put tasksets
+      // on executors where resource profile exactly matches.
+      if (taskSetRpID == shuffledOffers(i).resourceProfileId) {
+        val taskResAssignmentsOpt = resourcesMeetTaskRequirements(taskSet, availableCpus(i),
+          availableResources(i))
+        taskResAssignmentsOpt.foreach { taskResAssignments =>
+          try {
+            val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(taskSetRpID)
+            val taskDescOption = taskSet.resourceOffer(execId, host, maxLocality, taskCpus,
+              taskResAssignments)
+            for (task <- taskDescOption) {
+              tasks(i) += task
+              val tid = task.taskId
+              taskIdToTaskSetManager.put(tid, taskSet)
+              taskIdToExecutorId(tid) = execId
+              executorIdToRunningTaskIds(execId).add(tid)
+              availableCpus(i) -= task.cpus
+              assert(availableCpus(i) >= 0)
 
 Review comment:
   Seems this is not necessary anymore since we've already checked it in `resourcesMeetTaskRequirements`. But I'm OK with keeping it.



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r390394335
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise returns
 +   * the task resource assignments to give to the next task. Note that the assignments may
+   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
+    val localTaskReqAssign = HashMap[String, ResourceInformation]()
+    if (tsResources.isEmpty) return Some(localTaskReqAssign.toMap)
+    // we go through all resources here so that we can make sure they match and also get what the
+    // assignments are for the next task
+    for ((rName, taskReqs) <- tsResources) {
+      val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName)
+      availWorkerResources.get(rName) match {
+        case Some(workerRes) =>
+          val workerAvail = availWorkerResources.get(rName).map(_.size).getOrElse(0)
+          if (workerAvail >= taskAmount) {
+            localTaskReqAssign.put(rName, new ResourceInformation(rName,
+              workerRes.take(taskAmount).toArray))
+          } else {
+            return None
+          }
+        case None => return None
+      }
+    }
+    Some(localTaskReqAssign.toMap)
+  }
+
+  // Use the resource that the resourceProfile has as the limiting resource to calculate the
+  // total number of slots available based on the current offers.
+  private def calculateAvailableSlots(
+      resourceProfileIds: Array[Int],
+      availableCpus: Array[Int],
+      availableResources: Array[Map[String, Buffer[String]]],
+      rpId: Int): Int = {
+    val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter { case (id, _) =>
+      (id == resourceProfile.id)
+    }
+    val coresKnown = resourceProfile.isCoresLimitKnown
+    var limitingResource = resourceProfile.limitingResource(sc.getConf)
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
 
 Review comment:
   Ok, I moved this into the ResourceProfile object as a utility function and renamed it to getTaskCpusOrDefaultForProfileId.
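
   For illustration, a minimal sketch of what such a utility might look like (the exact signature and fallback handling here are assumptions, not necessarily the PR's final code):

   ```scala
   // Hedged sketch only: the PR's helper is keyed by profile id; this version takes
   // the resolved ResourceProfile directly and falls back to spark.task.cpus when
   // the profile doesn't set task cpus explicitly.
   import org.apache.spark.SparkConf
   import org.apache.spark.internal.config.CPUS_PER_TASK
   import org.apache.spark.resource.ResourceProfile

   object ResourceProfileCpusSketch {
     def getTaskCpusOrDefault(rp: ResourceProfile, conf: SparkConf): Int =
       rp.taskResources.get(ResourceProfile.CPUS)
         .map(_.amount.toInt)
         .getOrElse(conf.get(CPUS_PER_TASK))
   }
   ```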



[GitHub] [spark] tgravescs commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-602635073
 
 
   thanks @mridulm 
   
   @Ngone51 let me know if you have any other comments



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597305671
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/119628/
   Test PASSed.



[GitHub] [spark] SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601374717
 
 
   **[Test build #120069 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120069/testReport)** for PR 27773 at commit [`545f5b2`](https://github.com/apache/spark/commit/545f5b205aa6ff3e822524e9b44b082b2bc7b762).



[GitHub] [spark] SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597231596
 
 
   **[Test build #119629 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/119629/testReport)** for PR 27773 at commit [`926b0e9`](https://github.com/apache/spark/commit/926b0e998c02626508a25de6a805a2bff3d113b6).



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601375238
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597304721
 
 
   **[Test build #119628 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/119628/testReport)** for PR 27773 at commit [`24cbf01`](https://github.com/apache/spark/commit/24cbf016e70ab5e85330cf3a44695d654237b134).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389968864
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
 ##########
 @@ -392,11 +392,13 @@ private[spark] object ResourceUtils extends Logging {
       s"${resourceRequest.id.resourceName}")
   }
 
-  def validateTaskCpusLargeEnough(execCores: Int, taskCpus: Int): Boolean = {
+  def validateTaskCpusLargeEnough(sparkConf: SparkConf, execCores: Int, taskCpus: Int): Boolean = {
     // Number of cores per executor must meet at least one task requirement.
-    if (execCores < taskCpus) {
-      throw new SparkException(s"The number of cores per executor (=$execCores) has to be >= " +
-        s"the number of cpus per task = $taskCpus.")
+    if (!sparkConf.get(TASKSET_MANAGER_SPECULATION_TESTING)) {
 
 Review comment:
   Found a slightly cleaner way that isn't called as often and is localized to running in local mode. Added comments to the config so hopefully it's clearer. We only want to skip this check for unit testing purposes.
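
   As a rough illustration, an internal test-only config guarding that check could look like the sketch below (the config name and placement are assumptions, not necessarily what the PR ended up with):

   ```scala
   // Hypothetical sketch of an internal, test-only config entry; it exists solely
   // so unit tests running in local mode can relax the "executor cores must fit at
   // least one task's cpus" validation.
   import org.apache.spark.internal.config.ConfigBuilder

   object TestingConfSketch {
     val SKIP_VALIDATE_CORES_TESTING =
       ConfigBuilder("spark.testing.skipValidateCores")
         .internal()
         .doc("For testing only: skip the check that executor cores are large " +
           "enough to run at least one task. Never set this outside of tests.")
         .booleanConf
         .createWithDefault(false)
   }
   ```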



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597382267
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r390723152
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise returns
 +   * the task resource assignments to give to the next task. Note that the assignments may
+   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
+    val localTaskReqAssign = HashMap[String, ResourceInformation]()
+    if (tsResources.isEmpty) return Some(localTaskReqAssign.toMap)
+    // we go through all resources here so that we can make sure they match and also get what the
+    // assignments are for the next task
+    for ((rName, taskReqs) <- tsResources) {
+      val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName)
+      availWorkerResources.get(rName) match {
+        case Some(workerRes) =>
+          val workerAvail = availWorkerResources.get(rName).map(_.size).getOrElse(0)
+          if (workerAvail >= taskAmount) {
+            localTaskReqAssign.put(rName, new ResourceInformation(rName,
+              workerRes.take(taskAmount).toArray))
+          } else {
+            return None
+          }
+        case None => return None
+      }
+    }
+    Some(localTaskReqAssign.toMap)
+  }
+
+  // Use the resource that the resourceProfile has as the limiting resource to calculate the
+  // total number of slots available based on the current offers.
+  private def calculateAvailableSlots(
 
 Review comment:
   Ok, I see.



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389497579
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/rdd/RDD.scala
 ##########
 @@ -1716,11 +1717,37 @@ abstract class RDD[T: ClassTag](
   @Since("2.4.0")
   def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this))
 
+  /**
+   * Specify a ResourceProfile to use when calculating this RDD. This is only supported on
+   * certain cluster managers and currently requires dynamic allocation to be enabled.
+   * It will result in new executors with the resources specified being acquired to
+   * calculate the RDD.
+   */
+  // PRIVATE for now, added for testing purposes, will be made public with SPARK-29150
+  @Experimental
+  @Since("3.0.0")
+  private[spark] def withResources(rp: ResourceProfile): this.type = {
+    resourceProfile = Some(rp)
+    sc.resourceProfileManager.addResourceProfile(resourceProfile.get)
+    this
+  }
+
+  /**
+   * Get the ResourceProfile specified with this RDD or None if it wasn't specified.
 
 Review comment:
   nit: or None -> or null



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389794784
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
 ##########
 @@ -162,7 +162,13 @@ private[spark] class LocalSchedulerBackend(
 
   override def applicationId(): String = appId
 
-  override def maxNumConcurrentTasks(): Int = totalCores / scheduler.CPUS_PER_TASK
+  // Doesn't support different ResourceProfiles yet
+  // so we expect all executors to be of same ResourceProfile
+  override def maxNumConcurrentTasks(rp: ResourceProfile): Int = {
+    val cpusPerTask = rp.taskResources.get(ResourceProfile.CPUS)
+      .map(_.amount.toInt).getOrElse(scheduler.CPUS_PER_TASK)
+    totalCores / cpusPerTask
 
 Review comment:
   Not currently planned, but someone did just say they were going to add the generic resource scheduling work in local mode, I think for testing purposes so they can keep production code the same. Did you have a specific use case or concern?



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r393428181
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
 ##########
 @@ -836,10 +840,10 @@ class TaskSetManagerSuite
     }
     // Offer resources for 4 tasks to start
     for ((exec, host) <- Seq(
-      "exec1" -> "host1",
-      "exec1" -> "host1",
-      "exec3" -> "host3",
-      "exec2" -> "host2")) {
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec3" -> "host3",
+        "exec2" -> "host2")) {
 
 Review comment:
   Format is not consistent compared to L1487-1490.



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r393430607
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
 ##########
 @@ -3094,6 +3100,46 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
   }
 
+  test("test default resource profile") {
+    val rdd = sc.parallelize(1 to 10).map(x => (x, x))
+    val (shuffledeps, resourceprofiles) = scheduler.getShuffleDependenciesAndResourceProfiles(rdd)
+    val rp = scheduler.mergeResourceProfilesForStage(resourceprofiles)
+    assert(rp.id == scheduler.sc.resourceProfileManager.defaultResourceProfile.id)
+  }
+
+  test("test 1 resource profile") {
+    val ereqs = new ExecutorResourceRequests().cores(4)
+    val treqs = new TaskResourceRequests().cpus(1)
+    val rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build
+
+    val rdd = sc.parallelize(1 to 10).map(x => (x, x)).withResources(rp1)
+    val (shuffledeps, resourceprofiles) = scheduler.getShuffleDependenciesAndResourceProfiles(rdd)
+    val rpMerged = scheduler.mergeResourceProfilesForStage(resourceprofiles)
+    val expectedid = Option(rdd.getResourceProfile).map(_.id)
+    assert(expectedid.isDefined)
+    assert(expectedid.get != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    assert(rpMerged.id == expectedid.get)
+  }
+
+  test("test 2 resource profiles errors by default") {
+    import org.apache.spark.resource._
+    val ereqs = new ExecutorResourceRequests().cores(4)
+    val treqs = new TaskResourceRequests().cpus(1)
+    val rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build
+
+    val ereqs2 = new ExecutorResourceRequests().cores(2)
+    val treqs2 = new TaskResourceRequests().cpus(2)
+    val rp2 = new ResourceProfileBuilder().require(ereqs2).require(treqs2).build
+
+    val rdd = sc.parallelize(1 to 10).withResources(rp1).map(x => (x, x)).withResources(rp2)
+    val error = intercept[IllegalArgumentException] {
+      val (shuffledeps, resourceprofiles) = scheduler.getShuffleDependenciesAndResourceProfiles(rdd)
+      scheduler.mergeResourceProfilesForStage(resourceprofiles)
+    }.getMessage()
+
+    assert(error.contains("Multiple ResourceProfile's specified in the RDDs"))
+  }
+
 
 Review comment:
   Shall we add a test for shuffle stages together with resource profiles? 
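
   For what it's worth, such a test could look roughly like the sketch below, reusing the helpers from the tests above; the expected outcome (the result stage falling back to the default profile because rp1 is attached on the map side) is an assumption to confirm:

   ```scala
   // Rough sketch of a possible test, modeled on the existing resource profile tests.
   test("resource profile across a shuffle boundary (sketch)") {
     val ereqs = new ExecutorResourceRequests().cores(4)
     val treqs = new TaskResourceRequests().cpus(1)
     val rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build

     // rp1 is attached to the map-side RDD; the post-shuffle RDD has no profile.
     val rdd = sc.parallelize(1 to 10).map(x => (x, x)).withResources(rp1).groupByKey()
     val (shuffledeps, resourceprofiles) =
       scheduler.getShuffleDependenciesAndResourceProfiles(rdd)
     // The result stage only contains the post-shuffle RDD, so merging its profiles
     // should fall back to the default profile, with one shuffle dependency found.
     assert(shuffledeps.size == 1)
     val rp = scheduler.mergeResourceProfilesForStage(resourceprofiles)
     assert(rp.id == scheduler.sc.resourceProfileManager.defaultResourceProfile.id)
   }
   ```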



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597305671
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/119628/
   Test PASSed.



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601378692
 
 
   Merged build finished. Test FAILed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597305667
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389933552
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ##########
 @@ -140,12 +141,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     override def receive: PartialFunction[Any, Unit] = {
-      case StatusUpdate(executorId, taskId, state, data, resources) =>
+      case StatusUpdate(executorId, taskId, state, data, taskCpus, resources) =>
         scheduler.statusUpdate(taskId, state, data.value)
         if (TaskState.isFinished(state)) {
           executorDataMap.get(executorId) match {
             case Some(executorInfo) =>
-              executorInfo.freeCores += scheduler.CPUS_PER_TASK
+              executorInfo.freeCores += taskCpus
 
 Review comment:
   I put it there so that I don't have to worry about race conditions between when we get status updates from executors and when the scheduler removes the tasksets. For instance, currently we could remove the entry from taskIdToTaskSetManager if the executor failed. If the taskId-to-TaskSetManager mapping is missing, we wouldn't know how many cores to properly add back. So to avoid that race this seems cleaner, and it also possibly gives us a way to expose this to the user in the future if we wanted.
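
   A minimal sketch of the trade-off described above (illustrative names only, not the PR's code):

   ```scala
   // The racy alternative to carrying taskCpus in StatusUpdate: look it up on the
   // driver when the update arrives. If the TaskSetManager has already been removed
   // (e.g. after an executor failure), the only option left is a default that may be
   // wrong for a non-default ResourceProfile, so freeCores accounting could drift.
   import org.apache.spark.scheduler.TaskSetManager

   object FreeCoresSketch {
     def cpusToFree(
         taskId: Long,
         taskIdToTaskSet: scala.collection.Map[Long, TaskSetManager],
         cpusForTaskSet: TaskSetManager => Int,
         defaultTaskCpus: Int): Int =
       taskIdToTaskSet.get(taskId).map(cpusForTaskSet).getOrElse(defaultTaskCpus)
   }
   ```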



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601392081
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389516964
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
 ##########
 @@ -457,18 +466,8 @@ private[spark] class TaskSetManager(
         // val timeTaken = clock.getTime() - startTime
         val taskName = s"task ${info.id} in stage ${taskSet.id}"
         logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
-          s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")
-
-        val extraResources = sched.resourcesReqsPerTask.map { taskReq =>
-          val rName = taskReq.resourceName
-          val count = taskReq.amount
-          val rAddresses = availableResources.getOrElse(rName, Seq.empty)
-          assert(rAddresses.size >= count, s"Required $count $rName addresses, but only " +
-            s"${rAddresses.size} available.")
-          // We'll drop the allocated addresses later inside TaskSchedulerImpl.
-          val allocatedAddresses = rAddresses.take(count)
-          (rName, new ResourceInformation(rName, allocatedAddresses.toArray))
-        }.toMap
+          s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes) " +
+          s"cpus: $taskCpus, taskResourceAssignments ${taskResourceAssignments}")
 
 Review comment:
   nit: Do we need a colon here?



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389501396
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 ##########
 @@ -529,6 +549,28 @@ private[spark] class DAGScheduler(
     parents
   }
 
+  private[scheduler] def getResourceProfilesForRDDsInStage(
 
 Review comment:
   Is it possible to collect the resource profiles at the same time we collect the `ShuffleDependency`s? That way we could avoid another traversal here.
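
   For illustration, a single traversal that gathers both the stage's shuffle dependencies and the resource profiles of its RDDs might look roughly like this (a sketch; `getResourceProfile` is the PR's private accessor and may return null):

   ```scala
   import scala.collection.mutable
   import org.apache.spark.ShuffleDependency
   import org.apache.spark.rdd.RDD
   import org.apache.spark.resource.ResourceProfile

   object StageTraversalSketch {
     // Walk the narrow-dependency lineage of `rdd`, stopping at shuffle boundaries,
     // collecting the immediate ShuffleDependencies and any attached ResourceProfiles.
     def shuffleDepsAndProfiles(
         rdd: RDD[_]): (Set[ShuffleDependency[_, _, _]], Set[ResourceProfile]) = {
       val shuffleDeps = mutable.HashSet[ShuffleDependency[_, _, _]]()
       val profiles = mutable.HashSet[ResourceProfile]()
       val visited = mutable.HashSet[RDD[_]]()
       val waiting = mutable.Stack[RDD[_]](rdd)
       while (waiting.nonEmpty) {
         val toVisit = waiting.pop()
         if (visited.add(toVisit)) {
           Option(toVisit.getResourceProfile).foreach(profiles += _)
           toVisit.dependencies.foreach {
             case shuffleDep: ShuffleDependency[_, _, _] => shuffleDeps += shuffleDep
             case dep => waiting.push(dep.rdd)
           }
         }
       }
       (shuffleDeps.toSet, profiles.toSet)
     }
   }
   ```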



[GitHub] [spark] mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389521856
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
 ##########
 @@ -392,11 +392,13 @@ private[spark] object ResourceUtils extends Logging {
       s"${resourceRequest.id.resourceName}")
   }
 
-  def validateTaskCpusLargeEnough(execCores: Int, taskCpus: Int): Boolean = {
+  def validateTaskCpusLargeEnough(sparkConf: SparkConf, execCores: Int, taskCpus: Int): Boolean = {
     // Number of cores per executor must meet at least one task requirement.
-    if (execCores < taskCpus) {
-      throw new SparkException(s"The number of cores per executor (=$execCores) has to be >= " +
-        s"the number of cpus per task = $taskCpus.")
+    if (!sparkConf.get(TASKSET_MANAGER_SPECULATION_TESTING)) {
 
 Review comment:
   Why is this guarded by a flag? Shouldn't we always throw this exception when required task cores > executor cores? This should be happening only for the default profile (via CPUS_PER_TASK)?
   Can non-default resource profiles also exhibit this?
   



[GitHub] [spark] SparkQA removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-593983223
 
 
   **[Test build #119233 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/119233/testReport)** for PR 27773 at commit [`71dd91d`](https://github.com/apache/spark/commit/71dd91dd3119a0daab368af2e2443a3fca1a460d).



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r390059181
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
 ##########
 @@ -64,8 +64,10 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Loggin
     isSupported(rp)
     // force the computation of maxTasks and limitingResource now so we don't have cost later
     rp.limitingResource(sparkConf)
-    logInfo(s"Adding ResourceProfile id: ${rp.id}")
-    resourceProfileIdToResourceProfile.putIfAbsent(rp.id, rp)
+    val res = resourceProfileIdToResourceProfile.putIfAbsent(rp.id, rp)
+    if (res == null) {
+      logInfo(s"Added ResourceProfile id: ${rp.id}")
 
 Review comment:
   We could record the `RDD.id` to preserve only one `ResourceProfile` per RDD and allow overriding it with a warning.
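
   As a sketch of that idea (purely illustrative; the class and method names here are made up, not the PR's):

   ```scala
   import scala.collection.concurrent.TrieMap
   import org.apache.spark.internal.Logging
   import org.apache.spark.resource.ResourceProfile

   class RddProfileRegistrySketch extends Logging {
     // Key profiles by RDD id so a second withResources() on the same RDD replaces
     // the earlier profile and warns, instead of silently accumulating both.
     private val rddIdToProfile = new TrieMap[Int, ResourceProfile]()

     def register(rddId: Int, rp: ResourceProfile): Unit = {
       rddIdToProfile.put(rddId, rp).foreach { old =>
         logWarning(s"RDD $rddId already had ResourceProfile ${old.id}, overriding with ${rp.id}")
       }
     }
   }
   ```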



[GitHub] [spark] SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597381573
 
 
   **[Test build #119632 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/119632/testReport)** for PR 27773 at commit [`21bb8a8`](https://github.com/apache/spark/commit/21bb8a8a9856993e816b6d38c885dbcc77afa5fa).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597382274
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/119632/
   Test PASSed.



[GitHub] [spark] SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597232466
 
 
   **[Test build #119629 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/119629/testReport)** for PR 27773 at commit [`926b0e9`](https://github.com/apache/spark/commit/926b0e998c02626508a25de6a805a2bff3d113b6).
    * This patch **fails Scala style tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r390500285
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise returns
 +   * the task resource assignments to give to the next task. Note that the assignments may
+   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
 
 Review comment:
   Ok, so I think this is actually easier now that I added the ResourceProfileBuilder, and like you say I think it makes sense for efficiency. But unless we want to keep 2 copies of the data, it's going to change the public api. I think that is ok, but it means we need to add a bunch of functions like getExecutorCores, getTaskCpus, etc. And to do that efficiently it makes sense to pass these in differently to ResourceProfile. I think that is a large enough change that it would make sense to do separately, if you are ok with that?
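
   As a rough idea of the convenience accessors being discussed (hypothetical helpers; the eventual public API may well look different):

   ```scala
   import org.apache.spark.resource.ResourceProfile

   object ResourceProfileAccessorSketch {
     // Pull the commonly needed values out of the request maps; None means the
     // profile doesn't override that resource and the global conf default applies.
     def getExecutorCores(rp: ResourceProfile): Option[Int] =
       rp.executorResources.get(ResourceProfile.CORES).map(_.amount.toInt)

     def getTaskCpus(rp: ResourceProfile): Option[Int] =
       rp.taskResources.get(ResourceProfile.CPUS).map(_.amount.toInt)
   }
   ```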



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-593980108
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] SparkQA removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597227673
 
 
   **[Test build #119628 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/119628/testReport)** for PR 27773 at commit [`24cbf01`](https://github.com/apache/spark/commit/24cbf016e70ab5e85330cf3a44695d654237b134).



[GitHub] [spark] gatorsmile commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
gatorsmile commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r410706110
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/rdd/RDD.scala
 ##########
 @@ -1716,11 +1717,37 @@ abstract class RDD[T: ClassTag](
   @Since("2.4.0")
   def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this))
 
+  /**
+   * Specify a ResourceProfile to use when calculating this RDD. This is only supported on
+   * certain cluster managers and currently requires dynamic allocation to be enabled.
+   * It will result in new executors with the resources specified being acquired to
+   * calculate the RDD.
+   */
+  // PRIVATE for now, added for testing purposes, will be made public with SPARK-29150
+  @Experimental
+  @Since("3.0.0")
+  private[spark] def withResources(rp: ResourceProfile): this.type = {
+    resourceProfile = Option(rp)
+    sc.resourceProfileManager.addResourceProfile(resourceProfile.get)
+    this
+  }
+
+  /**
+   * Get the ResourceProfile specified with this RDD or null if it wasn't specified.
+   * @return the user specified ResourceProfile or null (for Java compatibility) if
+   *         none was specified
+   */
+  // PRIVATE for now, added for testing purposes, will be made public with SPARK-29150
+  @Experimental
+  @Since("3.0.0")
 
 Review comment:
   3.0.0 => 3.1.0



[GitHub] [spark] SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601377947
 
 
   **[Test build #120070 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120070/testReport)** for PR 27773 at commit [`4a0c1c6`](https://github.com/apache/spark/commit/4a0c1c6af1d54861e2c6ae2784cd438ba2f1b287).



[GitHub] [spark] SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601378680
 
 
   **[Test build #120070 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120070/testReport)** for PR 27773 at commit [`4a0c1c6`](https://github.com/apache/spark/commit/4a0c1c6af1d54861e2c6ae2784cd438ba2f1b287).
    * This patch **fails Scala style tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601375253
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/24786/
   Test PASSed.



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r390064076
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ##########
 @@ -606,10 +608,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   }
 
-  override def maxNumConcurrentTasks(): Int = synchronized {
-    executorDataMap.values.map { executor =>
-      executor.totalCores / scheduler.CPUS_PER_TASK
-    }.sum
+  override def maxNumConcurrentTasks(rp: ResourceProfile): Int = synchronized {
+    val cpusPerTask = rp.getTaskCpus.getOrElse(scheduler.CPUS_PER_TASK)
+    val executorsWithResourceProfile = executorDataMap.values.filter(_.resourceProfileId == rp.id)
 
 Review comment:
   Let's say we're running with dynamic allocation and only the minimum number of executors are alive, and the user has a barrier job like:
   
   ```
   rdd.withResources(rp).barrier().mapPartitions { part =>
     // do some barrier stuff
   }.collect()
   ```
   
   In dynamic mode, executors are only going to launch after `SparkListenerStageSubmitted` is posted. So, at the time we're in `createShuffleMapStage`/`mergeResourceProfilesForStage`/`checkBarrierStageWithNumSlots`, there are no executors launched for it yet, right?
   
   



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r390060291
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 ##########
 @@ -529,6 +549,28 @@ private[spark] class DAGScheduler(
     parents
   }
 
+  private[scheduler] def getResourceProfilesForRDDsInStage(
 
 Review comment:
   At least, I think we should avoid collecting resource profiles when there are no resource profiles attached to the RDDs, by first asking the `ResourceProfileManager` (if it has recorded the RDD ids along with their resource profiles).
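   Something along these lines (just a sketch; `hasProfilesForRdds` is a hypothetical helper on the manager, not an existing API, and the signature is simplified):
   
   ```
   private[scheduler] def getResourceProfilesForRDDsInStage(
       rdds: Seq[RDD[_]]): HashSet[ResourceProfile] = {
     // Skip the traversal entirely in the common case where no RDD in this
     // stage has a ResourceProfile attached.
     if (!sc.resourceProfileManager.hasProfilesForRdds(rdds.map(_.id))) {
       HashSet.empty
     } else {
       HashSet(rdds.flatMap(rdd => Option(rdd.getResourceProfile)): _*)
     }
   }
   ```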



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389520065
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ##########
 @@ -140,12 +141,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     override def receive: PartialFunction[Any, Unit] = {
-      case StatusUpdate(executorId, taskId, state, data, resources) =>
+      case StatusUpdate(executorId, taskId, state, data, taskCpus, resources) =>
         scheduler.statusUpdate(taskId, state, data.value)
         if (TaskState.isFinished(state)) {
           executorDataMap.get(executorId) match {
             case Some(executorInfo) =>
-              executorInfo.freeCores += scheduler.CPUS_PER_TASK
+              executorInfo.freeCores += taskCpus
 
 Review comment:
   Could we get `taskCpus` from this executor's resource profile instead, e.g. `sc.resourceProfileManager.taskCpusForProfileId(taskSetRpID)`?
   
   That way we wouldn't need to pass `taskCpus` to the executor.
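   For illustration only, a self-contained toy of that lookup (the maps and `onTaskFinished` are stand-ins for executorDataMap and the StatusUpdate handling, and the cpus-per-profile table is stubbed out):
   
   ```
   import scala.collection.mutable
   
   // Stubbed lookup from resource profile id -> task cpus for that profile.
   val taskCpusForProfileId: Map[Int, Int] = Map(0 -> 1, 1 -> 2)
   
   case class ExecInfo(resourceProfileId: Int, var freeCores: Int)
   val executorDataMap = mutable.Map("exec-1" -> ExecInfo(resourceProfileId = 1, freeCores = 2))
   
   // On task finish, recover the cpus from the executor's profile instead of
   // carrying taskCpus inside the StatusUpdate message.
   def onTaskFinished(executorId: String): Unit = {
     executorDataMap.get(executorId).foreach { info =>
       info.freeCores += taskCpusForProfileId(info.resourceProfileId)
     }
   }
   
   onTaskFinished("exec-1")
   assert(executorDataMap("exec-1").freeCores == 4)
   ```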



[GitHub] [spark] Ngone51 commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-603869129
 
 
   Hi @tgravescs, I'll try to take another look tomorrow.
   
   Also pinging @jiangxb1987 @squito in case they're available to review this too.



[GitHub] [spark] SparkQA removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597328534
 
 
   **[Test build #119632 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/119632/testReport)** for PR 27773 at commit [`21bb8a8`](https://github.com/apache/spark/commit/21bb8a8a9856993e816b6d38c885dbcc77afa5fa).



[GitHub] [spark] SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-593983223
 
 
   **[Test build #119233 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/119233/testReport)** for PR 27773 at commit [`71dd91d`](https://github.com/apache/spark/commit/71dd91dd3119a0daab368af2e2443a3fca1a460d).



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601392081
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601444848
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] SparkQA removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601377947
 
 
   **[Test build #120070 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120070/testReport)** for PR 27773 at commit [`4a0c1c6`](https://github.com/apache/spark/commit/4a0c1c6af1d54861e2c6ae2784cd438ba2f1b287).



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r390394335
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise returns
 +   * the task resource assignments to give to the next task. Note that the assignments may
 +   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
+    val localTaskReqAssign = HashMap[String, ResourceInformation]()
+    if (tsResources.isEmpty) return Some(localTaskReqAssign.toMap)
+    // we go through all resources here so that we can make sure they match and also get what the
+    // assignments are for the next task
+    for ((rName, taskReqs) <- tsResources) {
+      val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName)
+      availWorkerResources.get(rName) match {
+        case Some(workerRes) =>
+          val workerAvail = availWorkerResources.get(rName).map(_.size).getOrElse(0)
+          if (workerAvail >= taskAmount) {
+            localTaskReqAssign.put(rName, new ResourceInformation(rName,
+              workerRes.take(taskAmount).toArray))
+          } else {
+            return None
+          }
+        case None => return None
+      }
+    }
+    Some(localTaskReqAssign.toMap)
+  }
+
+  // Use the resource that the resourceProfile has as the limiting resource to calculate the
+  // total number of slots available based on the current offers.
+  private def calculateAvailableSlots(
+      resourceProfileIds: Array[Int],
+      availableCpus: Array[Int],
+      availableResources: Array[Map[String, Buffer[String]]],
+      rpId: Int): Int = {
+    val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter { case (id, _) =>
+      (id == resourceProfile.id)
+    }
+    val coresKnown = resourceProfile.isCoresLimitKnown
+    var limitingResource = resourceProfile.limitingResource(sc.getConf)
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
 
 Review comment:
   OK, I moved this into the ResourceProfile object as a utility function and renamed it to getTaskCpusOrDefaultForProfile.
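   For reference, the fallback logic is roughly of this shape (a sketch with a simplified signature, not the exact Spark code):
   
   ```
   object ResourceProfileSketch {
     // Fall back to the global cpus-per-task setting when the profile does not
     // specify its own task cpu requirement.
     def getTaskCpusOrDefaultForProfile(taskCpusInProfile: Option[Int], confCpusPerTask: Int): Int =
       taskCpusInProfile.getOrElse(confCpusPerTask)
   }
   
   // A profile with no explicit cpus request falls back to spark.task.cpus:
   assert(ResourceProfileSketch.getTaskCpusOrDefaultForProfile(None, confCpusPerTask = 2) == 2)
   assert(ResourceProfileSketch.getTaskCpusOrDefaultForProfile(Some(4), confCpusPerTask = 2) == 4)
   ```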



[GitHub] [spark] AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601378704
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/120070/
   Test FAILed.



[GitHub] [spark] SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-601375388
 
 
   **[Test build #120069 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120069/testReport)** for PR 27773 at commit [`545f5b2`](https://github.com/apache/spark/commit/545f5b205aa6ff3e822524e9b44b082b2bc7b762).
    * This patch **fails Scala style tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.



[GitHub] [spark] gatorsmile commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
gatorsmile commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r410706087
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/rdd/RDD.scala
 ##########
 @@ -1716,11 +1717,37 @@ abstract class RDD[T: ClassTag](
   @Since("2.4.0")
   def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this))
 
+  /**
+   * Specify a ResourceProfile to use when calculating this RDD. This is only supported on
+   * certain cluster managers and currently requires dynamic allocation to be enabled.
+   * It will result in new executors with the resources specified being acquired to
+   * calculate the RDD.
+   */
+  // PRIVATE for now, added for testing purposes, will be made public with SPARK-29150
+  @Experimental
+  @Since("3.0.0")
 
 Review comment:
   3.0.0 => 3.1.0



[GitHub] [spark] mridulm commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
mridulm commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-602019491
 
 
   The changes look good to me; I'll wait for @Ngone51 to take a pass as well.
   Thanks for working on this, Tom!



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r395115853
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
 ##########
 @@ -3094,6 +3100,46 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
   }
 
+  test("test default resource profile") {
+    val rdd = sc.parallelize(1 to 10).map(x => (x, x))
+    val (shuffledeps, resourceprofiles) = scheduler.getShuffleDependenciesAndResourceProfiles(rdd)
+    val rp = scheduler.mergeResourceProfilesForStage(resourceprofiles)
+    assert(rp.id == scheduler.sc.resourceProfileManager.defaultResourceProfile.id)
+  }
+
+  test("test 1 resource profile") {
+    val ereqs = new ExecutorResourceRequests().cores(4)
+    val treqs = new TaskResourceRequests().cpus(1)
+    val rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build
+
+    val rdd = sc.parallelize(1 to 10).map(x => (x, x)).withResources(rp1)
+    val (shuffledeps, resourceprofiles) = scheduler.getShuffleDependenciesAndResourceProfiles(rdd)
+    val rpMerged = scheduler.mergeResourceProfilesForStage(resourceprofiles)
+    val expectedid = Option(rdd.getResourceProfile).map(_.id)
+    assert(expectedid.isDefined)
+    assert(expectedid.get != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    assert(rpMerged.id == expectedid.get)
+  }
+
+  test("test 2 resource profiles errors by default") {
+    import org.apache.spark.resource._
+    val ereqs = new ExecutorResourceRequests().cores(4)
+    val treqs = new TaskResourceRequests().cpus(1)
+    val rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build
+
+    val ereqs2 = new ExecutorResourceRequests().cores(2)
+    val treqs2 = new TaskResourceRequests().cpus(2)
+    val rp2 = new ResourceProfileBuilder().require(ereqs2).require(treqs2).build
+
+    val rdd = sc.parallelize(1 to 10).withResources(rp1).map(x => (x, x)).withResources(rp2)
+    val error = intercept[IllegalArgumentException] {
+      val (shuffledeps, resourceprofiles) = scheduler.getShuffleDependenciesAndResourceProfiles(rdd)
+      scheduler.mergeResourceProfilesForStage(resourceprofiles)
+    }.getMessage()
+
+    assert(error.contains("Multiple ResourceProfile's specified in the RDDs"))
+  }
+
 
 Review comment:
   sure, I'll add one



[GitHub] [spark] SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597227673
 
 
   **[Test build #119628 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/119628/testReport)** for PR 27773 at commit [`24cbf01`](https://github.com/apache/spark/commit/24cbf016e70ab5e85330cf3a44695d654237b134).



[GitHub] [spark] tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389909401
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise returns
 +   * the task resource assignments to give to the next task. Note that the assignments may
 +   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
+    val localTaskReqAssign = HashMap[String, ResourceInformation]()
+    if (tsResources.isEmpty) return Some(localTaskReqAssign.toMap)
+    // we go through all resources here so that we can make sure they match and also get what the
+    // assignments are for the next task
+    for ((rName, taskReqs) <- tsResources) {
+      val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName)
+      availWorkerResources.get(rName) match {
+        case Some(workerRes) =>
+          val workerAvail = availWorkerResources.get(rName).map(_.size).getOrElse(0)
+          if (workerAvail >= taskAmount) {
+            localTaskReqAssign.put(rName, new ResourceInformation(rName,
+              workerRes.take(taskAmount).toArray))
+          } else {
+            return None
+          }
+        case None => return None
+      }
+    }
+    Some(localTaskReqAssign.toMap)
+  }
+
+  // Use the resource that the resourceProfile has as the limiting resource to calculate the
+  // total number of slots available based on the current offers.
+  private def calculateAvailableSlots(
+      resourceProfileIds: Array[Int],
+      availableCpus: Array[Int],
+      availableResources: Array[Map[String, Buffer[String]]],
+      rpId: Int): Int = {
+    val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter { case (id, _) =>
+      (id == resourceProfile.id)
+    }
+    val coresKnown = resourceProfile.isCoresLimitKnown
+    var limitingResource = resourceProfile.limitingResource(sc.getConf)
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+
+    offersForResourceProfile.map { case (o, index) =>
+      val numTasksPerExecCores = availableCpus(index) / taskCpus
+      // when executor cores config isn't set, we can't calculate the real limiting resource
+      // and number of tasks per executor ahead of time, so calculate it now.
+      if (!coresKnown) {
+        val numTasksPerExecCustomResource = resourceProfile.maxTasksPerExecutor(sc.getConf)
+        if (limitingResource.isEmpty ||
+          (limitingResource.nonEmpty && numTasksPerExecCores < numTasksPerExecCustomResource)) {
+          limitingResource = ResourceProfile.CPUS
 
 Review comment:
   ah yes, thanks for catching this.
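   For anyone following along, the intent of this block is simply: the limiting resource is whichever resource allows the fewest tasks per executor. A tiny standalone illustration with made-up numbers (not the actual Spark code):
   
   ```
   // Per-executor availability and per-task requirement for each resource.
   val available = Map("cpus" -> 8, "gpu" -> 2)
   val perTask   = Map("cpus" -> 1, "gpu" -> 1)
   
   // Tasks per executor allowed by each resource; the minimum wins.
   val slotsPerResource = perTask.map { case (name, amount) =>
     name -> available(name) / amount
   }
   val (limitingResource, slots) = slotsPerResource.minBy(_._2)
   
   assert(limitingResource == "gpu" && slots == 2)
   ```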
   
   



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389496401
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/SparkContext.scala
 ##########
 @@ -1617,13 +1617,17 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
-   * Get the max number of tasks that can be concurrent launched currently.
+   * Get the max number of tasks that can be concurrent launched based on the ResourceProfile
+   * being used.
    * Note that please don't cache the value returned by this method, because the number can change
    * due to add/remove executors.
    *
+   * @param rp ResourceProfile which to use to calculate max concurrent tasks.
    * @return The max number of tasks that can be concurrent launched currently.
    */
-  private[spark] def maxNumConcurrentTasks(): Int = schedulerBackend.maxNumConcurrentTasks()
+  private[spark] def maxNumConcurrentTasks(rp: ResourceProfile): Int = {
 
 Review comment:
   Give `rp` a default value (e.g. the default ResourceProfile)? Then a caller/developer who isn't aware of `ResourceProfile` doesn't need to care about it.
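   e.g. something like this (just a sketch of the signature, assuming the default ResourceProfile is reachable from the context here):
   
   ```
   private[spark] def maxNumConcurrentTasks(
       rp: ResourceProfile = resourceProfileManager.defaultResourceProfile): Int = {
     schedulerBackend.maxNumConcurrentTasks(rp)
   }
   ```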



[GitHub] [spark] AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#issuecomment-597232315
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/24359/
   Test PASSed.



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r393420810
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 ##########
 @@ -433,14 +435,32 @@ private[spark] class DAGScheduler(
    * the check fails consecutively beyond a configured number for a job, then fail current job
    * submission.
    */
-  private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
+  private def checkBarrierStageWithNumSlots(rdd: RDD[_], rp: ResourceProfile): Unit = {
     val numPartitions = rdd.getNumPartitions
-    val maxNumConcurrentTasks = sc.maxNumConcurrentTasks
+    val maxNumConcurrentTasks = sc.maxNumConcurrentTasks(rp)
     if (rdd.isBarrier() && numPartitions > maxNumConcurrentTasks) {
       throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)
     }
   }
 
+  private[scheduler] def mergeResourceProfilesForStage(
+      stageResourceProfiles: HashSet[ResourceProfile]): ResourceProfile = {
+    logDebug("rdd profiles: " + stageResourceProfiles)
 
 Review comment:
   Shouldn't this log message say these are the stage's profiles rather than "rdd profiles"?



[GitHub] [spark] Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r393417188
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/rdd/RDD.scala
 ##########
 @@ -1716,11 +1717,37 @@ abstract class RDD[T: ClassTag](
   @Since("2.4.0")
   def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this))
 
+  /**
+   * Specify a ResourceProfile to use when calculating this RDD. This is only supported on
+   * certain cluster managers and currently requires dynamic allocation to be enabled.
+   * It will result in new executors with the resources specified being acquired to
+   * calculate the RDD.
+   */
+  // PRIVATE for now, added for testing purposes, will be made public with SPARK-29150
+  @Experimental
+  @Since("3.0.0")
+  private[spark] def withResources(rp: ResourceProfile): this.type = {
+    resourceProfile = Some(rp)
 
 Review comment:
   Maybe, `Option(rp)`? In case of a `null` input.
