You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/10 03:38:28 UTC
[23/37] git commit: Respect supervise option at Master
Respect supervise option at Master
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/7a99702c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/7a99702c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/7a99702c
Branch: refs/heads/master
Commit: 7a99702ce2fb04c4d76f0ce9f6df6608e0a5cce1
Parents: a872977
Author: Patrick Wendell <pw...@gmail.com>
Authored: Sun Dec 29 12:12:50 2013 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Dec 29 12:12:58 2013 -0800
----------------------------------------------------------------------
.../org/apache/spark/deploy/master/Master.scala | 18 +++++++++++++++---
1 file changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7a99702c/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index b8655c7..29f20da 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -396,8 +396,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// Reschedule drivers which were not claimed by any workers
drivers.filter(_.worker.isEmpty).foreach { d =>
- logWarning(s"Driver ${d.id} was not found after master recovery, re-launching")
- relaunchDriver(d)
+ logWarning(s"Driver ${d.id} was not found after master recovery")
+ if (d.desc.supervise) {
+ logWarning(s"Re-launching ${d.id}")
+ relaunchDriver(d)
+ } else {
+ removeDriver(d.id, DriverState.FAILED, None)
+ logWarning(s"Did not re-launch ${d.id} because it was not supervised")
+ }
}
state = RecoveryState.ALIVE
@@ -519,7 +525,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
exec.application.removeExecutor(exec)
}
for (driver <- worker.drivers.values) {
- relaunchDriver(driver)
+ if (driver.desc.supervise) {
+ logInfo(s"Re-launching ${driver.id}")
+ relaunchDriver(driver)
+ } else {
+ logInfo(s"Not re-launching ${driver.id} because it was not supervised")
+ removeDriver(driver.id, DriverState.FAILED, None)
+ }
}
persistenceEngine.removeWorker(worker)
}