Posted to commits@spark.apache.org by ma...@apache.org on 2013/11/15 07:29:34 UTC

[1/2] git commit: Fix bug where scheduler could hang after task failure.

Updated Branches:
  refs/heads/master dfd40e9f6 -> 96e0fb463


Fix bug where scheduler could hang after task failure.

When a task fails, we need to call reviveOffers() so that the
task can be rescheduled on a different machine. In the current code,
the state in ClusterTaskSetManager indicating which tasks are
pending may be updated after reviveOffers() is called (there is a
race condition here), so when the offers are revived, the task set
manager does not yet realize that there are failed tasks that need
to be relaunched.
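
A minimal, runnable sketch of that ordering problem. SimpleTaskSetManager and
SimpleBackend below are made-up stand-ins for illustration, not Spark's actual
classes; only the ordering of the two calls is the point.

import scala.collection.mutable

class SimpleTaskSetManager {
  private val pendingTasks = mutable.Queue[Long]()

  // Records a failed task so it becomes schedulable again.
  def handleFailedTask(tid: Long): Unit = pendingTasks.enqueue(tid)

  // Hands out a pending task when offers arrive, if there is one.
  def resourceOffer(): Option[Long] =
    if (pendingTasks.nonEmpty) Some(pendingTasks.dequeue()) else None
}

class SimpleBackend(manager: SimpleTaskSetManager) {
  // Conceptually what reviveOffers() does: ask the manager for work to launch.
  def reviveOffers(): Unit = manager.resourceOffer() match {
    case Some(tid) => println(s"relaunching task $tid")
    case None      => println("no pending tasks; nothing relaunched")
  }
}

object ReviveOrderingSketch {
  def main(args: Array[String]): Unit = {
    val manager = new SimpleTaskSetManager
    val backend = new SimpleBackend(manager)

    // Buggy ordering: offers are revived before the failure is recorded, so the
    // revived offers find nothing to launch and the failed task is never rerun.
    backend.reviveOffers()          // prints "no pending tasks; nothing relaunched"
    manager.handleFailedTask(7L)

    // Fixed ordering (what this commit enforces): record the failure first, then
    // revive offers so the task set manager can hand the task back out.
    manager.handleFailedTask(8L)
    backend.reviveOffers()          // prints "relaunching task 8"
  }
}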


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b4546ba9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b4546ba9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b4546ba9

Branch: refs/heads/master
Commit: b4546ba9e694529c359b7ca5c26829ead2c07f1a
Parents: 1a4cfbe
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Thu Nov 14 13:33:11 2013 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Thu Nov 14 13:55:03 2013 -0800

----------------------------------------------------------------------
 .../spark/scheduler/cluster/ClusterScheduler.scala     | 13 +++----------
 1 file changed, 3 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4546ba9/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 53a5896..c1e65a3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -249,7 +249,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     var failedExecutor: Option[String] = None
-    var taskFailed = false
     synchronized {
       try {
         if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
@@ -269,9 +268,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
               }
               taskIdToExecutorId.remove(tid)
             }
-            if (state == TaskState.FAILED) {
-              taskFailed = true
-            }
             activeTaskSets.get(taskSetId).foreach { taskSet =>
               if (state == TaskState.FINISHED) {
                 taskSet.removeRunningTask(tid)
@@ -293,10 +289,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       dagScheduler.executorLost(failedExecutor.get)
       backend.reviveOffers()
     }
-    if (taskFailed) {
-      // Also revive offers if a task had failed for some reason other than host lost
-      backend.reviveOffers()
-    }
   }
 
   def handleTaskGettingResult(taskSetManager: ClusterTaskSetManager, tid: Long) {
@@ -316,8 +308,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     taskState: TaskState,
     reason: Option[TaskEndReason]) = synchronized {
     taskSetManager.handleFailedTask(tid, taskState, reason)
-    if (taskState == TaskState.FINISHED) {
-      // The task finished successfully but the result was lost, so we should revive offers.
+    if (taskState != TaskState.KILLED) {
+      // Need to revive offers again now that the task set manager state has been updated to
+      // reflect failed tasks that need to be re-run.
       backend.reviveOffers()
     }
   }
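
An illustrative reading of the second hunk above (not Spark source, and the
TaskState values are toy stand-ins): once taskSetManager.handleFailedTask() has
updated the pending-task state, offers are revived for every terminal state
except KILLED, since a killed task was stopped on purpose and should not be
handed back out in the next round of offers.

object ReviveRuleSketch {
  sealed trait TaskState
  case object Finished extends TaskState   // finished, but the result was lost
  case object Failed   extends TaskState
  case object Lost     extends TaskState
  case object Killed   extends TaskState

  // The condition the patch switches to: revive unless the task was killed.
  def shouldRevive(state: TaskState): Boolean = state != Killed

  def main(args: Array[String]): Unit = {
    assert(shouldRevive(Failed))
    assert(shouldRevive(Finished))
    assert(shouldRevive(Lost))
    assert(!shouldRevive(Killed))
    println("revive rule checks passed")
  }
}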


[2/2] git commit: Merge pull request #173 from kayousterhout/scheduler_hang

Posted by ma...@apache.org.
Merge pull request #173 from kayousterhout/scheduler_hang

Fix bug where scheduler could hang after task failure.

When a task fails, we need to call reviveOffers() so that the
task can be rescheduled on a different machine. In the current code,
the state in ClusterTaskSetManager indicating which tasks are
pending may be updated after reviveOffers() is called (there is a
race condition here), so when the offers are revived, the task set
manager does not yet realize that there are failed tasks that need
to be relaunched.

This isn't currently unit tested, but it will be once my pull request for
merging the cluster and local schedulers goes in; at that point many more
of the unit tests will exercise the code paths through the cluster
scheduler. (Currently the failure test suite uses the local scheduler,
which is why we didn't see this bug before.)


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/96e0fb46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/96e0fb46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/96e0fb46

Branch: refs/heads/master
Commit: 96e0fb46309698b685c811a65bd8e1a691389994
Parents: dfd40e9 b4546ba
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Thu Nov 14 22:29:28 2013 -0800
Committer: Matei Zaharia <ma...@eecs.berkeley.edu>
Committed: Thu Nov 14 22:29:28 2013 -0800

----------------------------------------------------------------------
 .../spark/scheduler/cluster/ClusterScheduler.scala     | 13 +++----------
 1 file changed, 3 insertions(+), 10 deletions(-)
----------------------------------------------------------------------