Posted to commits@spark.apache.org by an...@apache.org on 2015/09/05 00:21:36 UTC

spark git commit: [SPARK-9669] [MESOS] Support PySpark on Mesos cluster mode.

Repository: spark
Updated Branches:
  refs/heads/master 3339e6f67 -> b087d23e2


[SPARK-9669] [MESOS] Support PySpark on Mesos cluster mode.

Support running PySpark in cluster mode on Mesos!
This doesn't upload any scripts, so when running against a remote Mesos cluster the user must specify the script via a URI that the cluster can reach.
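
For illustration, a cluster-mode submission might look like the sketch below, written against the SparkLauncher programmatic API (the programmatic equivalent of spark-submit). The dispatcher host and file URIs are placeholders: the script has to be hosted somewhere the Mesos agents can fetch it, since nothing is uploaded on your behalf.

    import org.apache.spark.launcher.SparkLauncher

    object SubmitPySparkOnMesos {
      def main(args: Array[String]): Unit = {
        // Hypothetical endpoints: the dispatcher URL and the HTTP-hosted files
        // stand in for whatever a real deployment exposes.
        val driver = new SparkLauncher()
          .setMaster("mesos://dispatcher-host:7077") // MesosClusterDispatcher
          .setDeployMode("cluster")
          .setAppResource("http://files.example.com/app.py") // fetched by Mesos
          .addPyFile("http://files.example.com/helpers.py")
          .setConf("spark.executor.memory", "2g")
          .launch()
        driver.waitFor()
      }
    }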

Author: Timothy Chen <tn...@gmail.com>

Closes #8349 from tnachen/mesos_python.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b087d23e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b087d23e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b087d23e

Branch: refs/heads/master
Commit: b087d23e28004f1bfdf6d2cd3ff34ae58c8132df
Parents: 3339e6f
Author: Timothy Chen <tn...@gmail.com>
Authored: Fri Sep 4 15:21:11 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Fri Sep 4 15:21:31 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkSubmit.scala   | 13 ++++--
 .../cluster/mesos/MesosClusterScheduler.scala   | 42 ++++++++++++++------
 docs/running-on-mesos.md                        |  2 +
 3 files changed, 41 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b087d23e/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 36e9750..ad92f56 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -319,9 +319,6 @@ object SparkSubmit {
 
     // The following modes are not supported or applicable
     (clusterManager, deployMode) match {
-      case (MESOS, CLUSTER) if args.isPython =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
-          "applications on Mesos clusters.")
       case (MESOS, CLUSTER) if args.isR =>
         printErrorAndExit("Cluster deploy mode is currently not supported for R " +
           "applications on Mesos clusters.")
@@ -554,7 +551,15 @@ object SparkSubmit {
     if (isMesosCluster) {
       assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
       childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
-      childArgs += (args.primaryResource, args.mainClass)
+      if (args.isPython) {
+        // Second argument is main class
+        childArgs += (args.primaryResource, "")
+        if (args.pyFiles != null) {
+          sysProps("spark.submit.pyFiles") = args.pyFiles
+        }
+      } else {
+        childArgs += (args.primaryResource, args.mainClass)
+      }
       if (args.childArgs != null) {
         childArgs ++= args.childArgs
       }
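
The empty string passed as the second childArg above acts as a sentinel: the Mesos cluster scheduler (next file in this diff) emits a --class option only when the main class is non-empty. A toy sketch of that convention, with hypothetical names:

    object MainClassSentinel {
      // Mirrors the scheduler's check: an empty main class marks a Python app,
      // so no --class option is generated for the driver's spark-submit.
      def classOption(mainClass: String): Seq[String] =
        if (mainClass.isEmpty) Seq.empty
        else Seq("--class", mainClass)

      def main(args: Array[String]): Unit = {
        println(classOption(""))                 // List() -> Python submission
        println(classOption("org.example.Main")) // List(--class, org.example.Main)
      }
    }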

http://git-wip-us.apache.org/repos/asf/spark/blob/b087d23e/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 07da924..a6d9374 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -29,7 +29,6 @@ import org.apache.mesos.Protos.Environment.Variable
 import org.apache.mesos.Protos.TaskStatus.Reason
 import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
 import org.apache.mesos.{Scheduler, SchedulerDriver}
-
 import org.apache.spark.deploy.mesos.MesosDriverDescription
 import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
 import org.apache.spark.metrics.MetricsSystem
@@ -375,21 +374,20 @@ private[spark] class MesosClusterScheduler(
     val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
     envBuilder.addVariables(
       Variable.newBuilder().setName("SPARK_EXECUTOR_OPTS").setValue(executorOpts))
-    val cmdOptions = generateCmdOption(desc).mkString(" ")
     val dockerDefined = desc.schedulerProperties.contains("spark.mesos.executor.docker.image")
     val executorUri = desc.schedulerProperties.get("spark.executor.uri")
       .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
-    val appArguments = desc.command.arguments.mkString(" ")
-    val (executable, jar) = if (dockerDefined) {
+    // Gets the path to run spark-submit, and the path to the Mesos sandbox.
+    val (executable, sandboxPath) = if (dockerDefined) {
       // Application jar is automatically downloaded in the mounted sandbox by Mesos,
       // and the path to the mounted volume is stored in $MESOS_SANDBOX env variable.
-      ("./bin/spark-submit", s"$$MESOS_SANDBOX/${desc.jarUrl.split("/").last}")
+      ("./bin/spark-submit", "$MESOS_SANDBOX")
     } else if (executorUri.isDefined) {
       builder.addUris(CommandInfo.URI.newBuilder().setValue(executorUri.get).build())
       val folderBasename = executorUri.get.split('/').last.split('.').head
       val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit"
-      val cmdJar = s"../${desc.jarUrl.split("/").last}"
-      (cmdExecutable, cmdJar)
+      // Sandbox path points to the parent folder as we chdir into the folderBasename.
+      (cmdExecutable, "..")
     } else {
       val executorSparkHome = desc.schedulerProperties.get("spark.mesos.executor.home")
         .orElse(conf.getOption("spark.home"))
@@ -398,30 +396,50 @@ private[spark] class MesosClusterScheduler(
           throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
         }
       val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getCanonicalPath
-      val cmdJar = desc.jarUrl.split("/").last
-      (cmdExecutable, cmdJar)
+      // Sandbox points to the current directory by default with Mesos.
+      (cmdExecutable, ".")
     }
-    builder.setValue(s"$executable $cmdOptions $jar $appArguments")
+    val primaryResource = new File(sandboxPath, desc.jarUrl.split("/").last).toString()
+    val cmdOptions = generateCmdOption(desc, sandboxPath).mkString(" ")
+    val appArguments = desc.command.arguments.mkString(" ")
+    builder.setValue(s"$executable $cmdOptions $primaryResource $appArguments")
     builder.setEnvironment(envBuilder.build())
     conf.getOption("spark.mesos.uris").map { uris =>
       setupUris(uris, builder)
     }
+    desc.schedulerProperties.get("spark.mesos.uris").map { uris =>
+      setupUris(uris, builder)
+    }
+    desc.schedulerProperties.get("spark.submit.pyFiles").map { pyFiles =>
+      setupUris(pyFiles, builder)
+    }
     builder.build()
   }
 
-  private def generateCmdOption(desc: MesosDriverDescription): Seq[String] = {
+  private def generateCmdOption(desc: MesosDriverDescription, sandboxPath: String): Seq[String] = {
     var options = Seq(
       "--name", desc.schedulerProperties("spark.app.name"),
-      "--class", desc.command.mainClass,
       "--master", s"mesos://${conf.get("spark.master")}",
       "--driver-cores", desc.cores.toString,
       "--driver-memory", s"${desc.mem}M")
+
+    // Assume empty main class means we're running python
+    if (!desc.command.mainClass.equals("")) {
+      options ++= Seq("--class", desc.command.mainClass)
+    }
+
     desc.schedulerProperties.get("spark.executor.memory").map { v =>
       options ++= Seq("--executor-memory", v)
     }
     desc.schedulerProperties.get("spark.cores.max").map { v =>
       options ++= Seq("--total-executor-cores", v)
     }
+    desc.schedulerProperties.get("spark.submit.pyFiles").map { pyFiles =>
+      val formattedFiles = pyFiles.split(",")
+        .map { path => new File(sandboxPath, path.split("/").last).toString() }
+        .mkString(",")
+      options ++= Seq("--py-files", formattedFiles)
+    }
     options
   }
 

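To see how the sandboxPath threading above plays out, here is a standalone sketch (helper name hypothetical) of the py-files rewriting: Mesos fetches each URI into the task sandbox, so only the basename survives, re-rooted under wherever the sandbox sits relative to spark-submit's working directory.

    import java.io.File

    object SandboxPaths {
      // Same transformation as generateCmdOption: strip each URI down to its
      // basename and prefix it with the sandbox path.
      def rewrite(pyFiles: String, sandboxPath: String): String =
        pyFiles.split(",")
          .map(uri => new File(sandboxPath, uri.split("/").last).toString)
          .mkString(",")

      def main(args: Array[String]): Unit = {
        val files = "http://host/a.py,http://host/lib/b.py"
        println(rewrite(files, "."))              // ./a.py,./b.py (executor Spark home)
        println(rewrite(files, ".."))             // ../a.py,../b.py (executor URI tarball)
        println(rewrite(files, "$MESOS_SANDBOX")) // docker: sandbox mounted in container
      }
    }
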
http://git-wip-us.apache.org/repos/asf/spark/blob/b087d23e/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index f36921a..247e6ec 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -157,6 +157,8 @@ From the client, you can submit a job to Mesos cluster by running `spark-submit`
 to the url of the MesosClusterDispatcher (e.g: mesos://dispatcher:7077). You can view driver statuses on the
 Spark cluster Web UI.
 
+Note that jars or python files that are passed to spark-submit should be URIs reachable by Mesos slaves.
+
 # Mesos Run Modes
 
 Spark can run over Mesos in two modes: "fine-grained" (default) and "coarse-grained".


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org