You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/12/21 04:16:05 UTC
[1/3] git commit: Handle IndirectTaskResults in LocalScheduler
Updated Branches:
refs/heads/branch-0.8 df5fadaa3 -> 88c565d19
Handle IndirectTaskResults in LocalScheduler
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/6183102d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/6183102d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/6183102d
Branch: refs/heads/branch-0.8
Commit: 6183102d6d28391f70c603a3d2b51d609dcd5586
Parents: df5fada
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Fri Dec 20 00:43:20 2013 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Fri Dec 20 00:43:20 2013 -0800
----------------------------------------------------------------------
.../scheduler/local/LocalTaskSetManager.scala | 39 +++++++++++++++-----
1 file changed, 29 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6183102d/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
index 53bf782..a498599 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
@@ -21,7 +21,8 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, SparkException, Success, TaskState}
+import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, SparkException, Success,
+ TaskEndReason, TaskResultLost, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Pool, Schedulable, Task,
TaskDescription, TaskInfo, TaskLocality, TaskResult, TaskSet, TaskSetManager}
@@ -144,7 +145,18 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader) match {
case directResult: DirectTaskResult[_] => directResult
case IndirectTaskResult(blockId) => {
- throw new SparkException("Expect only DirectTaskResults when using LocalScheduler")
+ logDebug("Fetching indirect task result for TID %s".format(tid))
+ val serializedTaskResult = env.blockManager.getRemoteBytes(blockId)
+ if (!serializedTaskResult.isDefined) {
+ /* We won't be able to get the task result if the block manager had to flush the
+ * result. */
+ taskFailed(tid, state, serializedData)
+ return
+ }
+ val deserializedResult = ser.deserialize[DirectTaskResult[_]](
+ serializedTaskResult.get)
+ env.blockManager.master.removeBlock(blockId)
+ deserializedResult
}
}
result.metrics.resultSize = serializedData.limit()
@@ -164,18 +176,25 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
val task = taskSet.tasks(index)
info.markFailed()
decreaseRunningTasks(1)
- val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](
- serializedData, getClass.getClassLoader)
- sched.dagScheduler.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null))
+ ser.deserialize[TaskEndReason](serializedData, getClass.getClassLoader) match {
+ case ef: ExceptionFailure =>
+ val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
+ logInfo("Task loss due to %s\n%s\n%s".format(
+ ef.className, ef.description, locs.mkString("\n")))
+ sched.dagScheduler.taskEnded(task, ef, null, null, info, ef.metrics.getOrElse(null))
+
+ case TaskResultLost =>
+ logWarning("Lost result for TID %s".format(tid))
+ sched.dagScheduler.taskEnded(task, TaskResultLost, null, null, info, null)
+
+ case _ => {}
+ }
if (!finished(index)) {
copiesRunning(index) -= 1
numFailures(index) += 1
- val locs = reason.stackTrace.map(loc => "\tat %s".format(loc.toString))
- logInfo("Loss was due to %s\n%s\n%s".format(
- reason.className, reason.description, locs.mkString("\n")))
if (numFailures(index) > MAX_TASK_FAILURES) {
- val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format(
- taskSet.id, index, MAX_TASK_FAILURES, reason.description)
+ val errorMessage = "Task %s:%d failed more than %d times; aborting job".format(
+ taskSet.id, index, MAX_TASK_FAILURES)
decreaseRunningTasks(runningTasks)
sched.dagScheduler.taskSetFailed(taskSet, errorMessage)
// need to delete failed Taskset from schedule queue
[3/3] git commit: Merge pull request #281 from kayousterhout/local_indirect_fix
Posted by ma...@apache.org.
Merge pull request #281 from kayousterhout/local_indirect_fix
Handle IndirectTaskResults in LocalScheduler
This fixes a bug where large results aren't correctly handled when running in local mode. This change is not being made in master because the Local/Cluster scheduler consolidation is expected to go into 0.9, which will fix this issue (see #127).
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/88c565d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/88c565d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/88c565d1
Branch: refs/heads/branch-0.8
Commit: 88c565d19caba8619a20ea613ea920f681d10cdf
Parents: df5fada d7bf08c
Author: Matei Zaharia <ma...@databricks.com>
Authored: Fri Dec 20 19:15:54 2013 -0800
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Fri Dec 20 19:15:54 2013 -0800
----------------------------------------------------------------------
.../scheduler/local/LocalTaskSetManager.scala | 42 +++++++++++++++-----
1 file changed, 32 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
[2/3] git commit: Fixed test failure by adding exception to abortion msg
Posted by ma...@apache.org.
Fixed test failure by adding exception to abortion msg
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d7bf08cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d7bf08cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d7bf08cb
Branch: refs/heads/branch-0.8
Commit: d7bf08cba3dc34180ef6b744560f99c8aefc96bf
Parents: 6183102
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Fri Dec 20 10:19:03 2013 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Fri Dec 20 10:19:03 2013 -0800
----------------------------------------------------------------------
.../apache/spark/scheduler/local/LocalTaskSetManager.scala | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d7bf08cb/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
index a498599..f92ad4a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
@@ -176,15 +176,18 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
val task = taskSet.tasks(index)
info.markFailed()
decreaseRunningTasks(1)
+ var failureReason = "unknown"
ser.deserialize[TaskEndReason](serializedData, getClass.getClassLoader) match {
case ef: ExceptionFailure =>
+ failureReason = "Exception failure: %s".format(ef.description)
val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
logInfo("Task loss due to %s\n%s\n%s".format(
ef.className, ef.description, locs.mkString("\n")))
sched.dagScheduler.taskEnded(task, ef, null, null, info, ef.metrics.getOrElse(null))
case TaskResultLost =>
- logWarning("Lost result for TID %s".format(tid))
+ failureReason = "Lost result for TID %s".format(tid)
+ logWarning(failureReason)
sched.dagScheduler.taskEnded(task, TaskResultLost, null, null, info, null)
case _ => {}
@@ -193,8 +196,8 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
copiesRunning(index) -= 1
numFailures(index) += 1
if (numFailures(index) > MAX_TASK_FAILURES) {
- val errorMessage = "Task %s:%d failed more than %d times; aborting job".format(
- taskSet.id, index, MAX_TASK_FAILURES)
+ val errorMessage = ("Task %s:%d failed more than %d times; aborting job" +
+ "(most recent failure: %s").format(taskSet.id, index, MAX_TASK_FAILURES, failureReason)
decreaseRunningTasks(runningTasks)
sched.dagScheduler.taskSetFailed(taskSet, errorMessage)
// need to delete failed Taskset from schedule queue