You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2018/05/29 02:48:54 UTC

spark git commit: [SPARK-24377][SPARK SUBMIT] make --py-files work in non pyspark application

Repository: spark
Updated Branches:
  refs/heads/master b31b587cd -> 2ced6193b


[SPARK-24377][SPARK SUBMIT] make --py-files work in non pyspark application

## What changes were proposed in this pull request?

Some Spark applications, although they are Java programs, require not only jar dependencies but also Python dependencies. One example is the Livy remote SparkContext application: it is actually an embedded REPL for Scala/Python/R, so it loads not only jar dependencies but also Python and R dependencies. For such applications we need to specify not only "--jars" but also "--py-files".

Currently, --py-files only works for a PySpark application, so it does not work in the case above. This change proposes removing that restriction.

We also found in testing that "spark.submit.pyFiles" only supports a quite limited scenario (client mode with local dependencies), so this change also expands "spark.submit.pyFiles" to be an alternative to --py-files.

## How was this patch tested?

Unit tests added.

Author: jerryshao <ss...@hortonworks.com>

Closes #21420 from jerryshao/SPARK-24377.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ced6193
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ced6193
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ced6193

Branch: refs/heads/master
Commit: 2ced6193b39dc63e5f74138859f2a9d69d3cfd11
Parents: b31b587
Author: jerryshao <ss...@hortonworks.com>
Authored: Tue May 29 10:48:48 2018 +0800
Committer: hyukjinkwon <gu...@apache.org>
Committed: Tue May 29 10:48:48 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkSubmit.scala   | 11 ++---
 .../spark/deploy/SparkSubmitArguments.scala     |  4 +-
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 46 +++++++++++++++++++-
 3 files changed, 49 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2ced6193/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 4baf032..a46af26 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -430,18 +430,15 @@ private[spark] class SparkSubmit extends Logging {
         // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
         args.mainClass = "org.apache.spark.deploy.PythonRunner"
         args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
-        if (clusterManager != YARN) {
-          // The YARN backend distributes the primary file differently, so don't merge it.
-          args.files = mergeFileLists(args.files, args.primaryResource)
-        }
       }
       if (clusterManager != YARN) {
         // The YARN backend handles python files differently, so don't merge the lists.
         args.files = mergeFileLists(args.files, args.pyFiles)
       }
-      if (localPyFiles != null) {
-        sparkConf.set("spark.submit.pyFiles", localPyFiles)
-      }
+    }
+
+    if (localPyFiles != null) {
+      sparkConf.set("spark.submit.pyFiles", localPyFiles)
     }
 
     // In YARN mode for an R app, add the SparkR package archive and the R package

http://git-wip-us.apache.org/repos/asf/spark/blob/2ced6193/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index fed4e0a..fb23210 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -182,6 +182,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
     jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
     files = Option(files).orElse(sparkProperties.get("spark.files")).orNull
+    pyFiles = Option(pyFiles).orElse(sparkProperties.get("spark.submit.pyFiles")).orNull
     ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
     ivySettingsPath = sparkProperties.get("spark.jars.ivySettings")
     packages = Option(packages).orElse(sparkProperties.get("spark.jars.packages")).orNull
@@ -280,9 +281,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       numExecutors != null && Try(numExecutors.toInt).getOrElse(-1) <= 0) {
       error("Number of executors must be a positive number")
     }
-    if (pyFiles != null && !isPython) {
-      error("--py-files given but primary resource is not a Python script")
-    }
 
     if (master.startsWith("yarn")) {
       val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR")

http://git-wip-us.apache.org/repos/asf/spark/blob/2ced6193/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 4328695..545c8d0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -771,9 +771,13 @@ class SparkSubmitSuite
       PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
 
     // Test remote python files
+    val hadoopConf = new Configuration()
+    updateConfWithFakeS3Fs(hadoopConf)
     val f4 = File.createTempFile("test-submit-remote-python-files", "", tmpDir)
+    val pyFile1 = File.createTempFile("file1", ".py", tmpDir)
+    val pyFile2 = File.createTempFile("file2", ".py", tmpDir)
     val writer4 = new PrintWriter(f4)
-    val remotePyFiles = "hdfs:///tmp/file1.py,hdfs:///tmp/file2.py"
+    val remotePyFiles = s"s3a://${pyFile1.getAbsolutePath},s3a://${pyFile2.getAbsolutePath}"
     writer4.println("spark.submit.pyFiles " + remotePyFiles)
     writer4.close()
     val clArgs4 = Seq(
@@ -783,7 +787,7 @@ class SparkSubmitSuite
       "hdfs:///tmp/mister.py"
     )
     val appArgs4 = new SparkSubmitArguments(clArgs4)
-    val (_, _, conf4, _) = submit.prepareSubmitEnvironment(appArgs4)
+    val (_, _, conf4, _) = submit.prepareSubmitEnvironment(appArgs4, conf = Some(hadoopConf))
     // Should not format python path for yarn cluster mode
     conf4.get("spark.submit.pyFiles") should be(Utils.resolveURIs(remotePyFiles))
   }
@@ -1093,6 +1097,44 @@ class SparkSubmitSuite
     assert(exception.getMessage() === "hello")
   }
 
+  test("support --py-files/spark.submit.pyFiles in non pyspark application") {
+    val hadoopConf = new Configuration()
+    updateConfWithFakeS3Fs(hadoopConf)
+
+    val tmpDir = Utils.createTempDir()
+    val pyFile = File.createTempFile("tmpPy", ".egg", tmpDir)
+
+    val args = Seq(
+      "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+      "--name", "testApp",
+      "--master", "yarn",
+      "--deploy-mode", "client",
+      "--py-files", s"s3a://${pyFile.getAbsolutePath}",
+      "spark-internal"
+    )
+
+    val appArgs = new SparkSubmitArguments(args)
+    val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf))
+
+    conf.get(PY_FILES.key) should be (s"s3a://${pyFile.getAbsolutePath}")
+    conf.get("spark.submit.pyFiles") should (startWith("/"))
+
+    // Verify "spark.submit.pyFiles"
+    val args1 = Seq(
+      "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+      "--name", "testApp",
+      "--master", "yarn",
+      "--deploy-mode", "client",
+      "--conf", s"spark.submit.pyFiles=s3a://${pyFile.getAbsolutePath}",
+      "spark-internal"
+    )
+
+    val appArgs1 = new SparkSubmitArguments(args1)
+    val (_, _, conf1, _) = submit.prepareSubmitEnvironment(appArgs1, conf = Some(hadoopConf))
+
+    conf1.get(PY_FILES.key) should be (s"s3a://${pyFile.getAbsolutePath}")
+    conf1.get("spark.submit.pyFiles") should (startWith("/"))
+  }
 }
 
 object SparkSubmitSuite extends SparkFunSuite with TimeLimits {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org