Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/10 03:38:35 UTC

[30/37] git commit: Rename to Client

Rename to Client


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d0533f70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d0533f70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d0533f70

Branch: refs/heads/master
Commit: d0533f704681adccc8fe2b814dc9e5082646057a
Parents: 3d939e5
Author: Patrick Wendell <pw...@gmail.com>
Authored: Tue Jan 7 23:38:46 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Jan 7 23:38:51 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/deploy/Client.scala  | 114 ++++++++++++++++++
 .../apache/spark/deploy/ClientArguments.scala   | 105 +++++++++++++++++
 .../spark/deploy/client/DriverClient.scala      | 117 -------------------
 .../deploy/client/DriverClientArguments.scala   | 105 -----------------
 4 files changed, 219 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0533f70/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
new file mode 100644
index 0000000..0475bb1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.Map
+import scala.concurrent._
+
+import akka.actor._
+import org.apache.log4j.{Level, Logger}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.master.Master
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * Actor that sends a single message to the standalone master and returns the response in the
+ * given promise.
+ */
+class DriverActor(master: String, response: Promise[(Boolean, String)]) extends Actor with Logging {
+  override def receive = {
+    case SubmitDriverResponse(success, message) => {
+      response.success((success, message))
+    }
+
+    case KillDriverResponse(success, message) => {
+      response.success((success, message))
+    }
+
+    // Relay all other messages to the master.
+    case message => {
+      logInfo(s"Sending message to master $master...")
+      val masterActor = context.actorSelection(Master.toAkkaUrl(master))
+      masterActor ! message
+    }
+  }
+}
+
+/**
+ * Executable utility for starting and terminating drivers inside of a standalone cluster.
+ */
+object DriverClient {
+
+  def main(args: Array[String]) {
+    val driverArgs = new ClientArguments(args)
+    val conf = new SparkConf()
+
+    if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
+      conf.set("spark.akka.logLifecycleEvents", "true")
+    }
+    conf.set("spark.akka.askTimeout", "5")
+    Logger.getRootLogger.setLevel(driverArgs.logLevel)
+
+    // TODO: See if we can initialize akka so return messages are sent back using the same TCP
+    //       flow. Else, this (sadly) requires the DriverClient be routable from the Master.
+    val (actorSystem, _) = AkkaUtils.createActorSystem(
+      "driverClient", Utils.localHostName(), 0, false, conf)
+    val master = driverArgs.master
+    val response = promise[(Boolean, String)]
+    val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response)))
+
+    println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")
+    driverArgs.cmd match {
+      case "launch" =>
+        // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
+        //       truncate filesystem paths similar to what YARN does. For now, we just require
+        //       people call `addJar` assuming the jar is in the same directory.
+        val env = Map[String, String]()
+        System.getenv().foreach{case (k, v) => env(k) = v}
+
+        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
+        val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
+          driverArgs.driverOptions, env)
+
+        val driverDescription = new DriverDescription(
+          driverArgs.jarUrl,
+          driverArgs.memory,
+          driverArgs.cores,
+          driverArgs.supervise,
+          command)
+        driver ! RequestSubmitDriver(driverDescription)
+
+      case "kill" =>
+        val driverId = driverArgs.driverId
+        driver ! RequestKillDriver(driverId)
+    }
+
+    val (success, message) =
+      try {
+        Await.result(response.future, AkkaUtils.askTimeout(conf))
+      } catch {
+        case e: TimeoutException => (false, s"Error: Timed out sending message to $master")
+      }
+    println(message)
+    actorSystem.shutdown()
+    actorSystem.awaitTermination()
+  }
+}
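
A quick illustration of the pattern above for reviewers: DriverActor completes a shared
Promise when the master replies, and main() blocks on the corresponding future. The
following minimal sketch reproduces that round trip in plain Scala, with no Akka and no
master in the loop (the object name and reply text below are illustrative only, not part
of this commit):

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object PromiseRoundTrip {
  def main(args: Array[String]): Unit = {
    // Stands in for the promise handed to DriverActor above.
    val response = Promise[(Boolean, String)]()

    // Stands in for DriverActor receiving SubmitDriverResponse or
    // KillDriverResponse from the master; here we complete it directly.
    response.success((true, "driver submitted"))

    // Stands in for the Await.result(...) call at the end of DriverClient.main.
    val (success, message) = Await.result(response.future, 5.seconds)
    println(s"success=$success, message=$message")
  }
}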

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0533f70/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
new file mode 100644
index 0000000..50b92e1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.log4j.Level
+
+/**
+ * Command-line parser for the driver client.
+ */
+private[spark] class ClientArguments(args: Array[String]) {
+  val defaultCores = 1
+  val defaultMemory = 512
+
+  var cmd: String = "" // 'launch' or 'kill'
+  var logLevel = Level.WARN
+
+  // launch parameters
+  var master: String = ""
+  var jarUrl: String = ""
+  var mainClass: String = ""
+  var supervise: Boolean = false
+  var memory: Int = defaultMemory
+  var cores: Int = defaultCores
+  private var _driverOptions = ListBuffer[String]()
+  def driverOptions = _driverOptions.toSeq
+
+  // kill parameters
+  var driverId: String = ""
+  
+  parse(args.toList)
+
+  def parse(args: List[String]): Unit = args match {
+    case ("--cores" | "-c") :: value :: tail =>
+      cores = value.toInt
+      parse(tail)
+
+    case ("--memory" | "-m") :: value :: tail =>
+      memory = value.toInt
+      parse(tail)
+
+    case ("--supervise" | "-s") :: tail =>
+      supervise = true
+      parse(tail)
+
+    case ("--help" | "-h") :: tail =>
+      printUsageAndExit(0)
+
+    case ("--verbose" | "-v") :: tail =>
+      logLevel = Level.INFO
+      parse(tail)
+
+    case "launch" :: _master :: _jarUrl :: _mainClass :: tail =>
+      cmd = "launch"
+      master = _master
+      jarUrl = _jarUrl
+      mainClass = _mainClass
+      _driverOptions ++= tail
+
+    case "kill" :: _master :: _driverId :: tail =>
+      cmd = "kill"
+      master = _master
+      driverId = _driverId
+
+    case _ =>
+      printUsageAndExit(1)
+  }
+
+  /**
+   * Print usage and exit JVM with the given exit code.
+   */
+  def printUsageAndExit(exitCode: Int) {
+    // TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
+    //       separately similar to in the YARN client.
+    val usage =
+      s"""
+        |Usage: DriverClient [options] launch <active-master> <jar-url> <main-class> [driver options]
+        |Usage: DriverClient kill <active-master> <driver-id>
+        |
+        |Options:
+        |   -c CORES, --cores CORES        Number of cores to request (default: $defaultCores)
+        |   -m MEMORY, --memory MEMORY     Megabytes of memory to request (default: $defaultMemory)
+        |   -s, --supervise                Whether to restart the driver on failure
+        |   -v, --verbose                  Print more debugging output
+      """.stripMargin
+    System.err.println(usage)
+    System.exit(exitCode)
+  }
+}
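
To make the parser's behavior concrete: parse() consumes option flags recursively until it
matches the action word, and every token after launch's three positional arguments lands in
driverOptions rather than being parsed as a client flag. A hedged sketch of a "launch"
invocation (the master URL, jar URL, and class name are made-up values, not from this
commit):

import org.apache.spark.deploy.ClientArguments

object ClientArgumentsDemo {
  def main(args: Array[String]): Unit = {
    // Flags must precede "launch"; parse() stops recursing once it
    // matches the action word, so any trailing tokens would become
    // driverOptions passed through to the driver.
    val parsed = new ClientArguments(Array(
      "--cores", "2", "--memory", "1024", "--supervise",
      "launch", "spark://host:7077", "hdfs://host/app.jar", "com.example.Main"))
    println(s"cmd=${parsed.cmd}, master=${parsed.master}, cores=${parsed.cores}, " +
      s"memory=${parsed.memory}, supervise=${parsed.supervise}")
  }
}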

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0533f70/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
deleted file mode 100644
index 8b066ba..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.client
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.Map
-import scala.concurrent._
-
-import akka.actor._
-import akka.actor.Actor
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.deploy.{Command, DriverDescription}
-import org.apache.spark.deploy.DeployMessages._
-import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.{AkkaUtils, Utils}
-import org.apache.log4j.{Logger, Level}
-import akka.remote.RemotingLifecycleEvent
-
-/**
- * Actor that sends a single message to the standalone master and returns the response in the
- * given promise.
- */
-class DriverActor(master: String, response: Promise[(Boolean, String)]) extends Actor with Logging {
-  override def receive = {
-    case SubmitDriverResponse(success, message) => {
-      response.success((success, message))
-    }
-
-    case KillDriverResponse(success, message) => {
-      response.success((success, message))
-    }
-
-    // Relay all other messages to the master.
-    case message => {
-      logInfo(s"Sending message to master $master...")
-      val masterActor = context.actorSelection(Master.toAkkaUrl(master))
-      masterActor ! message
-    }
-  }
-}
-
-/**
- * Executable utility for starting and terminating drivers inside of a standalone cluster.
- */
-object DriverClient {
-
-  def main(args: Array[String]) {
-    val driverArgs = new DriverClientArguments(args)
-    val conf = new SparkConf()
-
-    if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
-      conf.set("spark.akka.logLifecycleEvents", "true")
-    }
-    conf.set("spark.akka.askTimeout", "5")
-    Logger.getRootLogger.setLevel(driverArgs.logLevel)
-
-    // TODO: See if we can initialize akka so return messages are sent back using the same TCP
-    //       flow. Else, this (sadly) requires the DriverClient be routable from the Master.
-    val (actorSystem, _) = AkkaUtils.createActorSystem(
-      "driverClient", Utils.localHostName(), 0, false, conf)
-    val master = driverArgs.master
-    val response = promise[(Boolean, String)]
-    val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response)))
-
-    println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")
-    driverArgs.cmd match {
-      case "launch" =>
-        // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
-        //       truncate filesystem paths similar to what YARN does. For now, we just require
-        //       people call `addJar` assuming the jar is in the same directory.
-        val env = Map[String, String]()
-        System.getenv().foreach{case (k, v) => env(k) = v}
-
-        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
-        val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
-          driverArgs.driverOptions, env)
-
-        val driverDescription = new DriverDescription(
-          driverArgs.jarUrl,
-          driverArgs.memory,
-          driverArgs.cores,
-          driverArgs.supervise,
-          command)
-        driver ! RequestSubmitDriver(driverDescription)
-
-      case "kill" =>
-        val driverId = driverArgs.driverId
-        driver ! RequestKillDriver(driverId)
-    }
-
-    val (success, message) =
-      try {
-        Await.result(response.future, AkkaUtils.askTimeout(conf))
-      } catch {
-        case e: TimeoutException => (false, s"Error: Timed out sending message to $master")
-      }
-    println(message)
-    actorSystem.shutdown()
-    actorSystem.awaitTermination()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0533f70/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
deleted file mode 100644
index 7774a56..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.client
-
-import scala.collection.mutable.ListBuffer
-
-import org.apache.log4j.Level
-
-/**
- * Command-line parser for the driver client.
- */
-private[spark] class DriverClientArguments(args: Array[String]) {
-  val defaultCores = 1
-  val defaultMemory = 512
-
-  var cmd: String = "" // 'launch' or 'kill'
-  var logLevel = Level.WARN
-
-  // launch parameters
-  var master: String = ""
-  var jarUrl: String = ""
-  var mainClass: String = ""
-  var supervise: Boolean = false
-  var memory: Int = defaultMemory
-  var cores: Int = defaultCores
-  private var _driverOptions = ListBuffer[String]()
-  def driverOptions = _driverOptions.toSeq
-
-  // kill parameters
-  var driverId: String = ""
-  
-  parse(args.toList)
-
-  def parse(args: List[String]): Unit = args match {
-    case ("--cores" | "-c") :: value :: tail =>
-      cores = value.toInt
-      parse(tail)
-
-    case ("--memory" | "-m") :: value :: tail =>
-      memory = value.toInt
-      parse(tail)
-
-    case ("--supervise" | "-s") :: tail =>
-      supervise = true
-      parse(tail)
-
-    case ("--help" | "-h") :: tail =>
-      printUsageAndExit(0)
-
-    case ("--verbose" | "-v") :: tail =>
-      logLevel = Level.INFO
-      parse(tail)
-
-    case "launch" :: _master :: _jarUrl :: _mainClass :: tail =>
-      cmd = "launch"
-      master = _master
-      jarUrl = _jarUrl
-      mainClass = _mainClass
-      _driverOptions ++= tail
-
-    case "kill" :: _master :: _driverId :: tail =>
-      cmd = "kill"
-      master = _master
-      driverId = _driverId
-
-    case _ =>
-      printUsageAndExit(1)
-  }
-
-  /**
-   * Print usage and exit JVM with the given exit code.
-   */
-  def printUsageAndExit(exitCode: Int) {
-    // TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
-    //       separately similar to in the YARN client.
-    val usage =
-      s"""
-        |Usage: DriverClient [options] launch <active-master> <jar-url> <main-class> [driver options]
-        |Usage: DriverClient kill <active-master> <driver-id>
-        |
-        |Options:
-        |   -c CORES, --cores CORES        Number of cores to request (default: $defaultCores)
-        |   -m MEMORY, --memory MEMORY     Megabytes of memory to request (default: $defaultMemory)
-        |   -s, --supervise                Whether to restart the driver on failure
-        |   -v, --verbose                  Print more debugging output
-      """.stripMargin
-    System.err.println(usage)
-    System.exit(exitCode)
-  }
-}