You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/10 03:38:28 UTC

[23/37] git commit: Respect supervise option at Master

Respect supervise option at Master


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/7a99702c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/7a99702c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/7a99702c

Branch: refs/heads/master
Commit: 7a99702ce2fb04c4d76f0ce9f6df6608e0a5cce1
Parents: a872977
Author: Patrick Wendell <pw...@gmail.com>
Authored: Sun Dec 29 12:12:50 2013 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Dec 29 12:12:58 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/master/Master.scala   | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7a99702c/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index b8655c7..29f20da 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -396,8 +396,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
     // Reschedule drivers which were not claimed by any workers
     drivers.filter(_.worker.isEmpty).foreach { d =>
-      logWarning(s"Driver ${d.id} was not found after master recovery, re-launching")
-      relaunchDriver(d)
+      logWarning(s"Driver ${d.id} was not found after master recovery")
+      if (d.desc.supervise) {
+        logWarning(s"Re-launching ${d.id}")
+        relaunchDriver(d)
+      } else {
+        removeDriver(d.id, DriverState.FAILED, None)
+        logWarning(s"Did not re-launch ${d.id} because it was not supervised")
+      }
     }
 
     state = RecoveryState.ALIVE
@@ -519,7 +525,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       exec.application.removeExecutor(exec)
     }
     for (driver <- worker.drivers.values) {
-      relaunchDriver(driver)
+      if (driver.desc.supervise) {
+        logInfo(s"Re-launching ${driver.id}")
+        relaunchDriver(driver)
+      } else {
+        logInfo(s"Not re-launching ${driver.id} because it was not supervised")
+        removeDriver(driver.id, DriverState.FAILED, None)
+      }
     }
     persistenceEngine.removeWorker(worker)
   }