You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/01/03 00:17:20 UTC

[1/2] git commit: Remove erroneous FAILED state for killed tasks.

Updated Branches:
  refs/heads/master 588a1695f -> 0475ca8f8


Remove erroneous FAILED state for killed tasks.

Currently, when tasks are killed, the Executor first sends a
status update for the task with a "KILLED" state, and then
sends a second status update with a "FAILED" state saying that
the task failed due to an exception. The second FAILED state is
misleading/unncessary, and occurs due to a NonLocalReturnControl
Exception that gets thrown due to the way we kill tasks. This
commit eliminates that problem.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a1b438d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a1b438d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a1b438d9

Branch: refs/heads/master
Commit: a1b438d94de10506dc7dcac54eb331ee2c0479aa
Parents: 3713f81
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Thu Jan 2 12:34:46 2014 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Thu Jan 2 12:34:46 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/executor/Executor.scala    | 24 +++++++++++---------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  3 ++-
 2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a1b438d9/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 3c92c20..e51d274 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -141,11 +141,6 @@ private[spark] class Executor(
     val tr = runningTasks.get(taskId)
     if (tr != null) {
       tr.kill()
-      // We remove the task also in the finally block in TaskRunner.run.
-      // The reason we need to remove it here is because killTask might be called before the task
-      // is even launched, and never reaching that finally block. ConcurrentHashMap's remove is
-      // idempotent.
-      runningTasks.remove(taskId)
     }
   }
 
@@ -167,6 +162,8 @@ private[spark] class Executor(
   class TaskRunner(execBackend: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
     extends Runnable {
 
+    object TaskKilledException extends Exception
+
     @volatile private var killed = false
     @volatile private var task: Task[Any] = _
 
@@ -200,9 +197,11 @@ private[spark] class Executor(
         // If this task has been killed before we deserialized it, let's quit now. Otherwise,
         // continue executing the task.
         if (killed) {
-          logInfo("Executor killed task " + taskId)
-          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
-          return
+          // Throw an exception rather than returning, because returning within a try{} block
+          // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
+          // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
+          // for the task.
+          throw TaskKilledException
         }
 
         attemptedTask = Some(task)
@@ -216,9 +215,7 @@ private[spark] class Executor(
 
         // If the task has been killed, let's fail it.
         if (task.killed) {
-          logInfo("Executor killed task " + taskId)
-          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
-          return
+          throw TaskKilledException
         }
 
         val resultSer = SparkEnv.get.serializer.newInstance()
@@ -260,6 +257,11 @@ private[spark] class Executor(
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
         }
 
+        case TaskKilledException => {
+          logInfo("Executor killed task " + taskId)
+          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
+        }
+
         case t: Throwable => {
           val serviceTime = (System.currentTimeMillis() - taskStart).toInt
           val metrics = attemptedTask.flatMap(t => t.metrics)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a1b438d9/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index d94b706..1b67332 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -285,7 +285,8 @@ private[spark] class TaskSchedulerImpl(
               }
             }
           case None =>
-            logInfo("Ignoring update from TID " + tid + " because its task set is gone")
+            logInfo("Ignoring update with state %s from TID %s because its task set is gone"
+              .format(state, tid))
         }
       } catch {
         case e: Exception => logError("Exception in statusUpdate", e)


[2/2] git commit: Merge pull request #320 from kayousterhout/erroneous_failed_msg

Posted by rx...@apache.org.
Merge pull request #320 from kayousterhout/erroneous_failed_msg

Remove erroneous FAILED state for killed tasks.

Currently, when tasks are killed, the Executor first sends a
status update for the task with a "KILLED" state, and then
sends a second status update with a "FAILED" state saying that
the task failed due to an exception. The second FAILED state is
misleading/unncessary, and occurs due to a NonLocalReturnControl
Exception that gets thrown due to the way we kill tasks. This
commit eliminates that problem.

I'm not at all sure that this is the best way to fix this problem,
so alternate suggestions welcome. @rxin guessing you're the right
person to look at this.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0475ca8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0475ca8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0475ca8f

Branch: refs/heads/master
Commit: 0475ca8f81b6b8f21fdb841922cd9ab51cfc8cc3
Parents: 588a169 a1b438d
Author: Reynold Xin <rx...@apache.org>
Authored: Thu Jan 2 15:17:08 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Jan 2 15:17:08 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/executor/Executor.scala    | 24 +++++++++++---------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  3 ++-
 2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0475ca8f/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------