You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/10 03:38:15 UTC

[10/37] git commit: Refactor DriverClient to be more Actor-based

Refactor DriverClient to be more Actor-based


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/61372b11
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/61372b11
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/61372b11

Branch: refs/heads/master
Commit: 61372b11f4a4460b8ade8997d7478234bba64f7e
Parents: bbc3628
Author: Aaron Davidson <aa...@databricks.com>
Authored: Wed Dec 25 10:55:09 2013 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Wed Dec 25 10:55:25 2013 -0800

----------------------------------------------------------------------
 .../spark/deploy/client/DriverClient.scala      | 93 +++++++-------------
 1 file changed, 31 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/61372b11/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index 28c851b..d2f3c09 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -17,78 +17,35 @@
 
 package org.apache.spark.deploy.client
 
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.Await
-import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.concurrent._
 
 import akka.actor._
-import akka.actor.Actor.emptyBehavior
-import akka.pattern.ask
-import akka.remote.RemotingLifecycleEvent
 
 import org.apache.spark.Logging
-import org.apache.spark.deploy.{DeployMessage, DriverDescription}
+import org.apache.spark.deploy.DriverDescription
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
- * Actor that sends a single message to the standalone master and then shuts down.
+ * Actor that sends a single message to the standalone master and returns the response in the
+ * given promise.
  */
-private[spark] abstract class SingleMessageClient(
-    actorSystem: ActorSystem, master: String, message: DeployMessage)
-  extends Logging {
-
-  // Concrete child classes must implement
-  def handleResponse(response: Any)
-
-  var actor: ActorRef = actorSystem.actorOf(Props(new DriverActor()))
-
-  class DriverActor extends Actor with Logging {
-    override def preStart() {
-      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-      logInfo("Sending message to master " + master + "...")
-      val masterActor = context.actorSelection(Master.toAkkaUrl(master))
-      val timeoutDuration: FiniteDuration = Duration.create(
-        System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
-      val submitFuture = masterActor.ask(message)(timeoutDuration)
-      handleResponse(Await.result(submitFuture, timeoutDuration))
-      actorSystem.stop(actor)
-      actorSystem.shutdown()
+class DriverActor(master: String, response: Promise[(Boolean, String)]) extends Actor with Logging {
+  override def receive = {
+    case SubmitDriverResponse(success, message) => {
+      response.success((success, message))
     }
 
-    override def receive = emptyBehavior
-  }
-}
-
-/**
- * Submits a driver to the master.
- */
-private[spark] class SubmissionClient(actorSystem: ActorSystem, master: String,
-    driverDescription: DriverDescription)
-    extends SingleMessageClient(actorSystem, master, RequestSubmitDriver(driverDescription)) {
-
-  override def handleResponse(response: Any) {
-    val resp = response.asInstanceOf[SubmitDriverResponse]
-    if (!resp.success) {
-      logError(s"Error submitting driver to $master")
-      logError(resp.message)
+    case KillDriverResponse(success, message) => {
+      response.success((success, message))
     }
-  }
-}
 
-/**
- * Terminates a client at the master.
- */
-private[spark] class TerminationClient(actorSystem: ActorSystem, master: String, driverId: String)
-    extends SingleMessageClient(actorSystem, master, RequestKillDriver(driverId)) {
-
-  override def handleResponse(response: Any) {
-    val resp = response.asInstanceOf[KillDriverResponse]
-    if (!resp.success) {
-      logError(s"Error terminating $driverId at $master")
-      logError(resp.message)
+    // Relay all other messages to the server.
+    case message => {
+      logInfo(s"Sending message to master $master...")
+      val masterActor = context.actorSelection(Master.toAkkaUrl(master))
+      masterActor ! message
     }
   }
 }
@@ -96,7 +53,7 @@ private[spark] class TerminationClient(actorSystem: ActorSystem, master: String,
 /**
  * Executable utility for starting and terminating drivers inside of a standalone cluster.
  */
-object DriverClient {
+object DriverClient extends Logging {
 
   def main(args: Array[String]) {
     val driverArgs = new DriverClientArguments(args)
@@ -105,6 +62,9 @@ object DriverClient {
     //       flow. Else, this (sadly) requires the DriverClient be routable from the Master.
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
       "driverClient", Utils.localHostName(), 0)
+    val master = driverArgs.master
+    val response = promise[(Boolean, String)]
+    val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response)))
 
     driverArgs.cmd match {
       case "launch" =>
@@ -116,13 +76,22 @@ object DriverClient {
           driverArgs.driverOptions,
           driverArgs.driverJavaOptions,
           driverArgs.driverEnvVars)
-        val client = new SubmissionClient(actorSystem, driverArgs.master, driverDescription)
+        driver ! RequestSubmitDriver(driverDescription)
 
       case "kill" =>
-        val master = driverArgs.master
         val driverId = driverArgs.driverId
-        val client = new TerminationClient(actorSystem, master, driverId)
+        driver ! RequestKillDriver(driverId)
     }
+
+    val (success, message) =
+      try {
+        Await.result(response.future, AkkaUtils.askTimeout)
+      } catch {
+        case e: TimeoutException => (false, s"Master $master failed to respond in time")
+      }
+    if (success) logInfo(message) else logError(message)
+    actorSystem.stop(driver)
+    actorSystem.shutdown()
     actorSystem.awaitTermination()
   }
 }