You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/10/30 23:29:11 UTC
git commit: [SPARK-3319] [SPARK-3338] Resolve Spark submit config paths
Repository: spark
Updated Branches:
refs/heads/master 9142c9b80 -> 24c512925
[SPARK-3319] [SPARK-3338] Resolve Spark submit config paths
The bulk of this PR is comprised of tests. All changes in functionality are made in `SparkSubmit.scala` (~20 lines).
**SPARK-3319.** There is currently a divergence in behavior when the user passes in additional jars through `--jars` and through setting `spark.jars` in the default properties file. The former will happily resolve the paths (e.g. convert `my.jar` to `file:/absolute/path/to/my.jar`), while the latter does not. We should resolve paths consistently in both cases. This also applies to the following pairs of command line arguments and Spark configs:
- `--jars` ~ `spark.jars`
- `--files` ~ `spark.files` / `spark.yarn.dist.files`
- `--archives` ~ `spark.yarn.dist.archives`
- `--py-files` ~ `spark.submit.pyFiles`
**SPARK-3338.** This PR also fixes the following bug: if the user sets `spark.submit.pyFiles` in their properties file, it does not actually get picked up even when `--py-files` is not set. This is simply because the config is overridden by an empty string.
Author: Andrew Or <an...@gmail.com>
Author: Andrew Or <an...@databricks.com>
Closes #2232 from andrewor14/resolve-config-paths and squashes the following commits:
fff2869 [Andrew Or] Add spark.yarn.jar
da3a1c1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into resolve-config-paths
f0fae64 [Andrew Or] Merge branch 'master' of github.com:apache/spark into resolve-config-paths
05e03d6 [Andrew Or] Add tests for resolving both command line and config paths
460117e [Andrew Or] Resolve config paths properly
fe039d3 [Andrew Or] Beef up tests to test fixed-pointed-ness of Utils.resolveURI(s)
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24c51292
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24c51292
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24c51292
Branch: refs/heads/master
Commit: 24c5129257ce6e3b734f168e860b714c2730b55f
Parents: 9142c9b
Author: Andrew Or <an...@gmail.com>
Authored: Thu Oct 30 15:29:07 2014 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Oct 30 15:29:07 2014 -0700
----------------------------------------------------------------------
.../org/apache/spark/deploy/SparkSubmit.scala | 28 ++++-
.../apache/spark/deploy/SparkSubmitSuite.scala | 106 ++++++++++++++++++-
.../org/apache/spark/util/UtilsSuite.scala | 38 +++++--
3 files changed, 158 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/24c51292/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index f97bf67..0379ade 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -158,8 +158,9 @@ object SparkSubmit {
args.files = mergeFileLists(args.files, args.primaryResource)
}
args.files = mergeFileLists(args.files, args.pyFiles)
- // Format python file paths properly before adding them to the PYTHONPATH
- sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",")
+ if (args.pyFiles != null) {
+ sysProps("spark.submit.pyFiles") = args.pyFiles
+ }
}
// Special flag to avoid deprecation warnings at the client
@@ -284,6 +285,29 @@ object SparkSubmit {
sysProps.getOrElseUpdate(k, v)
}
+ // Resolve paths in certain spark properties
+ val pathConfigs = Seq(
+ "spark.jars",
+ "spark.files",
+ "spark.yarn.jar",
+ "spark.yarn.dist.files",
+ "spark.yarn.dist.archives")
+ pathConfigs.foreach { config =>
+ // Replace old URIs with resolved URIs, if they exist
+ sysProps.get(config).foreach { oldValue =>
+ sysProps(config) = Utils.resolveURIs(oldValue)
+ }
+ }
+
+ // Resolve and format python file paths properly before adding them to the PYTHONPATH.
+ // The resolving part is redundant in the case of --py-files, but necessary if the user
+ // explicitly sets `spark.submit.pyFiles` in his/her default properties file.
+ sysProps.get("spark.submit.pyFiles").foreach { pyFiles =>
+ val resolvedPyFiles = Utils.resolveURIs(pyFiles)
+ val formattedPyFiles = PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
+ sysProps("spark.submit.pyFiles") = formattedPyFiles
+ }
+
(childArgs, childClasspath, sysProps, childMainClass)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/24c51292/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 1cdf50d..d8cd0ff 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -292,7 +292,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
runSparkSubmit(args)
}
- test("spark submit includes jars passed in through --jar") {
+ test("includes jars passed in through --jars") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
@@ -306,6 +306,110 @@ class SparkSubmitSuite extends FunSuite with Matchers {
runSparkSubmit(args)
}
+ test("resolves command line argument paths correctly") {
+ val jars = "/jar1,/jar2" // --jars
+ val files = "hdfs:/file1,file2" // --files
+ val archives = "file:/archive1,archive2" // --archives
+ val pyFiles = "py-file1,py-file2" // --py-files
+
+ // Test jars and files
+ val clArgs = Seq(
+ "--master", "local",
+ "--class", "org.SomeClass",
+ "--jars", jars,
+ "--files", files,
+ "thejar.jar")
+ val appArgs = new SparkSubmitArguments(clArgs)
+ val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3
+ appArgs.jars should be (Utils.resolveURIs(jars))
+ appArgs.files should be (Utils.resolveURIs(files))
+ sysProps("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar"))
+ sysProps("spark.files") should be (Utils.resolveURIs(files))
+
+ // Test files and archives (Yarn)
+ val clArgs2 = Seq(
+ "--master", "yarn-client",
+ "--class", "org.SomeClass",
+ "--files", files,
+ "--archives", archives,
+ "thejar.jar"
+ )
+ val appArgs2 = new SparkSubmitArguments(clArgs2)
+ val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3
+ appArgs2.files should be (Utils.resolveURIs(files))
+ appArgs2.archives should be (Utils.resolveURIs(archives))
+ sysProps2("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
+ sysProps2("spark.yarn.dist.archives") should be (Utils.resolveURIs(archives))
+
+ // Test python files
+ val clArgs3 = Seq(
+ "--master", "local",
+ "--py-files", pyFiles,
+ "mister.py"
+ )
+ val appArgs3 = new SparkSubmitArguments(clArgs3)
+ val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3
+ appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
+ sysProps3("spark.submit.pyFiles") should be (
+ PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+ }
+
+ test("resolves config paths correctly") {
+ val jars = "/jar1,/jar2" // spark.jars
+ val files = "hdfs:/file1,file2" // spark.files / spark.yarn.dist.files
+ val archives = "file:/archive1,archive2" // spark.yarn.dist.archives
+ val pyFiles = "py-file1,py-file2" // spark.submit.pyFiles
+
+ // Test jars and files
+ val f1 = File.createTempFile("test-submit-jars-files", "")
+ val writer1 = new PrintWriter(f1)
+ writer1.println("spark.jars " + jars)
+ writer1.println("spark.files " + files)
+ writer1.close()
+ val clArgs = Seq(
+ "--master", "local",
+ "--class", "org.SomeClass",
+ "--properties-file", f1.getPath,
+ "thejar.jar"
+ )
+ val appArgs = new SparkSubmitArguments(clArgs)
+ val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3
+ sysProps("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
+ sysProps("spark.files") should be(Utils.resolveURIs(files))
+
+ // Test files and archives (Yarn)
+ val f2 = File.createTempFile("test-submit-files-archives", "")
+ val writer2 = new PrintWriter(f2)
+ writer2.println("spark.yarn.dist.files " + files)
+ writer2.println("spark.yarn.dist.archives " + archives)
+ writer2.close()
+ val clArgs2 = Seq(
+ "--master", "yarn-client",
+ "--class", "org.SomeClass",
+ "--properties-file", f2.getPath,
+ "thejar.jar"
+ )
+ val appArgs2 = new SparkSubmitArguments(clArgs2)
+ val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3
+ sysProps2("spark.yarn.dist.files") should be(Utils.resolveURIs(files))
+ sysProps2("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))
+
+ // Test python files
+ val f3 = File.createTempFile("test-submit-python-files", "")
+ val writer3 = new PrintWriter(f3)
+ writer3.println("spark.submit.pyFiles " + pyFiles)
+ writer3.close()
+ val clArgs3 = Seq(
+ "--master", "local",
+ "--properties-file", f3.getPath,
+ "mister.py"
+ )
+ val appArgs3 = new SparkSubmitArguments(clArgs3)
+ val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3
+ sysProps3("spark.submit.pyFiles") should be(
+ PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+ }
+
test("SPARK_CONF_DIR overrides spark-defaults.conf") {
forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
http://git-wip-us.apache.org/repos/asf/spark/blob/24c51292/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 1c11233..8ffe3e2 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -217,9 +217,14 @@ class UtilsSuite extends FunSuite {
test("resolveURI") {
def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
- assume(before.split(",").length == 1)
- assert(Utils.resolveURI(before, testWindows) === new URI(after))
- assert(Utils.resolveURI(after, testWindows) === new URI(after))
+ // This should test only single paths
+ assume(before.split(",").length === 1)
+ // Repeated invocations of resolveURI should yield the same result
+ def resolve(uri: String): String = Utils.resolveURI(uri, testWindows).toString
+ assert(resolve(after) === after)
+ assert(resolve(resolve(after)) === after)
+ assert(resolve(resolve(resolve(after))) === after)
+ // Also test resolveURIs with single paths
assert(new URI(Utils.resolveURIs(before, testWindows)) === new URI(after))
assert(new URI(Utils.resolveURIs(after, testWindows)) === new URI(after))
}
@@ -235,16 +240,27 @@ class UtilsSuite extends FunSuite {
assertResolves("file:/C:/file.txt#alias.txt", "file:/C:/file.txt#alias.txt", testWindows = true)
intercept[IllegalArgumentException] { Utils.resolveURI("file:foo") }
intercept[IllegalArgumentException] { Utils.resolveURI("file:foo:baby") }
+ }
- // Test resolving comma-delimited paths
- assert(Utils.resolveURIs("jar1,jar2") === s"file:$cwd/jar1,file:$cwd/jar2")
- assert(Utils.resolveURIs("file:/jar1,file:/jar2") === "file:/jar1,file:/jar2")
- assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3") ===
- s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
- assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,jar4#jar5") ===
+ test("resolveURIs with multiple paths") {
+ def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
+ assume(before.split(",").length > 1)
+ assert(Utils.resolveURIs(before, testWindows) === after)
+ assert(Utils.resolveURIs(after, testWindows) === after)
+ // Repeated invocations of resolveURIs should yield the same result
+ def resolve(uri: String): String = Utils.resolveURIs(uri, testWindows)
+ assert(resolve(after) === after)
+ assert(resolve(resolve(after)) === after)
+ assert(resolve(resolve(resolve(after))) === after)
+ }
+ val cwd = System.getProperty("user.dir")
+ assertResolves("jar1,jar2", s"file:$cwd/jar1,file:$cwd/jar2")
+ assertResolves("file:/jar1,file:/jar2", "file:/jar1,file:/jar2")
+ assertResolves("hdfs:/jar1,file:/jar2,jar3", s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
+ assertResolves("hdfs:/jar1,file:/jar2,jar3,jar4#jar5",
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5")
- assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi", testWindows = true) ===
- s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi")
+ assertResolves("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi",
+ s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi", testWindows = true)
}
test("nonLocalPaths") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org