Posted to commits@spark.apache.org by pw...@apache.org on 2014/02/04 07:42:05 UTC

git commit: Merge pull request #445 from kayousterhout/exec_lost

Updated Branches:
  refs/heads/branch-0.9 dc8adf158 -> 0021ef964


Merge pull request #445 from kayousterhout/exec_lost

Fail rather than hanging if a task crashes the JVM.

Prior to this commit, if a task crashed the JVM, the task (and
all other tasks running on that executor) was marked as KILLED rather
than FAILED.  As a result, the TaskSetManager would retry the task
indefinitely rather than failing the job after maxFailures. Eventually,
this made the job hang, because the Standalone Scheduler removes
the application after 10 workers have failed, leaving the app
disconnected from the master and waiting to reconnect.
This commit fixes that problem by marking tasks as FAILED rather than
KILLED when an executor is lost.
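
To make the bookkeeping concrete, here is a minimal, hypothetical sketch of
the retry/abort logic the commit message describes. SimpleTaskSet,
numFailures, reenqueue, and abort are illustrative names only, not the real
TaskSetManager API:

    import scala.collection.mutable

    // Simplified, illustrative failure bookkeeping for one task set.
    // Only FAILED terminal states count toward maxTaskFailures; KILLED
    // tasks are re-enqueued without incrementing any counter, which is
    // why a task KILLED by a JVM crash could be retried forever.
    class SimpleTaskSet(maxTaskFailures: Int) {
      private val numFailures = mutable.Map[Int, Int]().withDefaultValue(0)

      def handleFailedTask(index: Int, state: String): Unit = {
        if (state == "FAILED") {
          numFailures(index) += 1
          if (numFailures(index) >= maxTaskFailures) {
            abort(s"Task $index failed $maxTaskFailures times; aborting job")
          }
        }
        reenqueue(index) // in either case the task goes back on the queue
      }

      private def reenqueue(index: Int): Unit = ()          // stub
      private def abort(message: String): Unit =
        throw new RuntimeException(message)
    }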

The downside of this commit is that if task A fails because another
task running on the same executor caused the JVM to crash, that crash
is incorrectly counted as a failure of task A as well. This should not
be an issue in practice: we typically set maxFailures to 3, and it is
unlikely that a task will be co-located with a JVM-crashing task
multiple times.
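
Continuing the illustrative sketch above (the same hypothetical
SimpleTaskSet, not real Spark API), the co-location downside plays out
like this:

    val taskSet = new SimpleTaskSet(maxTaskFailures = 3)
    // An executor crash while tasks 3 and 7 were running fails both:
    Seq(3, 7).foreach(taskSet.handleFailedTask(_, "FAILED"))
    // Task 3 would have to be co-located with a crashing task on two
    // more attempts before its count reaches 3 and the job aborts.
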
(cherry picked from commit c06a307ca22901839df00d25fe623f6faa6af17e)

Signed-off-by: Patrick Wendell <pw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0021ef96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0021ef96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0021ef96

Branch: refs/heads/branch-0.9
Commit: 0021ef9645cdd84e9f375afdc262cf8919fa2ab9
Parents: dc8adf1
Author: Reynold Xin <rx...@apache.org>
Authored: Wed Jan 15 23:47:25 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Feb 3 22:42:01 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/TaskSetManager.scala    |  2 +-
 .../scala/org/apache/spark/DistributedSuite.scala  | 17 +++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0021ef96/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index ccd7f6f..e914708 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -629,7 +629,7 @@ private[spark] class TaskSetManager(
     }
     // Also re-enqueue any tasks that were running on the node
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
-      handleFailedTask(tid, TaskState.KILLED, None)
+      handleFailedTask(tid, TaskState.FAILED, None)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0021ef96/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index d9cb7fe..8de7a32 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -125,6 +125,23 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     assert(thrown.getMessage.contains("failed 4 times"))
   }
 
+  test("repeatedly failing task that crashes JVM") {
+    // Ensures that if a task fails in a way that crashes the JVM, the job eventually fails rather
+    // than hanging due to retrying the failed task infinitely many times (eventually the
+    // standalone scheduler will remove the application, causing the job to hang waiting to
+    // reconnect to the master).
+    sc = new SparkContext(clusterUrl, "test")
+    failAfter(Span(100000, Millis)) {
+      val thrown = intercept[SparkException] {
+        // One of the tasks always fails.
+        sc.parallelize(1 to 10, 2).foreach { x => if (x == 1) System.exit(42) }
+      }
+      assert(thrown.getClass === classOf[SparkException])
+      System.out.println(thrown.getMessage)
+      assert(thrown.getMessage.contains("failed 4 times"))
+    }
+  }
+
   test("caching") {
     sc = new SparkContext(clusterUrl, "test")
     val data = sc.parallelize(1 to 1000, 10).cache()