Posted to commits@spark.apache.org by tg...@apache.org on 2016/11/03 21:10:58 UTC
spark git commit: [SPARK-18099][YARN] Fail if same files added to distributed cache for --files and --archives
Repository: spark
Updated Branches:
refs/heads/master 16293311c -> 098e4ca9c
[SPARK-18099][YARN] Fail if same files added to distributed cache for --files and --archives
## What changes were proposed in this pull request?
During spark-submit, if the YARN distributed cache is instructed to add the same file under both --files and --archives, this change ensures the existing distributed cache behaviour is retained, i.e. it warns and fails the submission if the same file is listed in both --files and --archives.
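For illustration, the failing case corresponds to passing the same path to both --files and --archives on spark-submit. A minimal Scala sketch of the equivalent configuration, using the spark.yarn.dist.* properties that back those flags (the path is hypothetical):

    import org.apache.spark.SparkConf

    // Hypothetical example: the same local path is configured for both
    // --files (spark.yarn.dist.files) and --archives (spark.yarn.dist.archives).
    // With this change, Client.prepareLocalResources rejects the submission with an
    // IllegalArgumentException instead of silently handling the duplicate.
    val conf = new SparkConf()
      .set("spark.yarn.dist.files", "/tmp/mydata.zip")
      .set("spark.yarn.dist.archives", "/tmp/mydata.zip")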
## How was this patch tested?
Manually tested:
1. If the same jar is listed in both --jars and --files, the job still submits successfully.
   - The behaviour from [SPARK-14423] (#12203) is unchanged.
2. If the same file is listed in both --files and --archives, the job fails to submit.
… under archives and files
Author: Kishor Patil <kp...@yahoo-inc.com>
Closes #15627 from kishorvpatil/spark18099.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/098e4ca9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/098e4ca9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/098e4ca9
Branch: refs/heads/master
Commit: 098e4ca9c7af61e64839a50c65be449749af6482
Parents: 1629331
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Thu Nov 3 16:10:26 2016 -0500
Committer: Tom Graves <tg...@yahoo-inc.com>
Committed: Thu Nov 3 16:10:26 2016 -0500
----------------------------------------------------------------------
.../org/apache/spark/deploy/yarn/Client.scala | 12 +++++-
.../apache/spark/deploy/yarn/ClientSuite.scala | 42 ++++++++++++++++++++
2 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/098e4ca9/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 053a786..172fb46 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -598,8 +598,16 @@ private[spark] class Client(
).foreach { case (flist, resType, addToClasspath) =>
flist.foreach { file =>
val (_, localizedPath) = distribute(file, resType = resType)
- if (addToClasspath && localizedPath != null) {
- cachedSecondaryJarLinks += localizedPath
+ // If addToClasspath, we ignore adding the jar multiple times to the distributed cache.
+ if (addToClasspath) {
+ if (localizedPath != null) {
+ cachedSecondaryJarLinks += localizedPath
+ }
+ } else {
+ if (localizedPath == null) {
+ throw new IllegalArgumentException(s"Attempt to add ($file) multiple times" +
+ " to the distributed cache.")
+ }
}
}
}
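For context, distribute() returns a null localized path when the URI is already in the distributed cache, so the new else branch turns a silently ignored duplicate into a hard failure for --files/--archives, while jars on the classpath keep the old ignore-duplicates behaviour. A self-contained Scala sketch of that pattern, with simplified names standing in for the real Client internals:

    import scala.collection.mutable

    // Stands in for Client's record of URIs already added to the distributed cache.
    val distributedUris = mutable.HashSet[String]()

    // Returns (isLocal, localizedPath); localizedPath is null when the URI is a
    // duplicate, mirroring the contract the patch relies on.
    def distribute(path: String): (Boolean, String) =
      if (distributedUris.add(path)) (false, s"localized-$path") else (false, null)

    def addResource(file: String, addToClasspath: Boolean): Unit = {
      val (_, localizedPath) = distribute(file)
      if (addToClasspath) {
        // Jars: duplicates are ignored; only newly localized paths join the classpath.
        if (localizedPath != null) println(s"classpath += $localizedPath")
      } else if (localizedPath == null) {
        // Files and archives: a duplicate now fails the submission.
        throw new IllegalArgumentException(
          s"Attempt to add ($file) multiple times to the distributed cache.")
      }
    }

Calling addResource("/tmp/data.zip", addToClasspath = false) twice succeeds on the first call and throws on the second, which is the behaviour the new ClientSuite test below asserts.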
http://git-wip-us.apache.org/repos/asf/spark/blob/098e4ca9/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 0a4f291..06516c1 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -282,6 +282,48 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
}
}
+ test("distribute archive multiple times") {
+ val libs = Utils.createTempDir()
+ // Create jars dir and RELEASE file to avoid IllegalStateException.
+ val jarsDir = new File(libs, "jars")
+ assert(jarsDir.mkdir())
+ new FileOutputStream(new File(libs, "RELEASE")).close()
+
+ val userLib1 = Utils.createTempDir()
+ val testJar = TestUtils.createJarWithFiles(Map(), userLib1)
+
+ // Case 1: FILES_TO_DISTRIBUTE and ARCHIVES_TO_DISTRIBUTE can't have duplicate files
+ val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
+ .set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath))
+ .set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath))
+
+ val client = createClient(sparkConf)
+ val tempDir = Utils.createTempDir()
+ intercept[IllegalArgumentException] {
+ client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
+ }
+
+ // Case 2: FILES_TO_DISTRIBUTE can't have duplicate files.
+ val sparkConfFiles = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
+ .set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath, testJar.getPath))
+
+ val clientFiles = createClient(sparkConfFiles)
+ val tempDirForFiles = Utils.createTempDir()
+ intercept[IllegalArgumentException] {
+ clientFiles.prepareLocalResources(new Path(tempDirForFiles.getAbsolutePath()), Nil)
+ }
+
+ // Case 3: ARCHIVES_TO_DISTRIBUTE can't have duplicate files.
+ val sparkConfArchives = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
+ .set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath, testJar.getPath))
+
+ val clientArchives = createClient(sparkConfArchives)
+ val tempDirForArchives = Utils.createTempDir()
+ intercept[IllegalArgumentException] {
+ clientArchives.prepareLocalResources(new Path(tempDirForArchives.getAbsolutePath()), Nil)
+ }
+ }
+
test("distribute local spark jars") {
val temp = Utils.createTempDir()
val jarsDir = new File(temp, "jars")
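As a usage note, the same conflict can also be caught on the launcher side before spark-submit is invoked. The helper below is hypothetical and not part of Spark; it only restates the rule that prepareLocalResources now enforces:

    // Hypothetical pre-submission check: reject any path listed under both
    // --files and --archives, matching the rule enforced by prepareLocalResources.
    def checkNoOverlap(files: Seq[String], archives: Seq[String]): Unit = {
      val dupes = files.toSet.intersect(archives.toSet)
      require(dupes.isEmpty,
        s"These paths appear in both --files and --archives: ${dupes.mkString(", ")}")
    }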