Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/10 18:09:21 UTC

[GitHub] [spark] mridulm commented on a change in pull request #35172: [SPARK-36664][CORE] Log time waiting for cluster resources

mridulm commented on a change in pull request #35172:
URL: https://github.com/apache/spark/pull/35172#discussion_r803948435



##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -780,15 +805,52 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       (scheduler.sc.resourceProfileManager.resourceProfileFromId(rpid), num)
     }
     val response = synchronized {
+      val oldResourceProfileToNumExecutors = requestedTotalExecutorsPerResourceProfile.map {
+        case (rp, num) =>
+          (rp.id, num)
+      }.toMap
       this.requestedTotalExecutorsPerResourceProfile.clear()
       this.requestedTotalExecutorsPerResourceProfile ++= resourceProfileToNumExecutors
       this.numLocalityAwareTasksPerResourceProfileId = numLocalityAwareTasksPerResourceProfileId
       this.rpHostToLocalTaskCount = hostToLocalTaskCount
+      updateExecRequestTimes(oldResourceProfileToNumExecutors, resourceProfileIdToNumExecutors)
       doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
     }
     defaultAskTimeout.awaitResult(response)
   }
 
+  private def updateExecRequestTimes(oldProfile: Map[Int, Int], newProfile: Map[Int, Int]): Unit = {
+    newProfile.map {
+      case (k, v) =>
+        val delta = v - oldProfile.getOrElse(k, 0)
+        if (delta != 0) {
+          updateExecRequestTime(k, delta)
+        }
+    }

Review comment:
       Looks like if we drop keys from newProfile, `requestedTotalExecutorsPerResourceProfile` silently forgets about them - and the backend continues to request the earlier count?
   I should take another look at this later (to clarify, this is not an issue with this PR, btw)
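
   A minimal sketch of the concern (illustrative values and names only, not the PR's code): because the delta is computed by iterating over newProfile, a profile id that appears only in the old map is never visited, so nothing is recorded for it:

       object DroppedProfileSketch extends App {
         // Hypothetical resource profile ids, not taken from the PR.
         val oldProfile = Map(1 -> 4, 2 -> 2)  // profile 2 previously had 2 executors requested
         val newProfile = Map(1 -> 4)          // profile 2 dropped from the new request

         newProfile.foreach { case (id, num) =>
           val delta = num - oldProfile.getOrElse(id, 0)
           if (delta != 0) println(s"update request time for profile $id by $delta")
         }
         // Nothing is printed: profile 1 has delta 0 and profile 2 is never visited,
         // so any pending request-time bookkeeping for it is left untouched.
       }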

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -260,9 +266,27 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
               .resourceProfileFromId(resourceProfileId).getNumSlotsPerAddress(rName, conf)
             (info.name, new ExecutorResourceInfo(info.name, info.addresses, numParts))
           }
+          // If we've requested the executor figure out when we did.
+          val reqTs: Option[Long] = CoarseGrainedSchedulerBackend.this.synchronized {
+            execRequestTimes.get(resourceProfileId).flatMap {
+              times =>
+              times.headOption.map {
+                h =>
+                // Take off the top element
+                times.dequeue()
+                // If we requested more than one exec reduce the req count by 1 and prepend it back
+                if (h._1 > 1) {
+                  ((h._1 - 1, h._2)) +=: times

Review comment:
       I will let @holdenk elaborate better.
   If we are considering an executor allocation to satisfy the oldest pending request, this should be fine - right?
   
   Requests:
   queue += (t=1, num=2)
   queue += (t=2, num=1)
   
   registration of exec1 at t=3:
   queue.dequeue -> update num to 1 and prepend back to queue, so that (t=1, num=1) remains head.
   (exec1 will have requestTs = 1, and a subsequent exec2 will also have requestTs = 1)
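   
   A small runnable sketch of this walk-through (the names and the plain mutable.Queue are illustrative assumptions, not the PR's exact types):

       import scala.collection.mutable

       object RequestTimeSketch extends App {
         // Each entry is (numExecutorsStillPending, requestTimestamp).
         val times = mutable.Queue[(Int, Long)]()
         times.enqueue((2, 1L))  // t=1, num=2
         times.enqueue((1, 2L))  // t=2, num=1

         // exec1 registers at t=3 and is matched against the oldest pending request.
         val head = times.dequeue()
         if (head._1 > 1) {
           // One executor of that request is satisfied; prepend the remainder so
           // (num=1, t=1) stays at the head of the queue.
           (head._1 - 1, head._2) +=: times
         }
         println(s"exec1 requestTs = ${head._2}")  // prints 1
         // The queue now holds (1, 1L) at the head, followed by (1, 2L);
         // a later exec2 would dequeue (1, 1L) and also report requestTs = 1.
       }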
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


