You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/03/09 09:48:20 UTC

[GitHub] [spark] mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling

mridulm commented on a change in pull request #27773: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389536968
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise returns
+   * the task resource assignments to give to the next task. Note that the assignments may
+   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is the common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
+    val localTaskReqAssign = HashMap[String, ResourceInformation]()
+    if (tsResources.isEmpty) return Some(localTaskReqAssign.toMap)
+    // we go through all resources here so that we can make sure they match and also get what the
+    // assignments are for the next task
+    for ((rName, taskReqs) <- tsResources) {
+      val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName)
+      availWorkerResources.get(rName) match {
+        case Some(workerRes) =>
+          val workerAvail = availWorkerResources.get(rName).map(_.size).getOrElse(0)
 
 Review comment:
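  `availWorkerResources.get(rName)` -> `workerRes`? The `case Some(workerRes)` match already binds the looked-up value, so the second lookup is redundant.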

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org