Posted to commits@spark.apache.org by pw...@apache.org on 2014/07/30 08:52:15 UTC

git commit: [SPARK-2260] Fix standalone-cluster mode, which was broken

Repository: spark
Updated Branches:
  refs/heads/master 077f633b4 -> 4ce92ccaf


[SPARK-2260] Fix standalone-cluster mode, which was broken

The main issue was that Spark configs were not propagated to the driver, so applications that do not specify `master` or `appName` failed automatically. This PR fixes that, along with a couple of related miscellaneous issues.

One thing that may or may not be an issue is that the jars must be available on the driver node. In `standalone-cluster` mode, this effectively means these jars must be available on all the worker machines, since the driver is launched on one of them. The semantics here are not the same as `yarn-cluster` mode, where all the relevant jars are automatically uploaded to a distributed cache and shipped to the containers. This is probably not a concern, but it is still worth a mention.
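
To make the propagation mechanism concrete, here is a minimal, self-contained sketch of the approach: every Spark property is rewritten as a `-Dkey=value` java option and appended to the command that launches the driver or executor JVM, optionally filtered down to the configs an executor needs at startup. The real helper is `Utils.sparkJavaOpts` (added in the diff below); the wrapper object and example values here are made up purely for illustration.

    import org.apache.spark.SparkConf

    // Illustrative sketch only; mirrors Utils.sparkJavaOpts from this patch.
    object ConfigPropagationSketch {
      // Convert Spark properties into -Dkey=value java options, optionally filtered.
      def sparkJavaOpts(conf: SparkConf, filterKey: String => Boolean = _ => true): Seq[String] =
        conf.getAll
          .filter { case (k, _) => filterKey(k) }
          .map { case (k, v) => s"-D$k=$v" }

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false)
          .setMaster("spark://host:7077")
          .setAppName("example")
          .set("spark.authenticate", "true")
        // Drivers launched in standalone-cluster mode receive all configs...
        println(sparkJavaOpts(conf))
        // ...while executors only receive the configs needed to register with the
        // scheduler at startup (akka / auth), inheriting the rest later.
        println(sparkJavaOpts(conf, k => k.startsWith("spark.akka") || k.startsWith("spark.auth")))
      }
    }
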

Author: Andrew Or <an...@gmail.com>

Closes #1538 from andrewor14/standalone-cluster and squashes the following commits:

8c11a0d [Andrew Or] Clean up imports / comments (minor)
2678d13 [Andrew Or] Handle extraJavaOpts properly
7660547 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-cluster
6f64a9b [Andrew Or] Revert changes in YARN
2f2908b [Andrew Or] Fix tests
ed01491 [Andrew Or] Don't go overboard with escaping
8e105e1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-cluster
b890949 [Andrew Or] Abstract usages of converting spark opts to java opts
79f63a3 [Andrew Or] Move sparkProps into javaOpts
78752f8 [Andrew Or] Fix tests
5a9c6c7 [Andrew Or] Fix line too long
c141a00 [Andrew Or] Don't display "unknown app" on driver log pages
d7e2728 [Andrew Or] Avoid deprecation warning in standalone Client
6ceb14f [Andrew Or] Allow relevant configs to propagate to standalone Driver
7f854bc [Andrew Or] Fix test
855256e [Andrew Or] Fix standalone-cluster mode
fd9da51 [Andrew Or] Formatting changes (minor)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4ce92cca
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4ce92cca
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4ce92cca

Branch: refs/heads/master
Commit: 4ce92ccaf761e48a10fc4fe4927dbfca858ca22b
Parents: 077f633
Author: Andrew Or <an...@gmail.com>
Authored: Tue Jul 29 23:52:09 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Jul 29 23:52:09 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala | 22 +++++++++++++++++++-
 .../scala/org/apache/spark/deploy/Client.scala  | 21 ++++++++++---------
 .../scala/org/apache/spark/deploy/Command.scala |  2 +-
 .../org/apache/spark/deploy/SparkSubmit.scala   | 12 +++++------
 .../apache/spark/deploy/client/TestClient.scala |  6 +++---
 .../spark/deploy/worker/CommandUtils.scala      |  7 +++----
 .../spark/deploy/worker/DriverRunner.scala      |  3 ++-
 .../spark/deploy/worker/ExecutorRunner.scala    | 14 ++++++++-----
 .../apache/spark/deploy/worker/ui/LogPage.scala | 11 +++++-----
 .../executor/CoarseGrainedExecutorBackend.scala |  9 ++++++--
 .../cluster/SparkDeploySchedulerBackend.scala   | 11 ++++++----
 .../scala/org/apache/spark/util/Utils.scala     |  9 ++++++++
 .../apache/spark/deploy/JsonProtocolSuite.scala |  6 +++---
 .../apache/spark/deploy/SparkSubmitSuite.scala  |  7 +++++--
 .../spark/deploy/worker/DriverRunnerTest.scala  |  2 +-
 .../deploy/worker/ExecutorRunnerTest.scala      |  2 +-
 16 files changed, 93 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4ce92cca/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 8ce4b91..3870084 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -40,6 +40,8 @@ import scala.collection.mutable.HashMap
  */
 class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
+  import SparkConf._
+
   /** Create a SparkConf that loads defaults from system properties and the classpath */
   def this() = this(true)
 
@@ -198,7 +200,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
      *
      *   E.g. spark.akka.option.x.y.x = "value"
      */
-    getAll.filter {case (k, v) => k.startsWith("akka.")}
+    getAll.filter { case (k, _) => isAkkaConf(k) }
 
   /** Does the configuration contain a given parameter? */
   def contains(key: String): Boolean = settings.contains(key)
@@ -292,3 +294,21 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
   }
 }
+
+private[spark] object SparkConf {
+  /**
+   * Return whether the given config is an akka config (e.g. akka.actor.provider).
+   * Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
+   */
+  def isAkkaConf(name: String): Boolean = name.startsWith("akka.")
+
+  /**
+   * Return whether the given config should be passed to an executor on start-up.
+   *
+   * Certain akka and authentication configs are required of the executor when it connects to
+   * the scheduler, while the rest of the spark configs can be inherited from the driver later.
+   */
+  def isExecutorStartupConf(name: String): Boolean = {
+    isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce92cca/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index c371dc3..17c507a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.deploy
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.Map
 import scala.concurrent._
 
 import akka.actor._
@@ -50,9 +48,6 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
         // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
         //       truncate filesystem paths similar to what YARN does. For now, we just require
         //       people call `addJar` assuming the jar is in the same directory.
-        val env = Map[String, String]()
-        System.getenv().foreach{case (k, v) => env(k) = v}
-
         val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
 
         val classPathConf = "spark.driver.extraClassPath"
@@ -65,10 +60,13 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
           cp.split(java.io.File.pathSeparator)
         }
 
-        val javaOptionsConf = "spark.driver.extraJavaOptions"
-        val javaOpts = sys.props.get(javaOptionsConf)
+        val extraJavaOptsConf = "spark.driver.extraJavaOptions"
+        val extraJavaOpts = sys.props.get(extraJavaOptsConf)
+          .map(Utils.splitCommandString).getOrElse(Seq.empty)
+        val sparkJavaOpts = Utils.sparkJavaOpts(conf)
+        val javaOpts = sparkJavaOpts ++ extraJavaOpts
         val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
-          driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)
+          driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts)
 
         val driverDescription = new DriverDescription(
           driverArgs.jarUrl,
@@ -109,6 +107,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
         // Exception, if present
         statusResponse.exception.map { e =>
           println(s"Exception from cluster was: $e")
+          e.printStackTrace()
           System.exit(-1)
         }
         System.exit(0)
@@ -141,8 +140,10 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
  */
 object Client {
   def main(args: Array[String]) {
-    println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
-    println("Use ./bin/spark-submit with \"--master spark://host:port\"")
+    if (!sys.props.contains("SPARK_SUBMIT")) {
+      println("WARNING: This client is deprecated and will be removed in a future version of Spark")
+      println("Use ./bin/spark-submit with \"--master spark://host:port\"")
+    }
 
     val conf = new SparkConf()
     val driverArgs = new ClientArguments(args)

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce92cca/core/src/main/scala/org/apache/spark/deploy/Command.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Command.scala b/core/src/main/scala/org/apache/spark/deploy/Command.scala
index 32f3ba3..a2b2635 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Command.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Command.scala
@@ -25,5 +25,5 @@ private[spark] case class Command(
     environment: Map[String, String],
     classPathEntries: Seq[String],
     libraryPathEntries: Seq[String],
-    extraJavaOptions: Option[String] = None) {
+    javaOpts: Seq[String]) {
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce92cca/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c9cec33..3df811c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -136,8 +136,6 @@ object SparkSubmit {
     (clusterManager, deployMode) match {
       case (MESOS, CLUSTER) =>
         printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
-      case (STANDALONE, CLUSTER) =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for Standalone clusters.")
       case (_, CLUSTER) if args.isPython =>
         printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
       case (_, CLUSTER) if isShell(args.primaryResource) =>
@@ -170,9 +168,9 @@ object SparkSubmit {
     val options = List[OptionAssigner](
 
       // All cluster managers
-      OptionAssigner(args.master, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.master"),
-      OptionAssigner(args.name, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.app.name"),
-      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
+      OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
+      OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
+      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
 
       // Standalone cluster only
       OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
@@ -203,9 +201,9 @@ object SparkSubmit {
         sysProp = "spark.driver.extraJavaOptions"),
       OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
         sysProp = "spark.driver.extraLibraryPath"),
-      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, CLIENT,
+      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.memory"),
-      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, CLIENT,
+      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
         sysProp = "spark.cores.max"),
       OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
         sysProp = "spark.files")

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce92cca/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index e15a87b..b8ffa9a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -46,11 +46,11 @@ private[spark] object TestClient {
   def main(args: Array[String]) {
     val url = args(0)
     val conf = new SparkConf
-    val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
+    val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
       conf = conf, securityManager = new SecurityManager(conf))
     val desc = new ApplicationDescription(
-      "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(),
-        Seq()), Some("dummy-spark-home"), "ignored")
+      "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(),
+        Seq(), Seq(), Seq()), Some("dummy-spark-home"), "ignored")
     val listener = new TestListener
     val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
     client.start()

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce92cca/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 4af5bc3..687e492 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -47,7 +47,6 @@ object CommandUtils extends Logging {
    */
   def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
     val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
-    val extraOpts = command.extraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq())
 
     // Exists for backwards compatibility with older Spark versions
     val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString)
@@ -62,7 +61,7 @@ object CommandUtils extends Logging {
         val joined = command.libraryPathEntries.mkString(File.pathSeparator)
         Seq(s"-Djava.library.path=$joined")
       } else {
-         Seq()
+        Seq()
       }
 
     val permGenOpt = Seq("-XX:MaxPermSize=128m")
@@ -71,11 +70,11 @@ object CommandUtils extends Logging {
     val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
     val classPath = Utils.executeAndGetOutput(
       Seq(sparkHome + "/bin/compute-classpath" + ext),
-      extraEnvironment=command.environment)
+      extraEnvironment = command.environment)
     val userClassPath = command.classPathEntries ++ Seq(classPath)
 
     Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++
-      permGenOpt ++ libraryOpts ++ extraOpts ++ workerLocalOpts ++ memoryOpts
+      permGenOpt ++ libraryOpts ++ workerLocalOpts ++ command.javaOpts ++ memoryOpts
   }
 
   /** Spawn a thread that will redirect a given stream to a file */

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce92cca/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 662d378..5caaf6b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -36,6 +36,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState
 
 /**
  * Manages the execution of one driver, including automatically restarting the driver on failure.
+ * This is currently only used in standalone cluster deploy mode.
  */
 private[spark] class DriverRunner(
     val driverId: String,
@@ -81,7 +82,7 @@ private[spark] class DriverRunner(
             driverDesc.command.environment,
             classPath,
             driverDesc.command.libraryPathEntries,
-            driverDesc.command.extraJavaOptions)
+            driverDesc.command.javaOpts)
           val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem,
             sparkHome.getAbsolutePath)
           launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise)

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce92cca/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 467317d..7be89f9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.logging.FileAppender
 
 /**
  * Manages the execution of one executor process.
+ * This is currently only used in standalone mode.
  */
 private[spark] class ExecutorRunner(
     val appId: String,
@@ -72,7 +73,7 @@ private[spark] class ExecutorRunner(
   }
 
   /**
-   * kill executor process, wait for exit and notify worker to update resource status
+   * Kill executor process, wait for exit and notify worker to update resource status.
    *
    * @param message the exception message which caused the executor's death 
    */
@@ -114,10 +115,13 @@ private[spark] class ExecutorRunner(
   }
 
   def getCommandSeq = {
-    val command = Command(appDesc.command.mainClass,
-      appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment,
-      appDesc.command.classPathEntries, appDesc.command.libraryPathEntries,
-      appDesc.command.extraJavaOptions)
+    val command = Command(
+      appDesc.command.mainClass,
+      appDesc.command.arguments.map(substituteVariables) ++ Seq(appId),
+      appDesc.command.environment,
+      appDesc.command.classPathEntries,
+      appDesc.command.libraryPathEntries,
+      appDesc.command.javaOpts)
     CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce92cca/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
index b389cb5..ecb358c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.deploy.worker.ui
 
-import java.io.File
 import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
@@ -25,7 +24,7 @@ import scala.xml.Node
 import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.util.Utils
 import org.apache.spark.Logging
-import org.apache.spark.util.logging.{FileAppender, RollingFileAppender}
+import org.apache.spark.util.logging.RollingFileAppender
 
 private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
   private val worker = parent.worker
@@ -64,11 +63,11 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") w
     val offset = Option(request.getParameter("offset")).map(_.toLong)
     val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
 
-    val (logDir, params) = (appId, executorId, driverId) match {
+    val (logDir, params, pageName) = (appId, executorId, driverId) match {
       case (Some(a), Some(e), None) =>
-        (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e")
+        (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e", s"$a/$e")
       case (None, None, Some(d)) =>
-        (s"${workDir.getPath}/$d/", s"driverId=$d")
+        (s"${workDir.getPath}/$d/", s"driverId=$d", d)
       case _ =>
         throw new Exception("Request must specify either application or driver identifiers")
     }
@@ -120,7 +119,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") w
           </div>
         </body>
       </html>
-    UIUtils.basicSparkPage(content, logType + " log page for " + appId.getOrElse("unknown app"))
+    UIUtils.basicSparkPage(content, logType + " log page for " + pageName)
   }
 
   /** Get the part of the log files given the offset and desired length of bytes */

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce92cca/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index b455c9f..860b47e 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -98,8 +98,13 @@ private[spark] class CoarseGrainedExecutorBackend(
 }
 
 private[spark] object CoarseGrainedExecutorBackend extends Logging {
-  def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
-    workerUrl: Option[String]) {
+
+  private def run(
+      driverUrl: String,
+      executorId: String,
+      hostname: String,
+      cores: Int,
+      workerUrl: Option[String]) {
 
     SignalLogger.register(log)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce92cca/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index bf2dc88..48aaaa5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster
 
-import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{AppClient, AppClientListener}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
@@ -46,6 +46,7 @@ private[spark] class SparkDeploySchedulerBackend(
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
     val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
     val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
+      .map(Utils.splitCommandString).getOrElse(Seq.empty)
     val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
       cp.split(java.io.File.pathSeparator)
     }
@@ -54,9 +55,11 @@ private[spark] class SparkDeploySchedulerBackend(
         cp.split(java.io.File.pathSeparator)
       }
 
-    val command = Command(
-      "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs,
-      classPathEntries, libraryPathEntries, extraJavaOpts)
+    // Start executors with a few necessary configs for registering with the scheduler
+    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
+    val javaOpts = sparkJavaOpts ++ extraJavaOpts
+    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
+      args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
     val sparkHome = sc.getSparkHome()
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
       sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce92cca/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8cbb905..69f65b4 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1313,4 +1313,13 @@ private[spark] object Utils extends Logging {
     s"$className: $desc\n$st"
   }
 
+  /**
+   * Convert all spark properties set in the given SparkConf to a sequence of java options.
+   */
+  def sparkJavaOpts(conf: SparkConf, filterKey: (String => Boolean) = _ => true): Seq[String] = {
+    conf.getAll
+      .filter { case (k, _) => filterKey(k) }
+      .map { case (k, v) => s"-D$k=$v" }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce92cca/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 01ab2d5..093394a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -88,7 +88,7 @@ class JsonProtocolSuite extends FunSuite {
   }
 
   def createAppDesc(): ApplicationDescription = {
-    val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq())
+    val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
     new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
   }
 
@@ -101,7 +101,7 @@ class JsonProtocolSuite extends FunSuite {
 
   def createDriverCommand() = new Command(
     "org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
-    Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Some("-Dfoo")
+    Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
   )
 
   def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3,
@@ -170,7 +170,7 @@ object JsonConstants {
     """
       |{"name":"name","cores":4,"memoryperslave":1234,
       |"user":"%s","sparkhome":"sparkHome",
-      |"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),None)"}
+      |"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"}
     """.format(System.getProperty("user.name", "<unknown>")).stripMargin
 
   val executorRunnerJsonStr =

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce92cca/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index f497a5e..a301cbd 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -200,9 +200,12 @@ class SparkSubmitSuite extends FunSuite with Matchers {
     childArgsStr should include regex ("launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2")
     mainClass should be ("org.apache.spark.deploy.Client")
     classpath should have size (0)
-    sysProps should have size (3)
-    sysProps.keys should contain ("spark.jars")
+    sysProps should have size (5)
     sysProps.keys should contain ("SPARK_SUBMIT")
+    sysProps.keys should contain ("spark.master")
+    sysProps.keys should contain ("spark.app.name")
+    sysProps.keys should contain ("spark.jars")
+    sysProps.keys should contain ("spark.shuffle.spill")
     sysProps("spark.shuffle.spill") should be ("false")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce92cca/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index 4633bc3..c930839 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -29,7 +29,7 @@ import org.apache.spark.deploy.{Command, DriverDescription}
 
 class DriverRunnerTest extends FunSuite {
   private def createDriverRunner() = {
-    val command = new Command("mainClass", Seq(), Map(), Seq(), Seq())
+    val command = new Command("mainClass", Seq(), Map(), Seq(), Seq(), Seq())
     val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
     new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), driverDescription,
       null, "akka://1.2.3.4/worker/")

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce92cca/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index e5f748d..ca4d987 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -29,7 +29,7 @@ class ExecutorRunnerTest extends FunSuite {
     def f(s:String) = new File(s)
     val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
-      Command("foo", Seq(), Map(), Seq(), Seq()),
+      Command("foo", Seq(), Map(), Seq(), Seq(), Seq()),
       sparkHome, "appUiUrl")
     val appId = "12345-worker321-9876"
     val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),