Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/12/06 20:31:14 UTC

[GitHub] [spark] squito commented on a change in pull request #26696: [WIP][SPARK-18886][CORE] Make locality wait time be the time since a TSM's available slots were fully utilized

squito commented on a change in pull request #26696: [WIP][SPARK-18886][CORE] Make locality wait time be the time since a TSM's available slots were fully utilized
URL: https://github.com/apache/spark/pull/26696#discussion_r355016546
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/Pool.scala
 ##########
 @@ -119,4 +119,28 @@ private[spark] class Pool(
       parent.decreaseRunningTasks(taskNum)
     }
   }
+
+  override def updateAvailableSlots(numSlots: Float): Unit = {
+    schedulingMode match {
+      case SchedulingMode.FAIR =>
+        val usableWeights = schedulableQueue.asScala
+          .map(s => if (s.getSortedTaskSetQueue.nonEmpty) (s, s.weight) else (s, 0))
+        val totalWeights = usableWeights.map(_._2).sum
+        usableWeights.foreach({case (schedulable, usableWeight) =>
+          schedulable.updateAvailableSlots(
+            Math.max(numSlots * usableWeight / totalWeights, schedulable.minShare))
+        })
+      case SchedulingMode.FIFO =>
+        val sortedSchedulableQueue =
+          schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
+        var isFirst = true
+        for (schedulable <- sortedSchedulableQueue) {
+          schedulable.updateAvailableSlots(if (isFirst) numSlots else 0)
 
 Review comment:
   `sortedSchedulableQueue` can have many tasksets even with FIFO.  It's just all active tasksets in the order they were submitted -- which may be multiple tasksets from a single job with a branch, or from concurrent jobs.
   
   I do think a comment is necessary here to explain this.  If I understand correctly, the idea is that every taskset other than the first follows the old logic for resetting the locality level -- the locality timer is reset for every single task that is scheduled.  The first taskset, by contrast, only resets the timer when it is using all of its slots.
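
To make the FIFO case concrete, here is a minimal, self-contained sketch of the slot assignment as I read it from the diff. It is not Spark code: `FakeTaskSet`, `submitOrder`, and `updateFifo` are hypothetical stand-ins, and a single integer ordering replaces `taskSetSchedulingAlgorithm.comparator`. The locality timer itself is not modelled; the sketch only shows which taskset ends up with the slots.

```scala
object FifoSlotsSketch {
  // Hypothetical stand-in for a schedulable taskset; not Spark's TaskSetManager.
  final class FakeTaskSet(val name: String, val submitOrder: Int) {
    var availableSlots: Float = 0f
    def updateAvailableSlots(numSlots: Float): Unit = { availableSlots = numSlots }
  }

  // Assumption: FIFO order is reduced to a single submitOrder key here.
  // The first taskset in that order is given every slot; all the others get 0,
  // even though several tasksets can be active at once.
  def updateFifo(queue: Seq[FakeTaskSet], numSlots: Float): Unit = {
    val sorted = queue.sortBy(_.submitOrder)
    sorted.zipWithIndex.foreach { case (taskSet, idx) =>
      taskSet.updateAvailableSlots(if (idx == 0) numSlots else 0f)
    }
  }
}
```

With two active tasksets and 8 slots, the earlier-submitted one is told it has 8 available slots and the later one is told it has 0, which is the asymmetry the code comment should spell out.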
