You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/12/21 04:16:05 UTC

[1/3] git commit: Handle IndirectTaskResults in LocalScheduler

Updated Branches:
  refs/heads/branch-0.8 df5fadaa3 -> 88c565d19


Handle IndirectTaskResults in LocalScheduler


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/6183102d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/6183102d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/6183102d

Branch: refs/heads/branch-0.8
Commit: 6183102d6d28391f70c603a3d2b51d609dcd5586
Parents: df5fada
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Fri Dec 20 00:43:20 2013 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Fri Dec 20 00:43:20 2013 -0800

----------------------------------------------------------------------
 .../scheduler/local/LocalTaskSetManager.scala   | 39 +++++++++++++++-----
 1 file changed, 29 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6183102d/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
index 53bf782..a498599 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
@@ -21,7 +21,8 @@ import java.nio.ByteBuffer
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 
-import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, SparkException, Success, TaskState}
+import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, SparkException, Success,
+  TaskEndReason, TaskResultLost, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Pool, Schedulable, Task,
   TaskDescription, TaskInfo, TaskLocality, TaskResult, TaskSet, TaskSetManager}
@@ -144,7 +145,18 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
     val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader) match {
       case directResult: DirectTaskResult[_] => directResult
       case IndirectTaskResult(blockId) => {
-        throw new SparkException("Expect only DirectTaskResults when using LocalScheduler")
+        logDebug("Fetching indirect task result for TID %s".format(tid))
+        val serializedTaskResult = env.blockManager.getRemoteBytes(blockId)
+        if (!serializedTaskResult.isDefined) {
+          /* We won't be able to get the task result if the block manager had to flush the
+           * result. */
+          taskFailed(tid, state, serializedData)
+          return
+        }
+        val deserializedResult = ser.deserialize[DirectTaskResult[_]](
+          serializedTaskResult.get)
+        env.blockManager.master.removeBlock(blockId)
+        deserializedResult
       }
     }
     result.metrics.resultSize = serializedData.limit()
@@ -164,18 +176,25 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
     val task = taskSet.tasks(index)
     info.markFailed()
     decreaseRunningTasks(1)
-    val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](
-      serializedData, getClass.getClassLoader)
-    sched.dagScheduler.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null))
+    ser.deserialize[TaskEndReason](serializedData, getClass.getClassLoader) match {
+      case ef: ExceptionFailure =>
+        val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
+        logInfo("Task loss due to %s\n%s\n%s".format(
+          ef.className, ef.description, locs.mkString("\n")))
+        sched.dagScheduler.taskEnded(task, ef, null, null, info, ef.metrics.getOrElse(null))
+
+      case TaskResultLost =>
+        logWarning("Lost result for TID %s".format(tid))
+        sched.dagScheduler.taskEnded(task, TaskResultLost, null, null, info, null)
+
+      case _ => {}
+    }
     if (!finished(index)) {
       copiesRunning(index) -= 1
       numFailures(index) += 1
-      val locs = reason.stackTrace.map(loc => "\tat %s".format(loc.toString))
-      logInfo("Loss was due to %s\n%s\n%s".format(
-        reason.className, reason.description, locs.mkString("\n")))
       if (numFailures(index) > MAX_TASK_FAILURES) {
-        val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format(
-          taskSet.id, index, MAX_TASK_FAILURES, reason.description)
+        val errorMessage = "Task %s:%d failed more than %d times; aborting job".format(
+          taskSet.id, index, MAX_TASK_FAILURES)
         decreaseRunningTasks(runningTasks)
         sched.dagScheduler.taskSetFailed(taskSet, errorMessage)
         // need to delete failed Taskset from schedule queue


[3/3] git commit: Merge pull request #281 from kayousterhout/local_indirect_fix

Posted by ma...@apache.org.
Merge pull request #281 from kayousterhout/local_indirect_fix

Handle IndirectTaskResults in LocalScheduler

This fixes a bug where large results aren't handled correctly when running in local mode. This change is not being applied to master because the Local/Cluster scheduler consolidation is expected to go into 0.9, which will fix this issue (see #127).


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/88c565d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/88c565d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/88c565d1

Branch: refs/heads/branch-0.8
Commit: 88c565d19caba8619a20ea613ea920f681d10cdf
Parents: df5fada d7bf08c
Author: Matei Zaharia <ma...@databricks.com>
Authored: Fri Dec 20 19:15:54 2013 -0800
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Fri Dec 20 19:15:54 2013 -0800

----------------------------------------------------------------------
 .../scheduler/local/LocalTaskSetManager.scala   | 42 +++++++++++++++-----
 1 file changed, 32 insertions(+), 10 deletions(-)
----------------------------------------------------------------------



[2/3] git commit: Fixed test failure by adding exception to abortion msg

Posted by ma...@apache.org.
Fixed test failure by adding exception to abortion msg


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d7bf08cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d7bf08cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d7bf08cb

Branch: refs/heads/branch-0.8
Commit: d7bf08cba3dc34180ef6b744560f99c8aefc96bf
Parents: 6183102
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Fri Dec 20 10:19:03 2013 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Fri Dec 20 10:19:03 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/local/LocalTaskSetManager.scala  | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d7bf08cb/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
index a498599..f92ad4a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
@@ -176,15 +176,18 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
     val task = taskSet.tasks(index)
     info.markFailed()
     decreaseRunningTasks(1)
+    var failureReason = "unknown"
     ser.deserialize[TaskEndReason](serializedData, getClass.getClassLoader) match {
       case ef: ExceptionFailure =>
+        failureReason = "Exception failure: %s".format(ef.description)
         val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
         logInfo("Task loss due to %s\n%s\n%s".format(
           ef.className, ef.description, locs.mkString("\n")))
         sched.dagScheduler.taskEnded(task, ef, null, null, info, ef.metrics.getOrElse(null))
 
       case TaskResultLost =>
-        logWarning("Lost result for TID %s".format(tid))
+        failureReason = "Lost result for TID %s".format(tid)
+        logWarning(failureReason)
         sched.dagScheduler.taskEnded(task, TaskResultLost, null, null, info, null)
 
       case _ => {}
@@ -193,8 +196,8 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
       copiesRunning(index) -= 1
       numFailures(index) += 1
       if (numFailures(index) > MAX_TASK_FAILURES) {
-        val errorMessage = "Task %s:%d failed more than %d times; aborting job".format(
-          taskSet.id, index, MAX_TASK_FAILURES)
+        val errorMessage = ("Task %s:%d failed more than %d times; aborting job" +
+          "(most recent failure: %s").format(taskSet.id, index, MAX_TASK_FAILURES, failureReason)
         decreaseRunningTasks(runningTasks)
         sched.dagScheduler.taskSetFailed(taskSet, errorMessage)
         // need to delete failed Taskset from schedule queue