You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2015/10/22 20:39:22 UTC

spark git commit: [SPARK-11163] Remove unnecessary addPendingTask calls.

Repository: spark
Updated Branches:
  refs/heads/master 7bb6d31cf -> 3535b91dd


[SPARK-11163] Remove unnecessary addPendingTask calls.

This commit removes unnecessary calls to addPendingTask in
TaskSetManager.executorLost. These calls are unnecessary: for
tasks that are still pending and haven't been launched, they're
still in all of the correct pending lists, so calling addPendingTask
has no effect. For tasks that are currently running (which may still be
in the pending lists, depending on how they were scheduled), we call
addPendingTask in handleFailedTask, so the calls at the beginning
of executorLost are redundant.

I think these calls are left over from when we re-computed the locality
levels in addPendingTask; now that we call recomputeLocality separately,
I don't think these are necessary.

Now that those calls are removed, the readding parameter in addPendingTask
is no longer necessary, so this commit also removes that parameter.

markhamstra can you take a look at this?

cc vanzin

Author: Kay Ousterhout <ka...@gmail.com>

Closes #9154 from kayousterhout/SPARK-11163.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3535b91d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3535b91d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3535b91d

Branch: refs/heads/master
Commit: 3535b91ddc9fd05b613a121e09263b0f378bd5fa
Parents: 7bb6d31
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Thu Oct 22 11:39:06 2015 -0700
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Thu Oct 22 11:39:06 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/TaskSetManager.scala | 27 ++++----------------
 1 file changed, 5 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3535b91d/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index c02597c..987800d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -177,14 +177,11 @@ private[spark] class TaskSetManager(
 
   var emittedTaskSizeWarning = false
 
-  /**
-   * Add a task to all the pending-task lists that it should be on. If readding is set, we are
-   * re-adding the task so only include it in each list if it's not already there.
-   */
-  private def addPendingTask(index: Int, readding: Boolean = false) {
-    // Utility method that adds `index` to a list only if readding=false or it's not already there
+  /** Add a task to all the pending-task lists that it should be on. */
+  private def addPendingTask(index: Int) {
+    // Utility method that adds `index` to a list only if it's not already there
     def addTo(list: ArrayBuffer[Int]) {
-      if (!readding || !list.contains(index)) {
+      if (!list.contains(index)) {
         list += index
       }
     }
@@ -219,9 +216,7 @@ private[spark] class TaskSetManager(
       addTo(pendingTasksWithNoPrefs)
     }
 
-    if (!readding) {
-      allPendingTasks += index  // No point scanning this whole list to find the old task there
-    }
+    allPendingTasks += index  // No point scanning this whole list to find the old task there
   }
 
   /**
@@ -783,18 +778,6 @@ private[spark] class TaskSetManager(
 
   /** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */
   override def executorLost(execId: String, host: String, reason: ExecutorLossReason) {
-    logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
-
-    // Re-enqueue pending tasks for this host based on the status of the cluster. Note
-    // that it's okay if we add a task to the same queue twice (if it had multiple preferred
-    // locations), because dequeueTaskFromList will skip already-running tasks.
-    for (index <- getPendingTasksForExecutor(execId)) {
-      addPendingTask(index, readding = true)
-    }
-    for (index <- getPendingTasksForHost(host)) {
-      addPendingTask(index, readding = true)
-    }
-
     // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage,
     // and we are not using an external shuffle server which could serve the shuffle outputs.
     // The reason is the next stage wouldn't be able to fetch the data from this dead executor


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org