Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/10 03:38:40 UTC

[35/37] git commit: Adding polling to driver submission client.

Adding polling to driver submission client.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0f9d2ace
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0f9d2ace
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0f9d2ace

Branch: refs/heads/master
Commit: 0f9d2ace6baefeacb1abf9d51a457644b67f2f8d
Parents: 62b08fa
Author: Patrick Wendell <pw...@gmail.com>
Authored: Wed Jan 8 16:53:04 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Jan 8 16:56:26 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/deploy/Client.scala  | 141 ++++++++++++-------
 .../org/apache/spark/deploy/DeployMessage.scala |  11 +-
 .../spark/deploy/master/DriverState.scala       |   5 +-
 .../org/apache/spark/deploy/master/Master.scala |  29 ++--
 .../spark/deploy/worker/DriverRunner.scala      |  12 +-
 .../org/apache/spark/deploy/worker/Worker.scala |   2 +-
 6 files changed, 132 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
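
For context, the polling that this commit adds boils down to an Akka ask against the master followed by a blocking wait on the typed response. A minimal sketch of that pattern, assuming the RequestDriverStatus/DriverStatusResponse messages introduced below and an already-resolved ActorSelection for the master (the helper name pollOnce is illustrative, not part of the commit):

    import scala.concurrent.Await
    import akka.actor.ActorSelection
    import akka.pattern.ask
    import akka.util.Timeout
    import org.apache.spark.deploy.DeployMessages.{DriverStatusResponse, RequestDriverStatus}

    // Ask the master once for the driver's state, block until the reply
    // arrives (or the timeout expires), and report the outcome.
    def pollOnce(masterActor: ActorSelection, driverId: String)(implicit timeout: Timeout): Unit = {
      val statusFuture = (masterActor ? RequestDriverStatus(driverId)).mapTo[DriverStatusResponse]
      val status = Await.result(statusFuture, timeout.duration)
      if (status.found) {
        println(s"State of $driverId is ${status.state.get}")
      } else {
        println(s"ERROR: Cluster master did not recognize $driverId")
      }
    }

The actual ClientActor below wraps this in pollAndReportStatus, sleeping briefly before the poll and calling System.exit once the state has been reported.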


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f9d2ace/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 43b9b1c..e133893 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -22,60 +22,30 @@ import scala.collection.mutable.Map
 import scala.concurrent._
 
 import akka.actor._
+import akka.pattern.ask
 import org.apache.log4j.{Level, Logger}
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.DeployMessages._
-import org.apache.spark.deploy.master.Master
+import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.util.{AkkaUtils, Utils}
+import akka.actor.Actor.emptyBehavior
+import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
 
 /**
- * Actor that sends a single message to the standalone master and returns the response in the
- * given promise.
+ * Proxy that relays messages to the driver.
  */
-class DriverActor(master: String, response: Promise[(Boolean, String)]) extends Actor with Logging {
-  override def receive = {
-    case SubmitDriverResponse(success, message) => {
-      response.success((success, message))
-    }
-
-    case KillDriverResponse(success, message) => {
-      response.success((success, message))
-    }
+class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
+  var masterActor: ActorSelection = _
+  val timeout = AkkaUtils.askTimeout(conf)
 
-    // Relay all other messages to the master.
-    case message => {
-      logInfo(s"Sending message to master $master...")
-      val masterActor = context.actorSelection(Master.toAkkaUrl(master))
-      masterActor ! message
-    }
-  }
-}
+  override def preStart() = {
+    masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
 
-/**
- * Executable utility for starting and terminating drivers inside of a standalone cluster.
- */
-object Client {
-
-  def main(args: Array[String]) {
-    val driverArgs = new ClientArguments(args)
-    val conf = new SparkConf()
-
-    if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
-      conf.set("spark.akka.logLifecycleEvents", "true")
-    }
-    conf.set("spark.akka.askTimeout", "5")
-    Logger.getRootLogger.setLevel(driverArgs.logLevel)
-
-    // TODO: See if we can initialize akka so return messages are sent back using the same TCP
-    //       flow. Else, this (sadly) requires the DriverClient be routable from the Master.
-    val (actorSystem, _) = AkkaUtils.createActorSystem(
-      "driverClient", Utils.localHostName(), 0, false, conf)
-    val master = driverArgs.master
-    val response = promise[(Boolean, String)]
-    val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response)))
+    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
 
     println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")
+
     driverArgs.cmd match {
       case "launch" =>
         // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
@@ -94,21 +64,88 @@ object Client {
           driverArgs.cores,
           driverArgs.supervise,
           command)
-        driver ! RequestSubmitDriver(driverDescription)
+
+        masterActor ! RequestSubmitDriver(driverDescription)
 
       case "kill" =>
         val driverId = driverArgs.driverId
-        driver ! RequestKillDriver(driverId)
+        val killFuture = masterActor ! RequestKillDriver(driverId)
+    }
+  }
+
+  /* Find out driver status then exit the JVM */
+  def pollAndReportStatus(driverId: String) {
+    println(s"... waiting before polling master for driver state")
+    Thread.sleep(5000)
+    println("... polling master for driver state")
+    val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)
+      .mapTo[DriverStatusResponse]
+    val statusResponse = Await.result(statusFuture, timeout)
+
+    statusResponse.found match {
+      case false =>
+        println(s"ERROR: Cluster master did not recognize $driverId")
+        System.exit(-1)
+      case true =>
+        println(s"State of $driverId is ${statusResponse.state.get}")
+        // Worker node, if present
+        (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
+          case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
+            println(s"Driver running on $hostPort ($id)")
+          case _ =>
+        }
+        // Exception, if present
+        statusResponse.exception.map { e =>
+          println(s"Exception from cluster was: $e")
+          System.exit(-1)
+        }
+        System.exit(0)
     }
+  }
+
+  override def receive = {
+
+    case SubmitDriverResponse(success, driverId, message) =>
+      println(message)
+      if (success) pollAndReportStatus(driverId.get) else System.exit(-1)
+
+    case KillDriverResponse(driverId, success, message) =>
+      println(message)
+      if (success) pollAndReportStatus(driverId) else System.exit(-1)
+
+    case DisassociatedEvent(_, remoteAddress, _) =>
+      println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
+      System.exit(-1)
+
+    case AssociationErrorEvent(cause, _, remoteAddress, _) =>
+      println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
+      println(s"Cause was: $cause")
+      System.exit(-1)
+  }
+}
+
+/**
+ * Executable utility for starting and terminating drivers inside of a standalone cluster.
+ */
+object Client {
+  def main(args: Array[String]) {
+    val conf = new SparkConf()
+    val driverArgs = new ClientArguments(args)
+
+    if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
+      conf.set("spark.akka.logLifecycleEvents", "true")
+    }
+    conf.set("spark.akka.askTimeout", "10")
+    conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
+    Logger.getRootLogger.setLevel(driverArgs.logLevel)
+
+    // TODO: See if we can initialize akka so return messages are sent back using the same TCP
+    //       flow. Else, this (sadly) requires the DriverClient be routable from the Master.
+    val (actorSystem, _) = AkkaUtils.createActorSystem(
+      "driverClient", Utils.localHostName(), 0, false, conf)
+
+    actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
 
-    val (success, message) =
-      try {
-        Await.result(response.future, AkkaUtils.askTimeout(conf))
-      } catch {
-        case e: TimeoutException => (false, s"Error: Timed out sending message to $master")
-      }
-    println(message)
-    actorSystem.shutdown()
     actorSystem.awaitTermination()
   }
 }
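
A side note on the Client.scala changes above: because the client talks to the master over Akka remoting, the actor also watches remoting lifecycle events so that an unreachable or vanished master turns into an immediate, visible failure rather than a hang. A stripped-down sketch of just that piece, assuming the same Akka 2.2-era event classes used in the diff (the MasterConnectionWatcher name is illustrative):

    import akka.actor.Actor
    import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}

    // Subscribe to remoting lifecycle events on startup and treat a lost or
    // failed association with the master as fatal.
    class MasterConnectionWatcher(masterUrl: String) extends Actor {
      override def preStart(): Unit =
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

      override def receive = {
        case DisassociatedEvent(_, remoteAddress, _) =>
          println(s"Error connecting to master $masterUrl ($remoteAddress), exiting.")
          System.exit(-1)
        case AssociationErrorEvent(cause, _, remoteAddress, _) =>
          println(s"Error connecting to master $masterUrl ($remoteAddress): $cause")
          System.exit(-1)
        case _: RemotingLifecycleEvent => // other lifecycle events are informational only
      }
    }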

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f9d2ace/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 34460d3..5e824e1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -112,11 +112,18 @@ private[deploy] object DeployMessages {
 
   case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage
 
-  case class SubmitDriverResponse(success: Boolean, message: String) extends DeployMessage
+  case class SubmitDriverResponse(success: Boolean, driverId: Option[String], message: String)
+    extends DeployMessage
 
   case class RequestKillDriver(driverId: String) extends DeployMessage
 
-  case class KillDriverResponse(success: Boolean, message: String) extends DeployMessage
+  case class KillDriverResponse(driverId: String, success: Boolean, message: String)
+    extends DeployMessage
+
+  case class RequestDriverStatus(driverId: String) extends DeployMessage
+
+  case class DriverStatusResponse(found: Boolean, state: Option[DriverState],
+    workerId: Option[String], workerHostPort: Option[String], exception: Option[Exception])
 
   // Internal message in AppClient
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f9d2ace/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala
index 93b2607..26a68ba 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala
@@ -27,6 +27,7 @@ private[spark] object DriverState extends Enumeration {
   // RELAUNCHING: Exited non-zero or due to worker failure, but has not yet started running again
   // UNKNOWN: The state of the driver is temporarily not known due to master failure recovery
   // KILLED: A user manually killed this driver
-  // FAILED: Unable to run due to an unrecoverable error (e.g. missing jar file)
-  val SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED = Value
+  // FAILED: The driver exited non-zero and was not supervised
+  // ERROR: Unable to run or restart due to an unrecoverable error (e.g. missing jar file)
+  val SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED, ERROR = Value
 }
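
With the new ERROR value, the terminal driver states after this commit are: FINISHED (process exited with code 0), FAILED (non-zero exit and not supervised), KILLED (killed at the user's request), and ERROR (could not be run or restarted at all, e.g. a missing jar). A small illustrative helper, not part of the commit, showing how a caller might read the taxonomy:

    import org.apache.spark.deploy.master.DriverState
    import org.apache.spark.deploy.master.DriverState._

    // Some(true)/Some(false) for terminal states, None for states that can
    // still change (illustrative only).
    def endedSuccessfully(state: DriverState.Value): Option[Boolean] = state match {
      case FINISHED => Some(true)
      case FAILED | KILLED | ERROR => Some(false)
      case SUBMITTED | RUNNING | RELAUNCHING | UNKNOWN => None
    }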

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f9d2ace/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f62601f..cd3f3eb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -186,7 +186,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     case RequestSubmitDriver(description) => {
       if (state != RecoveryState.ALIVE) {
         val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
-        sender ! SubmitDriverResponse(false, msg)
+        sender ! SubmitDriverResponse(false, None, msg)
       } else {
         logInfo("Driver submitted " + description.command.mainClass)
         val driver = createDriver(description)
@@ -198,14 +198,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         // TODO: It might be good to instead have the submission client poll the master to determine
         //       the current status of the driver. For now it's simply "fire and forget".
 
-        sender ! SubmitDriverResponse(true, s"Driver successfully submitted as ${driver.id}")
+        sender ! SubmitDriverResponse(true, Some(driver.id),
+          s"Driver successfully submitted as ${driver.id}")
       }
     }
 
     case RequestKillDriver(driverId) => {
       if (state != RecoveryState.ALIVE) {
         val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
-        sender ! KillDriverResponse(false, msg)
+        sender ! KillDriverResponse(driverId, success = false, msg)
       } else {
         logInfo("Asked to kill driver " + driverId)
         val driver = drivers.find(_.id == driverId)
@@ -226,15 +227,25 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
             // TODO: It would be nice for this to be a synchronous response
             val msg = s"Kill request for $driverId submitted"
             logInfo(msg)
-            sender ! KillDriverResponse(true, msg)
+            sender ! KillDriverResponse(driverId, success = true, msg)
           case None =>
-            val msg = s"Could not find running driver $driverId"
+            val msg = s"Driver $driverId has already finished or does not exist"
             logWarning(msg)
-            sender ! KillDriverResponse(false, msg)
+            sender ! KillDriverResponse(driverId, success = false, msg)
         }
       }
     }
 
+    case RequestDriverStatus(driverId) => {
+      (drivers ++ completedDrivers).find(_.id == driverId) match {
+        case Some(driver) =>
+          sender ! DriverStatusResponse(found = true, Some(driver.state),
+            driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
+        case None =>
+          sender ! DriverStatusResponse(found = false, None, None, None, None)
+      }
+    }
+
     case RegisterApplication(description) => {
       if (state == RecoveryState.STANDBY) {
         // ignore, don't send response
@@ -279,7 +290,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
     case DriverStateChanged(driverId, state, exception) => {
       state match {
-        case DriverState.FAILED | DriverState.FINISHED | DriverState.KILLED =>
+        case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
           removeDriver(driverId, state, exception)
         case _ =>
           throw new Exception(s"Received unexpected state update for driver $driverId: $state")
@@ -410,7 +421,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         logWarning(s"Re-launching ${d.id}")
         relaunchDriver(d)
       } else {
-        removeDriver(d.id, DriverState.FAILED, None)
+        removeDriver(d.id, DriverState.ERROR, None)
         logWarning(s"Did not re-launch ${d.id} because it was not supervised")
       }
     }
@@ -539,7 +550,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         relaunchDriver(driver)
       } else {
         logInfo(s"Not re-launching ${driver.id} because it was not supervised")
-        removeDriver(driver.id, DriverState.FAILED, None)
+        removeDriver(driver.id, DriverState.ERROR, None)
       }
     }
     persistenceEngine.removeWorker(worker)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f9d2ace/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index ad70345..b4df1a0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -52,6 +52,7 @@ private[spark] class DriverRunner(
   // Populated once finished
   var finalState: Option[DriverState] = None
   var finalException: Option[Exception] = None
+  var finalExitCode: Option[Int] = None
 
   // Decoupled for testing
   private[deploy] def setClock(_clock: Clock) = clock = _clock
@@ -87,8 +88,14 @@ private[spark] class DriverRunner(
 
         val state =
           if (killed) { DriverState.KILLED }
-          else if (finalException.isDefined) { DriverState.FAILED }
-          else { DriverState.FINISHED }
+          else if (finalException.isDefined) { DriverState.ERROR }
+          else {
+            finalExitCode match {
+              case Some(0) => DriverState.FINISHED
+              case _ => DriverState.FAILED
+            }
+          }
+
         finalState = Some(state)
 
         worker ! DriverStateChanged(driverId, state, finalException)
@@ -200,6 +207,7 @@ private[spark] class DriverRunner(
       }
 
       keepTrying = supervise && exitCode != 0 && !killed
+      finalExitCode = Some(exitCode)
     }
   }
 }
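
The DriverRunner change above is what gives FAILED and ERROR distinct meanings in practice: the runner now records the process exit code and folds it into the final state. Restating that decision logic as a standalone function (same behavior as the diff; the function name is illustrative):

    import org.apache.spark.deploy.master.DriverState

    // KILLED if the user killed the driver, ERROR if setup failed with an
    // exception, FINISHED on exit code 0, FAILED on any other exit code.
    def finalDriverState(killed: Boolean,
                         finalException: Option[Exception],
                         finalExitCode: Option[Int]): DriverState.Value = {
      if (killed) DriverState.KILLED
      else if (finalException.isDefined) DriverState.ERROR
      else finalExitCode match {
        case Some(0) => DriverState.FINISHED
        case _ => DriverState.FAILED
      }
    }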

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f9d2ace/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 2a2b7a3..273bacd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -272,7 +272,7 @@ private[spark] class Worker(
 
     case DriverStateChanged(driverId, state, exception) => {
       state match {
-        case DriverState.FAILED =>
+        case DriverState.ERROR =>
           logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
         case DriverState.FINISHED =>
           logInfo(s"Driver $driverId exited successfully")