You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/01/16 08:47:42 UTC

[1/3] git commit: Fail rather than hanging if a task crashes the JVM.

Updated Branches:
  refs/heads/master 84595ea3e -> c06a307ca


Fail rather than hanging if a task crashes the JVM.

Prior to this commit, if a task crashes the JVM, the task (and
all other tasks running on that executor) is marked at KILLED rather
than FAILED.  As a result, the TaskSetManager will retry the task
indefiniteily rather than failing the job after maxFailures. This
commit fixes that problem by marking tasks as FAILED rather than
killed when an executor is lost.

The downside of this commit is that if task A fails because another
task running on the same executor caused the VM to crash, the failure
will incorrectly be counted as a failure of task A. This should not
be an issue because we typically set maxFailures to 3, and it is
unlikely that a task will be co-located with a JVM-crashing task
multiple times.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a268d634
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a268d634
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a268d634

Branch: refs/heads/master
Commit: a268d634113536f7aca11af23619b9713b5ef5de
Parents: 5fecd25
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Wed Jan 15 16:03:40 2014 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Wed Jan 15 16:03:40 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/TaskSetManager.scala  |  2 +-
 .../scala/org/apache/spark/DistributedSuite.scala    | 15 +++++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a268d634/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index fc0ee07..5ad00a1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -629,7 +629,7 @@ private[spark] class TaskSetManager(
     }
     // Also re-enqueue any tasks that were running on the node
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
-      handleFailedTask(tid, TaskState.KILLED, None)
+      handleFailedTask(tid, TaskState.FAILED, None)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a268d634/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index d9cb7fe..27c4b01 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -125,6 +125,21 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     assert(thrown.getMessage.contains("failed 4 times"))
   }
 
+  test("repeatedly failing task that crashes JVM") {
+    // Ensures that if a task fails in a way that crashes the JVM, the job eventually fails rather
+    // than hanging.
+    sc = new SparkContext(clusterUrl, "test")
+    failAfter(Span(100000, Millis)) {
+      val thrown = intercept[SparkException] {
+        // One of the tasks always fails.
+        sc.parallelize(1 to 10, 2).foreach { x => if (x == 1) System.exit(42) }
+      }
+      assert(thrown.getClass === classOf[SparkException])
+      System.out.println(thrown.getMessage)
+      assert(thrown.getMessage.contains("failed 4 times"))
+    }
+  }
+
   test("caching") {
     sc = new SparkContext(clusterUrl, "test")
     val data = sc.parallelize(1 to 1000, 10).cache()


[3/3] git commit: Merge pull request #445 from kayousterhout/exec_lost

Posted by rx...@apache.org.
Merge pull request #445 from kayousterhout/exec_lost

Fail rather than hanging if a task crashes the JVM.

Prior to this commit, if a task crashes the JVM, the task (and
all other tasks running on that executor) is marked at KILLED rather
than FAILED.  As a result, the TaskSetManager will retry the task
indefinitely rather than failing the job after maxFailures. Eventually,
this makes the job hang, because the Standalone Scheduler removes
the application after 10 works have failed, and then the app is left
in a state where it's disconnected from the master and waiting to reconnect.
This commit fixes that problem by marking tasks as FAILED rather than
killed when an executor is lost.

The downside of this commit is that if task A fails because another
task running on the same executor caused the VM to crash, the failure
will incorrectly be counted as a failure of task A. This should not
be an issue because we typically set maxFailures to 3, and it is
unlikely that a task will be co-located with a JVM-crashing task
multiple times.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c06a307c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c06a307c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c06a307c

Branch: refs/heads/master
Commit: c06a307ca22901839df00d25fe623f6faa6af17e
Parents: 84595ea 718a13c
Author: Reynold Xin <rx...@apache.org>
Authored: Wed Jan 15 23:47:25 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Wed Jan 15 23:47:25 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/TaskSetManager.scala    |  2 +-
 .../scala/org/apache/spark/DistributedSuite.scala  | 17 +++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[2/3] git commit: Updated unit test comment

Posted by rx...@apache.org.
Updated unit test comment


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/718a13c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/718a13c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/718a13c1

Branch: refs/heads/master
Commit: 718a13c179915767107bc20cd27d9480d069231c
Parents: a268d63
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Wed Jan 15 23:46:14 2014 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Wed Jan 15 23:46:14 2014 -0800

----------------------------------------------------------------------
 core/src/test/scala/org/apache/spark/DistributedSuite.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/718a13c1/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 27c4b01..8de7a32 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -127,7 +127,9 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
 
   test("repeatedly failing task that crashes JVM") {
     // Ensures that if a task fails in a way that crashes the JVM, the job eventually fails rather
-    // than hanging.
+    // than hanging due to retrying the failed task infinitely many times (eventually the
+    // standalone scheduler will remove the application, causing the job to hang waiting to
+    // reconnect to the master).
     sc = new SparkContext(clusterUrl, "test")
     failAfter(Span(100000, Millis)) {
       val thrown = intercept[SparkException] {