You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/10/30 23:29:11 UTC

git commit: [SPARK-3319] [SPARK-3338] Resolve Spark submit config paths

Repository: spark
Updated Branches:
  refs/heads/master 9142c9b80 -> 24c512925


[SPARK-3319] [SPARK-3338] Resolve Spark submit config paths

The bulk of this PR consists of tests. All functional changes are made in `SparkSubmit.scala` (~20 lines).

**SPARK-3319.** There is currently a divergence in behavior between passing additional jars through `--jars` and setting `spark.jars` in the default properties file. The former resolves the paths (e.g. converts `my.jar` to `file:/absolute/path/to/my.jar`), while the latter does not. Paths should be resolved consistently in both cases. This also applies to the following pairs of command line arguments and Spark configs:

- `--jars` ~ `spark.jars`
- `--files` ~ `spark.files` / `spark.yarn.dist.files`
- `--archives` ~ `spark.yarn.dist.archives`
- `--py-files` ~ `spark.submit.pyFiles`

**SPARK-3338.** This PR also fixes the following bug: if the user sets `spark.submit.pyFiles` in his/her properties file, the value is not picked up even when `--py-files` is not set. This is because the config is unconditionally overridden by an empty string.

Author: Andrew Or <an...@gmail.com>
Author: Andrew Or <an...@databricks.com>

Closes #2232 from andrewor14/resolve-config-paths and squashes the following commits:

fff2869 [Andrew Or] Add spark.yarn.jar
da3a1c1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into resolve-config-paths
f0fae64 [Andrew Or] Merge branch 'master' of github.com:apache/spark into resolve-config-paths
05e03d6 [Andrew Or] Add tests for resolving both command line and config paths
460117e [Andrew Or] Resolve config paths properly
fe039d3 [Andrew Or] Beef up tests to test fixed-pointed-ness of Utils.resolveURI(s)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24c51292
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24c51292
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24c51292

Branch: refs/heads/master
Commit: 24c5129257ce6e3b734f168e860b714c2730b55f
Parents: 9142c9b
Author: Andrew Or <an...@gmail.com>
Authored: Thu Oct 30 15:29:07 2014 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Oct 30 15:29:07 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkSubmit.scala   |  28 ++++-
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 106 ++++++++++++++++++-
 .../org/apache/spark/util/UtilsSuite.scala      |  38 +++++--
 3 files changed, 158 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/24c51292/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index f97bf67..0379ade 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -158,8 +158,9 @@ object SparkSubmit {
         args.files = mergeFileLists(args.files, args.primaryResource)
       }
       args.files = mergeFileLists(args.files, args.pyFiles)
-      // Format python file paths properly before adding them to the PYTHONPATH
-      sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",")
+      if (args.pyFiles != null) {
+        sysProps("spark.submit.pyFiles") = args.pyFiles
+      }
     }
 
     // Special flag to avoid deprecation warnings at the client
@@ -284,6 +285,29 @@ object SparkSubmit {
       sysProps.getOrElseUpdate(k, v)
     }
 
+    // Resolve paths in certain spark properties
+    val pathConfigs = Seq(
+      "spark.jars",
+      "spark.files",
+      "spark.yarn.jar",
+      "spark.yarn.dist.files",
+      "spark.yarn.dist.archives")
+    pathConfigs.foreach { config =>
+      // Replace old URIs with resolved URIs, if they exist
+      sysProps.get(config).foreach { oldValue =>
+        sysProps(config) = Utils.resolveURIs(oldValue)
+      }
+    }
+
+    // Resolve and format python file paths properly before adding them to the PYTHONPATH.
+    // The resolving part is redundant in the case of --py-files, but necessary if the user
+    // explicitly sets `spark.submit.pyFiles` in his/her default properties file.
+    sysProps.get("spark.submit.pyFiles").foreach { pyFiles =>
+      val resolvedPyFiles = Utils.resolveURIs(pyFiles)
+      val formattedPyFiles = PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
+      sysProps("spark.submit.pyFiles") = formattedPyFiles
+    }
+
     (childArgs, childClasspath, sysProps, childMainClass)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/24c51292/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 1cdf50d..d8cd0ff 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -292,7 +292,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
     runSparkSubmit(args)
   }
 
