Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/10 03:38:08 UTC

[03/37] git commit: Adding better option parsing

Adding better option parsing


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/760823d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/760823d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/760823d3

Branch: refs/heads/master
Commit: 760823d3937822ea4a6d6f476815442711c605fa
Parents: 6a4acc4
Author: Patrick Wendell <pw...@gmail.com>
Authored: Tue Dec 24 23:20:34 2013 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Dec 25 01:19:01 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/DeployMessage.scala |  2 +-
 .../apache/spark/deploy/DriverDescription.scala |  6 +-
 .../spark/deploy/client/DriverClient.scala      | 35 ++++---
 .../deploy/client/DriverClientArguments.scala   | 98 ++++++++++++++++++++
 .../org/apache/spark/deploy/master/Master.scala |  3 +-
 .../spark/deploy/worker/DriverRunner.scala      | 25 +++--
 .../org/apache/spark/deploy/worker/Worker.scala |  6 +-
 .../spark/deploy/worker/ui/IndexPage.scala      |  6 +-
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |  4 +
 .../spark/examples/DriverSubmissionTest.scala   | 45 +++++++++
 10 files changed, 188 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
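
This commit replaces DriverClient's fixed positional arguments with a
flag-based command line (cores, memory, Java options, environment
variables), threads the full DriverDescription through the master and
worker, and adds a DriverSubmissionTest example for exercising the new
path. A hypothetical invocation (host names, paths, and the driver id
are placeholders; flags must precede the launch command, since the
parser treats everything after <main-class> as driver options):

  DriverClient -c 2 -m 1024 launch spark://host:7077 \
    hdfs://nn/path/to/app.jar com.example.Main appArg1
  DriverClient kill spark://host:7077 driver-20131225010000-0000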


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/760823d3/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 6743526..332c7e8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -84,7 +84,7 @@ private[deploy] object DeployMessages {
       sparkHome: String)
     extends DeployMessage
 
-  case class LaunchDriver(driverId: String, jarUrl: String, mainClass: String, mem: Int)
+  case class LaunchDriver(driverId: String, driverDesc: DriverDescription)
     extends DeployMessage
 
   case class KillDriver(driverId: String) extends DeployMessage

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/760823d3/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
index 52f6b1b..32ff6db 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
@@ -20,7 +20,11 @@ package org.apache.spark.deploy
 private[spark] class DriverDescription(
     val jarUrl: String,
     val mainClass: String,
-    val mem: Integer) // TODO: Should this be Long?
+    val mem: Int,
+    val cores: Int,
+    val options: Seq[String],
+    val javaOptions: Seq[String],
+    val envVars: Seq[(String, String)])
   extends Serializable {
 
   override def toString: String = s"DriverDescription ($mainClass)"

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/760823d3/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index 482bafd..dd62172 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -34,7 +34,7 @@ import scala.concurrent.Await
 import akka.actor.Actor.emptyBehavior
 
 /**
- * Parent class for actors that to send a single message to the standalone master and then die.
+ * Actor that sends a single message to the standalone master and then shuts down.
  */
 private[spark] abstract class SingleMessageClient(
     actorSystem: ActorSystem, master: String, message: DeployMessage)
@@ -94,34 +94,31 @@ private[spark] class TerminationClient(actorSystem: ActorSystem, master: String,
 }
 
 /**
- * Callable utility for starting and terminating drivers inside of the standalone scheduler.
+ * Executable utility for starting and terminating drivers inside of a standalone cluster.
  */
 object DriverClient {
 
   def main(args: Array[String]) {
-    if (args.size < 3) {
-      println("usage: DriverClient launch <active-master> <jar-url> <main-class>")
-      println("usage: DriverClient kill <active-master> <driver-id>")
-      System.exit(-1)
-    }
+    val driverArgs = new DriverClientArguments(args)
 
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
-      "driverSubmission", Utils.localHostName(), 0)
-
-    // TODO Should be configurable
-    val mem = 512
+      "driverClient", Utils.localHostName(), 0)
 
-    args(0) match {
+    driverArgs.cmd match {
       case "launch" =>
-        val master = args(1)
-        val jarUrl = args(2)
-        val mainClass = args(3)
-        val driverDescription = new DriverDescription(jarUrl, mainClass, mem)
-        val client = new SubmissionClient(actorSystem, master, driverDescription)
+        val driverDescription = new DriverDescription(
+          driverArgs.jarUrl,
+          driverArgs.mainClass,
+          driverArgs.memory,
+          driverArgs.cores,
+          driverArgs.driverOptions,
+          driverArgs.driverJavaOptions,
+          driverArgs.driverEnvVars)
+        val client = new SubmissionClient(actorSystem, driverArgs.master, driverDescription)
 
       case "kill" =>
-        val master = args(1)
-        val driverId = args(2)
+        val master = driverArgs.master
+        val driverId = driverArgs.driverId
         val client = new TerminationClient(actorSystem, master, driverId)
     }
     actorSystem.awaitTermination()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/760823d3/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
new file mode 100644
index 0000000..618467c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.client
+
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Command-line parser for the driver client.
+ */
+private[spark] class DriverClientArguments(args: Array[String]) {
+  var cmd: String = "" // 'launch' or 'kill'
+
+  // launch parameters
+  var master: String = ""
+  var jarUrl: String = ""
+  var mainClass: String = ""
+  var memory: Int = 512
+  var cores: Int = 1
+  private var _driverOptions = ListBuffer[String]()
+  private var _driverJavaOptions = ListBuffer[String]()
+  private var _driverEnvVars = ListBuffer[(String, String)]()
+  def driverOptions = _driverOptions.toSeq
+  def driverJavaOptions = _driverJavaOptions.toSeq
+  def driverEnvVars = _driverEnvVars.toSeq
+
+  // kill parameters
+  var driverId: String = ""
+
+  parse(args.toList)
+
+  def parse(args: List[String]): Unit = args match {
+    case ("--cores" | "-c") :: value :: tail =>
+      cores = value.toInt
+      parse(tail)
+
+    case ("--memory" | "-m") :: value :: tail =>
+      memory = value.toInt
+      parse(tail)
+
+    case ("--java-option" | "-j") :: value :: tail =>
+      _driverJavaOptions += value
+      parse(tail)
+
+    case ("--environment-variable" | "-e") :: value :: tail =>
+      val parts = value.split("=")
+      _driverEnvVars += ((parts(0), parts(1)))
+
+    case ("--help" | "-h") :: tail =>
+      printUsageAndExit(0)
+
+    case "launch" :: _master :: _jarUrl :: _mainClass :: tail =>
+      cmd = "launch"
+      master = _master
+      jarUrl = _jarUrl
+      mainClass = _mainClass
+      _driverOptions ++= tail
+
+    case "kill" :: _master :: _driverId :: tail =>
+      cmd = "kill"
+      master = _master
+      driverId = _driverId
+
+    case _ =>
+      printUsageAndExit(1)
+  }
+
+  /**
+   * Print usage and exit JVM with the given exit code.
+   */
+  def printUsageAndExit(exitCode: Int) {
+    System.err.println(
+      "usage: DriverClient launch [options] <active-master> <jar-url> <main-class> " +
+        "[driver options]\n" +
+      "usage: DriverClient kill <active-master> <driver-id>\n\n" +
+      "Options:\n" +
+      "  -c CORES, --cores CORES                Number of cores to request \n" +
+      "  -m MEMORY, --memory MEMORY             Megabytes of memory to request\n" +
+      "  -j JAVA_OPT, --java-option JAVA_OPT    Java option to pass to driver\n" +
+      "  -e K=V, --environment-variable K=V     Environment variable to pass to driver\n")
+    System.exit(exitCode)
+  }
+}
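
For context: the parser recursively pattern matches on the argument
list. Each flag case consumes its flag (plus value) and recurses on the
tail, while the "launch" and "kill" cases consume the remainder of the
list, so flags are only recognized before the command word. A minimal
sketch of a successful parse (all values hypothetical):

  val driverArgs = new DriverClientArguments(Array(
    "-c", "2", "-m", "1024",
    "launch", "spark://host:7077", "hdfs://nn/app.jar", "com.example.Main",
    "appArg1", "appArg2"))
  // driverArgs.cmd == "launch"
  // driverArgs.cores == 2, driverArgs.memory == 1024
  // driverArgs.master == "spark://host:7077"
  // driverArgs.driverOptions == Seq("appArg1", "appArg2")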

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/760823d3/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 76af332..9bfacfc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -627,8 +627,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     logInfo("Launching driver " + driver.id + " on worker " + worker.id)
     worker.addDriver(driver)
     driver.worker = Some(worker)
-    worker.actor ! LaunchDriver(driver.id, driver.desc.jarUrl, driver.desc.mainClass,
-      driver.desc.mem)
+    worker.actor ! LaunchDriver(driver.id, driver.desc)
     driver.state = DriverState.RUNNING
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/760823d3/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index fccc36b..41a089a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -29,16 +29,15 @@ import org.apache.hadoop.conf.Configuration
 import akka.actor.{ActorRef, ActorSelection}
 import org.apache.spark.deploy.DeployMessages.DriverStateChanged
 import org.apache.spark.deploy.master.DriverState
+import org.apache.spark.deploy.DriverDescription
 
 /**
  * Manages the execution of one driver process.
  */
 private[spark] class DriverRunner(
     val driverId: String,
-    val jarUrl: String,
-    val mainClass: String,
     val workDir: File,
-    val memory: Int,
+    val driverDesc: DriverDescription,
     val worker: ActorRef)
   extends Logging {
 
@@ -54,8 +53,9 @@ private[spark] class DriverRunner(
         try {
           val driverDir = createWorkingDirectory()
           val localJarFilename = downloadUserJar(driverDir)
-          val command = Seq("java", "-cp", localJarFilename, mainClass)
-          runCommandWithRetry(command, driverDir)
+          val command = Seq("java") ++ driverDesc.javaOptions ++ Seq("-cp", localJarFilename) ++
+            Seq(driverDesc.mainClass) ++ driverDesc.options
+          runCommandWithRetry(command, driverDesc.envVars, driverDir)
         }
         catch {
           case e: Exception => exn = Some(e)
@@ -110,7 +110,7 @@ private[spark] class DriverRunner(
    */
   def downloadUserJar(driverDir: File): String = {
 
-    val jarPath = new Path(jarUrl)
+    val jarPath = new Path(driverDesc.jarUrl)
 
     val emptyConf = new Configuration() // TODO: In docs explain it needs to be full HDFS path
     val jarFileSystem = jarPath.getFileSystem(emptyConf)
@@ -134,17 +134,17 @@ private[spark] class DriverRunner(
   }
 
   /** Continue launching the supplied command until it exits zero. */
-  def runCommandWithRetry(command: Seq[String], baseDir: File) = {
-    /* Time to wait between submission retries. */
+  def runCommandWithRetry(command: Seq[String], envVars: Seq[(String, String)], baseDir: File) = {
+    // Time to wait between submission retries.
     var waitSeconds = 1
-    // TODO: We should distinguish between "immediate" exits and cases where it was running
-    //       for a long time and then exits.
     var cleanExit = false
 
     while (!cleanExit && !killed) {
       Thread.sleep(waitSeconds * 1000)
+
+      logInfo("Launch Command: " + command.mkString("\"", "\" \"", "\""))
       val builder = new ProcessBuilder(command: _*).directory(baseDir)
-      logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
+      envVars.foreach { case (k, v) => builder.environment().put(k, v) }
 
       process = Some(builder.start())
 
@@ -153,12 +153,11 @@ private[spark] class DriverRunner(
       redirectStream(process.get.getInputStream, stdout)
 
       val stderr = new File(baseDir, "stderr")
-      val header = "Driver Command: %s\n%s\n\n".format(
+      val header = "Launch Command: %s\n%s\n\n".format(
         command.mkString("\"", "\" \"", "\""), "=" * 40)
       Files.write(header, stderr, Charsets.UTF_8)
       redirectStream(process.get.getErrorStream, stderr)
 
-
       val exitCode =
         /* There is a race here I've elected to ignore for now because it's very unlikely and not
          * simple to fix. This could see `killed=false` then the main thread gets a kill request
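
The new command assembly in DriverRunner splices the configured Java
options and driver options around the classpath and main class. A short
sketch with hypothetical values:

  val javaOptions = Seq("-Dspark.test.prop=value", "-Xmx512m")
  val options = Seq("appArg1")
  val localJarFilename = "/work/driver-1/app.jar"
  val command = Seq("java") ++ javaOptions ++ Seq("-cp", localJarFilename) ++
    Seq("com.example.Main") ++ options
  // => List(java, -Dspark.test.prop=value, -Xmx512m,
  //         -cp, /work/driver-1/app.jar, com.example.Main, appArg1)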

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/760823d3/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index a2b491a..dd6783a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -242,9 +242,9 @@ private[spark] class Worker(
         }
       }
 
-    case LaunchDriver(driverId, jarUrl, mainClass, memory) => {
+    case LaunchDriver(driverId, driverDesc) => {
       logInfo(s"Asked to launch driver $driverId")
-      val driver = new DriverRunner(driverId, jarUrl, mainClass, workDir, memory, self)
+      val driver = new DriverRunner(driverId, workDir, driverDesc, self)
       drivers(driverId) = driver
       driver.start()
 
@@ -278,7 +278,7 @@ private[spark] class Worker(
         master ! DriverStateChanged(driverId, state, exception)
       }
       val driver = drivers(driverId)
-      memoryUsed -= driver.memory
+      memoryUsed -= driver.driverDesc.mem
       coresUsed -= 1
       drivers -= driverId
       finishedDrivers(driverId) = driver

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/760823d3/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index e233b82..2c37b71 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -137,9 +137,9 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
   def driverRow(driver: DriverRunner): Seq[Node] = {
     <tr>
       <td>{driver.driverId}</td>
-      <td>{driver.mainClass}</td>
-      <td sorttable_customkey={driver.memory.toString}>
-        {Utils.megabytesToString(driver.memory)}
+      <td>{driver.driverDesc.mainClass}</td>
+      <td sorttable_customkey={driver.driverDesc.mem.toString}>
+        {Utils.megabytesToString(driver.driverDesc.mem)}
       </td>
       <td>
         <a href={s"logPage?driverId=${driver.driverId}&logType=stdout"}>stdout</a>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/760823d3/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index d128e58..2fd862c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -82,6 +82,8 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
         s"${workDir.getPath}/$appId/$executorId/$logType"
       case (None, None, Some(d)) =>
         s"${workDir.getPath}/$driverId/$logType"
+      case _ =>
+        throw new Exception("Request must specify either application or driver identifiers")
     }
 
     val (startByte, endByte) = getByteRange(path, offset, byteLength)
@@ -106,6 +108,8 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
         (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e")
       case (None, None, Some(d)) =>
         (s"${workDir.getPath}/$d/$logType", s"driverId=$d")
+      case _ =>
+        throw new Exception("Request must specify either application or driver identifiers")
     }
 
     val (startByte, endByte) = getByteRange(path, offset, byteLength)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/760823d3/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
new file mode 100644
index 0000000..9055ce7
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import scala.collection.JavaConversions._
+
+/** Prints out environment information, sleeps, and then exits. Used to
+  * test driver submission in the standalone scheduler. */
+object DriverSubmissionTest {
+  def main(args: Array[String]) {
+    if (args.size < 1) {
+      println("Usage: DriverSubmissionTest <seconds-to-sleep>")
+      System.exit(0)
+    }
+    val numSecondsToSleep = args(0).toInt
+
+    val env = System.getenv()
+    val properties = System.getProperties()
+
+    println("Environment variables containing SPARK_TEST:")
+    env.filter { case (k, v) => k.contains("SPARK_TEST") }.foreach(println)
+
+    println("System properties containing spark.test:")
+    properties.filter { case (k, v) => k.toString.contains("spark.test") }.foreach(println)
+
+    for (i <- 1 to numSecondsToSleep) {
+      Thread.sleep(1000)
+    }
+  }
+}
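
DriverSubmissionTest pairs with the new client flags: -e and -j supply
exactly the environment variables and system properties that the class
filters for. A hypothetical end-to-end run (master URL and jar path are
placeholders):

  DriverClient -e SPARK_TEST_FOO=bar -j -Dspark.test.prop=value \
    launch spark://host:7077 hdfs://nn/spark-examples.jar \
    org.apache.spark.examples.DriverSubmissionTest 10

The launched driver should print the SPARK_TEST_FOO variable and the
spark.test.prop property, then sleep for roughly ten seconds before
exiting.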