Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/07/04 06:51:28 UTC

[GitHub] [spark] mridulm commented on a change in pull request #28287: [SPARK-31418][SCHEDULER] Request more executors in case of dynamic allocation is enabled and a task becomes unschedulable due to spark's blacklisting feature.

mridulm commented on a change in pull request #28287:
URL: https://github.com/apache/spark/pull/28287#discussion_r447233292



##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -696,7 +713,9 @@ private[spark] class ExecutorAllocationManager(
 
         // If this is the last stage with pending tasks, mark the scheduler queue as empty
         // This is needed in case the stage is aborted for any reason
-        if (stageAttemptToNumTasks.isEmpty && stageAttemptToNumSpeculativeTasks.isEmpty) {
+        if (stageAttemptToNumTasks.isEmpty &&
+          stageAttemptToNumSpeculativeTasks.isEmpty &&
+          unschedulableTaskSets.isEmpty) {

Review comment:
       Is this required? If `unschedulableTaskSets` is not empty, then `stageAttemptToNumTasks` should also be non-empty, right?

##########
File path: core/src/main/java/org/apache/spark/SparkFirehoseListener.java
##########
@@ -162,6 +162,11 @@ public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted spe
     onEvent(speculativeTask);
   }
 
+  public void onUnschedulableBlacklistTaskSubmitted(

Review comment:
       A couple of things here:
   * The task is not submitted; rather, it has become unschedulable (perhaps due to lost executors).
   * The event details describe the unschedulable task set and do not give task information.
   
   Can we rename this to something better?

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -789,6 +811,23 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
+    override def onUnschedulableBlacklistTaskSubmitted
+      (blacklistedTask: SparkListenerUnschedulableBlacklistTaskSubmitted): Unit = {
+      val stageId = blacklistedTask.stageId
+      val stageAttemptId = blacklistedTask.stageAttemptId
+      allocationManager.synchronized {
+        (stageId, stageAttemptId) match {
+          case (Some(stageId), Some(stageAttemptId)) =>
+            val stageAttempt = StageAttempt(stageId, stageAttemptId)
+            unschedulableTaskSets.add(stageAttempt)
+          case (None, _) =>
+            // Clear unschedulableTaskSets since atleast one task becomes schedulable now
+            unschedulableTaskSets.clear()

Review comment:
       Why are we clearing all task sets? Shouldn't this remove only the specific task set which became schedulable?
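
   To illustrate the alternative raised here, a minimal sketch of removing a single entry instead of clearing the whole set. It assumes the event would carry the stage and attempt ids of the task set that became schedulable, which the hunk above does not show. `StageAttempt` is a stand-in for the key type used in the diff, and `UnschedulableTracking.markSchedulable` is a hypothetical helper, not part of the PR.

```scala
import scala.collection.mutable

// Stand-in for the StageAttempt key used in the diff above.
final case class StageAttempt(stageId: Int, stageAttemptId: Int)

object UnschedulableTracking {
  // Hypothetical helper: drop only the task set that became schedulable.
  // Returns true if that task set was being tracked as unschedulable.
  def markSchedulable(
      unschedulable: mutable.Set[StageAttempt],
      attempt: StageAttempt): Boolean = {
    unschedulable.remove(attempt)
  }
}
```

   With two tracked task sets, calling `markSchedulable` for one of them leaves the other in place, so it still counts toward the executor request, whereas `clear()` would drop both.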

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -289,13 +290,23 @@ private[spark] class ExecutorAllocationManager(
       s" tasksperexecutor: $tasksPerExecutor")
     val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
       tasksPerExecutor).toInt
-    if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
+    val totalNeed = if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
       // If we have pending speculative tasks and only need a single executor, allocate one more
       // to satisfy the locality requirements of speculation
       maxNeeded + 1
     } else {
       maxNeeded
     }
+
+    // Request additional executors to schedule the unschedulable tasks as well
+    if (numUnschedulables > 0) {
+      val maxNeededForUnschedulables = math.ceil(numUnschedulables * executorAllocationRatio /
+        tasksPerExecutor).toInt
+      math.max(totalNeed, executorMonitor.executorCountWithResourceProfile(rpId)) +
+        maxNeededForUnschedulables
+    } else {

Review comment:
       Can you update with details about how this was resolved? The original looked fine to me, but I want to make sure I am not missing something.

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -829,12 +868,25 @@ private[spark] class ExecutorAllocationManager(
       numTotalTasks - numRunning
     }
 
+    def pendingUnschedulableTasksPerResourceProfile(rp: Int): Int = {

Review comment:
       The method returns the number of task sets which are not schedulable, not the number of tasks which are not schedulable.
   `maxNumExecutorsNeededPerResourceProfile` relies on this being a task count, so we will need to fix it here and in the event being fired.
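
   A small sketch of why the distinction matters, reusing the `math.ceil(n * ratio / tasksPerExecutor)` shape from the `maxNeeded` hunk above. The per-task-set counts in a `Map` are an assumption made for illustration (the PR tracks only a set of stage attempts), and `ExecutorEstimate` is a hypothetical name.

```scala
// Stand-in for the StageAttempt key used in the diff above.
final case class StageAttempt(stageId: Int, stageAttemptId: Int)

object ExecutorEstimate {
  // Same shape as the maxNeeded computation quoted in the earlier hunk.
  def executorsNeeded(numTasks: Int, allocationRatio: Double, tasksPerExecutor: Int): Int =
    math.ceil(numTasks * allocationRatio / tasksPerExecutor).toInt

  // Hypothetical bookkeeping: unschedulable task count per task set.
  val unschedulableTasks: Map[StageAttempt, Int] = Map(
    StageAttempt(1, 0) -> 10,
    StageAttempt(2, 0) -> 6
  )

  // What the method under review effectively returns: the number of task sets.
  val taskSetCount: Int = unschedulableTasks.size

  // What the executor estimate expects: the number of tasks.
  val taskCount: Int = unschedulableTasks.values.sum
}
```

   With an allocation ratio of 1.0 and 4 tasks per executor, `executorsNeeded(taskSetCount, 1.0, 4)` gives 1 while `executorsNeeded(taskCount, 1.0, 4)` gives 4, so feeding the task set count into the formula under-requests executors.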