-  test("spark submit includes jars passed in through --jar") {
+  test("includes jars passed in through --jars") {
     val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
     val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
     val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
@@ -306,6 +306,110 @@ class SparkSubmitSuite extends FunSuite with Matchers {
     runSparkSubmit(args)
   }
 
+  test("resolves command line argument paths correctly") {
+    val jars = "/jar1,/jar2"                 // --jars
+    val files = "hdfs:/file1,file2"          // --files
+    val archives = "file:/archive1,archive2" // --archives
+    val pyFiles = "py-file1,py-file2"        // --py-files
+
+    // Test jars and files
+    val clArgs = Seq(
+      "--master", "local",
+      "--class", "org.SomeClass",
+      "--jars", jars,
+      "--files", files,
+      "thejar.jar")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3
+    appArgs.jars should be (Utils.resolveURIs(jars))
+    appArgs.files should be (Utils.resolveURIs(files))
+    sysProps("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar"))
+    sysProps("spark.files") should be (Utils.resolveURIs(files))
+
+    // Test files and archives (Yarn)
+    val clArgs2 = Seq(
+      "--master", "yarn-client",
+      "--class", "org.SomeClass",
+      "--files", files,
+      "--archives", archives,
+      "thejar.jar"
+    )
+    val appArgs2 = new SparkSubmitArguments(clArgs2)
+    val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3
+    appArgs2.files should be (Utils.resolveURIs(files))
+    appArgs2.archives should be (Utils.resolveURIs(archives))
+    sysProps2("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
+    sysProps2("spark.yarn.dist.archives") should be (Utils.resolveURIs(archives))
+
+    // Test python files
+    val clArgs3 = Seq(
+      "--master", "local",
+      "--py-files", pyFiles,
+      "mister.py"
+    )
+    val appArgs3 = new SparkSubmitArguments(clArgs3)
+    val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3
+    appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
+    sysProps3("spark.submit.pyFiles") should be (
+      PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+  }
+
+  test("resolves config paths correctly") {
+    val jars = "/jar1,/jar2" // spark.jars
+    val files = "hdfs:/file1,file2" // spark.files / spark.yarn.dist.files
+    val archives = "file:/archive1,archive2" // spark.yarn.dist.archives
+    val pyFiles = "py-file1,py-file2" // spark.submit.pyFiles
+
+    // Test jars and files
+    val f1 = File.createTempFile("test-submit-jars-files", "")
+    val writer1 = new PrintWriter(f1)
+    writer1.println("spark.jars " + jars)
+    writer1.println("spark.files " + files)
+    writer1.close()
+    val clArgs = Seq(
+      "--master", "local",
+      "--class", "org.SomeClass",
+      "--properties-file", f1.getPath,
+      "thejar.jar"
+    )
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3
+    sysProps("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
+    sysProps("spark.files") should be(Utils.resolveURIs(files))
+
+    // Test files and archives (Yarn)
+    val f2 = File.createTempFile("test-submit-files-archives", "")
+    val writer2 = new PrintWriter(f2)
+    writer2.println("spark.yarn.dist.files " + files)
+    writer2.println("spark.yarn.dist.archives " + archives)
+    writer2.close()
+    val clArgs2 = Seq(
+      "--master", "yarn-client",
+      "--class", "org.SomeClass",
+      "--properties-file", f2.getPath,
+      "thejar.jar"
+    )
+    val appArgs2 = new SparkSubmitArguments(clArgs2)
+    val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3
+    sysProps2("spark.yarn.dist.files") should be(Utils.resolveURIs(files))
+    sysProps2("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))
+
+    // Test python files
+    val f3 = File.createTempFile("test-submit-python-files", "")
+    val writer3 = new PrintWriter(f3)
+    writer3.println("spark.submit.pyFiles " + pyFiles)
+    writer3.close()
+    val clArgs3 = Seq(
+      "--master", "local",
+      "--properties-file", f3.getPath,
+      "mister.py"
+    )
+    val appArgs3 = new SparkSubmitArguments(clArgs3)
+    val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3
+    sysProps3("spark.submit.pyFiles") should be(
+      PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+  }
+
   test("SPARK_CONF_DIR overrides spark-defaults.conf") {
     forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
       val unusedJar = TestUtils.createJarWithClasses(Seq.empty)

http://git-wip-us.apache.org/repos/asf/spark/blob/24c51292/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 1c11233..8ffe3e2 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -217,9 +217,14 @@ class UtilsSuite extends FunSuite {
 
   test("resolveURI") {
     def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
-      assume(before.split(",").length == 1)
-      assert(Utils.resolveURI(before, testWindows) === new URI(after))
-      assert(Utils.resolveURI(after, testWindows) === new URI(after))
+      // This should test only single paths
+      assume(before.split(",").length === 1)
+      // Repeated invocations of resolveURI should yield the same result
+      def resolve(uri: String): String = Utils.resolveURI(uri, testWindows).toString
+      assert(resolve(after) === after)
+      assert(resolve(resolve(after)) === after)
+      assert(resolve(resolve(resolve(after))) === after)
+      // Also test resolveURIs with single paths
       assert(new URI(Utils.resolveURIs(before, testWindows)) === new URI(after))
       assert(new URI(Utils.resolveURIs(after, testWindows)) === new URI(after))
     }
@@ -235,16 +240,27 @@ class UtilsSuite extends FunSuite {
     assertResolves("file:/C:/file.txt#alias.txt", "file:/C:/file.txt#alias.txt", testWindows = true)
     intercept[IllegalArgumentException] { Utils.resolveURI("file:foo") }
     intercept[IllegalArgumentException] { Utils.resolveURI("file:foo:baby") }
+  }
 
-    // Test resolving comma-delimited paths
-    assert(Utils.resolveURIs("jar1,jar2") === s"file:$cwd/jar1,file:$cwd/jar2")
-    assert(Utils.resolveURIs("file:/jar1,file:/jar2") === "file:/jar1,file:/jar2")
-    assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3") ===
-      s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
-    assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,jar4#jar5") ===
+  test("resolveURIs with multiple paths") {
+    def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
+      assume(before.split(",").length > 1)
+      assert(Utils.resolveURIs(before, testWindows) === after)
+      assert(Utils.resolveURIs(after, testWindows) === after)
+      // Repeated invocations of resolveURIs should yield the same result
+      def resolve(uri: String): String = Utils.resolveURIs(uri, testWindows)
+      assert(resolve(after) === after)
+      assert(resolve(resolve(after)) === after)
+      assert(resolve(resolve(resolve(after))) === after)
+    }
+    val cwd = System.getProperty("user.dir")
+    assertResolves("jar1,jar2", s"file:$cwd/jar1,file:$cwd/jar2")
+    assertResolves("file:/jar1,file:/jar2", "file:/jar1,file:/jar2")
+    assertResolves("hdfs:/jar1,file:/jar2,jar3", s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
+    assertResolves("hdfs:/jar1,file:/jar2,jar3,jar4#jar5",
       s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5")
-    assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi", testWindows = true) ===
-      s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi")
+    assertResolves("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi",
+      s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi", testWindows = true)
   }
 
   test("nonLocalPaths") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org