You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/10 03:38:35 UTC
[30/37] git commit: Rename to Client
Rename to Client
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d0533f70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d0533f70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d0533f70
Branch: refs/heads/master
Commit: d0533f704681adccc8fe2b814dc9e5082646057a
Parents: 3d939e5
Author: Patrick Wendell <pw...@gmail.com>
Authored: Tue Jan 7 23:38:46 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Jan 7 23:38:51 2014 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/deploy/Client.scala | 114 ++++++++++++++++++
.../apache/spark/deploy/ClientArguments.scala | 105 +++++++++++++++++
.../spark/deploy/client/DriverClient.scala | 117 -------------------
.../deploy/client/DriverClientArguments.scala | 105 -----------------
4 files changed, 219 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0533f70/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
new file mode 100644
index 0000000..0475bb1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.Map
+import scala.concurrent._
+
+import akka.actor._
+import org.apache.log4j.{Level, Logger}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.master.Master
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * Actor that sends a single message to the standalone master and returns the response in the
+ * given promise.
+ */
+class DriverActor(master: String, response: Promise[(Boolean, String)]) extends Actor with Logging {
+ override def receive = {
+ case SubmitDriverResponse(success, message) => {
+ response.success((success, message))
+ }
+
+ case KillDriverResponse(success, message) => {
+ response.success((success, message))
+ }
+
+ // Relay all other messages to the master.
+ case message => {
+ logInfo(s"Sending message to master $master...")
+ val masterActor = context.actorSelection(Master.toAkkaUrl(master))
+ masterActor ! message
+ }
+ }
+}
+
+/**
+ * Executable utility for starting and terminating drivers inside of a standalone cluster.
+ */
+object DriverClient {
+
+ def main(args: Array[String]) {
+ val driverArgs = new ClientArguments(args)
+ val conf = new SparkConf()
+
+ if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
+ conf.set("spark.akka.logLifecycleEvents", "true")
+ }
+ conf.set("spark.akka.askTimeout", "5")
+ Logger.getRootLogger.setLevel(driverArgs.logLevel)
+
+ // TODO: See if we can initialize akka so return messages are sent back using the same TCP
+ // flow. Else, this (sadly) requires the DriverClient be routable from the Master.
+ val (actorSystem, _) = AkkaUtils.createActorSystem(
+ "driverClient", Utils.localHostName(), 0, false, conf)
+ val master = driverArgs.master
+ val response = promise[(Boolean, String)]
+ val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response)))
+
+ println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")
+ driverArgs.cmd match {
+ case "launch" =>
+ // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
+ // truncate filesystem paths similar to what YARN does. For now, we just require
+ // people call `addJar` assuming the jar is in the same directory.
+ val env = Map[String, String]()
+ System.getenv().foreach{case (k, v) => env(k) = v}
+
+ val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
+ val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
+ driverArgs.driverOptions, env)
+
+ val driverDescription = new DriverDescription(
+ driverArgs.jarUrl,
+ driverArgs.memory,
+ driverArgs.cores,
+ driverArgs.supervise,
+ command)
+ driver ! RequestSubmitDriver(driverDescription)
+
+ case "kill" =>
+ val driverId = driverArgs.driverId
+ driver ! RequestKillDriver(driverId)
+ }
+
+ val (success, message) =
+ try {
+ Await.result(response.future, AkkaUtils.askTimeout(conf))
+ } catch {
+ case e: TimeoutException => (false, s"Error: Timed out sending message to $master")
+ }
+ println(message)
+ actorSystem.shutdown()
+ actorSystem.awaitTermination()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0533f70/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
new file mode 100644
index 0000000..50b92e1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.log4j.Level
+
+/**
+ * Command-line parser for the driver client.
+ */
+private[spark] class ClientArguments(args: Array[String]) {
+ val defaultCores = 1
+ val defaultMemory = 512
+
+ var cmd: String = "" // 'launch' or 'kill'
+ var logLevel = Level.WARN
+
+ // launch parameters
+ var master: String = ""
+ var jarUrl: String = ""
+ var mainClass: String = ""
+ var supervise: Boolean = false
+ var memory: Int = defaultMemory
+ var cores: Int = defaultCores
+ private var _driverOptions = ListBuffer[String]()
+ def driverOptions = _driverOptions.toSeq
+
+ // kill parameters
+ var driverId: String = ""
+
+ parse(args.toList)
+
+ def parse(args: List[String]): Unit = args match {
+ case ("--cores" | "-c") :: value :: tail =>
+ cores = value.toInt
+ parse(tail)
+
+ case ("--memory" | "-m") :: value :: tail =>
+ memory = value.toInt
+ parse(tail)
+
+ case ("--supervise" | "-s") :: tail =>
+ supervise = true
+ parse(tail)
+
+ case ("--help" | "-h") :: tail =>
+ printUsageAndExit(0)
+
+ case ("--verbose" | "-v") :: tail =>
+ logLevel = Level.INFO
+ parse(tail)
+
+ case "launch" :: _master :: _jarUrl :: _mainClass :: tail =>
+ cmd = "launch"
+ master = _master
+ jarUrl = _jarUrl
+ mainClass = _mainClass
+ _driverOptions ++= tail
+
+ case "kill" :: _master :: _driverId :: tail =>
+ cmd = "kill"
+ master = _master
+ driverId = _driverId
+
+ case _ =>
+ printUsageAndExit(1)
+ }
+
+ /**
+ * Print usage and exit JVM with the given exit code.
+ */
+ def printUsageAndExit(exitCode: Int) {
+ // TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
+ // separately similar to in the YARN client.
+ val usage =
+ s"""
+ |Usage: DriverClient [options] launch <active-master> <jar-url> <main-class> [driver options]
+ |Usage: DriverClient kill <active-master> <driver-id>
+ |
+ |Options:
+ | -c CORES, --cores CORES Number of cores to request (default: $defaultCores)
+ | -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $defaultMemory)
+ | -s, --supervise Whether to restart the driver on failure
+ | -v, --verbose Print more debugging output
+ """.stripMargin
+ System.err.println(usage)
+ System.exit(exitCode)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0533f70/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
deleted file mode 100644
index 8b066ba..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.client
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.Map
-import scala.concurrent._
-
-import akka.actor._
-import akka.actor.Actor
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.deploy.{Command, DriverDescription}
-import org.apache.spark.deploy.DeployMessages._
-import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.{AkkaUtils, Utils}
-import org.apache.log4j.{Logger, Level}
-import akka.remote.RemotingLifecycleEvent
-
-/**
- * Actor that sends a single message to the standalone master and returns the response in the
- * given promise.
- */
-class DriverActor(master: String, response: Promise[(Boolean, String)]) extends Actor with Logging {
- override def receive = {
- case SubmitDriverResponse(success, message) => {
- response.success((success, message))
- }
-
- case KillDriverResponse(success, message) => {
- response.success((success, message))
- }
-
- // Relay all other messages to the master.
- case message => {
- logInfo(s"Sending message to master $master...")
- val masterActor = context.actorSelection(Master.toAkkaUrl(master))
- masterActor ! message
- }
- }
-}
-
-/**
- * Executable utility for starting and terminating drivers inside of a standalone cluster.
- */
-object DriverClient {
-
- def main(args: Array[String]) {
- val driverArgs = new DriverClientArguments(args)
- val conf = new SparkConf()
-
- if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
- conf.set("spark.akka.logLifecycleEvents", "true")
- }
- conf.set("spark.akka.askTimeout", "5")
- Logger.getRootLogger.setLevel(driverArgs.logLevel)
-
- // TODO: See if we can initialize akka so return messages are sent back using the same TCP
- // flow. Else, this (sadly) requires the DriverClient be routable from the Master.
- val (actorSystem, _) = AkkaUtils.createActorSystem(
- "driverClient", Utils.localHostName(), 0, false, conf)
- val master = driverArgs.master
- val response = promise[(Boolean, String)]
- val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response)))
-
- println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")
- driverArgs.cmd match {
- case "launch" =>
- // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
- // truncate filesystem paths similar to what YARN does. For now, we just require
- // people call `addJar` assuming the jar is in the same directory.
- val env = Map[String, String]()
- System.getenv().foreach{case (k, v) => env(k) = v}
-
- val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
- val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
- driverArgs.driverOptions, env)
-
- val driverDescription = new DriverDescription(
- driverArgs.jarUrl,
- driverArgs.memory,
- driverArgs.cores,
- driverArgs.supervise,
- command)
- driver ! RequestSubmitDriver(driverDescription)
-
- case "kill" =>
- val driverId = driverArgs.driverId
- driver ! RequestKillDriver(driverId)
- }
-
- val (success, message) =
- try {
- Await.result(response.future, AkkaUtils.askTimeout(conf))
- } catch {
- case e: TimeoutException => (false, s"Error: Timed out sending message to $master")
- }
- println(message)
- actorSystem.shutdown()
- actorSystem.awaitTermination()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0533f70/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
deleted file mode 100644
index 7774a56..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.client
-
-import scala.collection.mutable.ListBuffer
-
-import org.apache.log4j.Level
-
-/**
- * Command-line parser for the driver client.
- */
-private[spark] class DriverClientArguments(args: Array[String]) {
- val defaultCores = 1
- val defaultMemory = 512
-
- var cmd: String = "" // 'launch' or 'kill'
- var logLevel = Level.WARN
-
- // launch parameters
- var master: String = ""
- var jarUrl: String = ""
- var mainClass: String = ""
- var supervise: Boolean = false
- var memory: Int = defaultMemory
- var cores: Int = defaultCores
- private var _driverOptions = ListBuffer[String]()
- def driverOptions = _driverOptions.toSeq
-
- // kill parameters
- var driverId: String = ""
-
- parse(args.toList)
-
- def parse(args: List[String]): Unit = args match {
- case ("--cores" | "-c") :: value :: tail =>
- cores = value.toInt
- parse(tail)
-
- case ("--memory" | "-m") :: value :: tail =>
- memory = value.toInt
- parse(tail)
-
- case ("--supervise" | "-s") :: tail =>
- supervise = true
- parse(tail)
-
- case ("--help" | "-h") :: tail =>
- printUsageAndExit(0)
-
- case ("--verbose" | "-v") :: tail =>
- logLevel = Level.INFO
- parse(tail)
-
- case "launch" :: _master :: _jarUrl :: _mainClass :: tail =>
- cmd = "launch"
- master = _master
- jarUrl = _jarUrl
- mainClass = _mainClass
- _driverOptions ++= tail
-
- case "kill" :: _master :: _driverId :: tail =>
- cmd = "kill"
- master = _master
- driverId = _driverId
-
- case _ =>
- printUsageAndExit(1)
- }
-
- /**
- * Print usage and exit JVM with the given exit code.
- */
- def printUsageAndExit(exitCode: Int) {
- // TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
- // separately similar to in the YARN client.
- val usage =
- s"""
- |Usage: DriverClient [options] launch <active-master> <jar-url> <main-class> [driver options]
- |Usage: DriverClient kill <active-master> <driver-id>
- |
- |Options:
- | -c CORES, --cores CORES Number of cores to request (default: $defaultCores)
- | -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $defaultMemory)
- | -s, --supervise Whether to restart the driver on failure
- | -v, --verbose Print more debugging output
- """.stripMargin
- System.err.println(usage)
- System.exit(exitCode)
- }
-}