You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/12/01 21:47:36 UTC

git commit: Merge pull request #219 from sundeepn/schedulerexception

Updated Branches:
  refs/heads/branch-0.8 be9c176a8 -> d21266e97


Merge pull request #219 from sundeepn/schedulerexception

Scheduler quits when newStage fails

The current scheduler thread does not handle exceptions thrown by newStage while launching new jobs. The thread dies on any exception that gets triggered at that level, leaving the cluster hanging with no scheduler.

(cherry picked from commit 740922f25d5f81617fbe02c7bcd1610d6426bbef)
Signed-off-by: Reynold Xin <rx...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d21266e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d21266e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d21266e9

Branch: refs/heads/branch-0.8
Commit: d21266e9710d7b72218508d050fe6e9fc903944c
Parents: be9c176
Author: Reynold Xin <rx...@apache.org>
Authored: Sun Dec 1 12:46:58 2013 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Sun Dec 1 12:47:30 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d21266e9/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 0a34a06..4a54267 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -368,7 +368,17 @@ class DAGScheduler(
   private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
     event match {
       case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
-        val finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
+        var finalStage: Stage = null
+        try {
+          // New stage creation can fail at times; if it is not protected, the scheduler thread is killed.
+          // e.g. it can fail when jobs are run on HadoopRDD whose underlying hdfs files have been deleted
+          finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
+        } catch {
+          case e: Exception =>
+            logWarning("Creating new stage failed due to exception - job: " + jobId, e)
+            listener.jobFailed(e)
+            return false
+        }
         val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
         clearCacheLocs()
         logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length +