Posted to commits@spark.apache.org by va...@apache.org on 2017/08/25 16:57:59 UTC
spark git commit: [SPARK-21714][CORE][YARN] Avoiding re-uploading remote resources in yarn client mode
Repository: spark
Updated Branches:
refs/heads/master 1f24ceee6 -> 1813c4a8d
[SPARK-21714][CORE][YARN] Avoiding re-uploading remote resources in yarn client mode
## What changes were proposed in this pull request?
With SPARK-10643, Spark supports downloading resources from remote storage in client deploy mode. But the implementation overrides the variables that represent added resources (like `args.jars`, `args.pyFiles`) with local paths, and the YARN client then uses these local paths to re-upload the resources to the distributed cache. This re-upload is unnecessary and breaks the semantics of putting resources on a shared FS, so this patch fixes it.
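For illustration, a minimal sketch in the style of the new test added below (all s3a:// paths hypothetical): with remote resources in YARN client mode, the `spark.yarn.dist.*` properties should keep the remote URIs, and only the driver-local settings should point at downloaded copies.

    // Hypothetical sketch mirroring the new test in SparkSubmitSuite.
    val appArgs = new SparkSubmitArguments(Seq(
      "--master", "yarn",
      "--deploy-mode", "client",
      "--jars", "s3a://bucket/dep.jar",      // hypothetical remote jar
      "--py-files", "s3a://bucket/lib.egg",  // hypothetical remote py file
      "s3a://bucket/app.py"))                // hypothetical remote primary resource
    val (_, _, sysProps, _) = SparkSubmit.prepareSubmitEnvironment(appArgs)
    // Expected after this change:
    //   sysProps("spark.yarn.dist.jars")  == "s3a://bucket/dep.jar"  (still remote)
    //   sysProps("spark.repl.local.jars") starts with "file:"        (downloaded copy)
    //   sysProps("spark.submit.pyFiles")  is a local, non-URI path   (downloaded copy)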
## How was this patch tested?
This was manually verified with jars and pyFiles in both local and remote storage, in both client and cluster mode.
Author: jerryshao <ss...@hortonworks.com>
Closes #18962 from jerryshao/SPARK-21714.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1813c4a8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1813c4a8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1813c4a8
Branch: refs/heads/master
Commit: 1813c4a8dd4388fe76a4ec772c9be151be0f60a1
Parents: 1f24cee
Author: jerryshao <ss...@hortonworks.com>
Authored: Fri Aug 25 09:57:53 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Fri Aug 25 09:57:53 2017 -0700
----------------------------------------------------------------------
.../org/apache/spark/deploy/SparkSubmit.scala | 64 +++++++++++-------
.../apache/spark/internal/config/package.scala | 2 +-
.../scala/org/apache/spark/util/Utils.scala | 25 ++++---
.../apache/spark/deploy/SparkSubmitSuite.scala | 70 ++++++++++++++++----
.../main/scala/org/apache/spark/repl/Main.scala | 2 +-
5 files changed, 114 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1813c4a8/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index e569251..548149a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -212,14 +212,20 @@ object SparkSubmit extends CommandLineUtils {
/**
* Prepare the environment for submitting an application.
- * This returns a 4-tuple:
- * (1) the arguments for the child process,
- * (2) a list of classpath entries for the child,
- * (3) a map of system properties, and
- * (4) the main class for the child
+ *
+ * @param args the parsed SparkSubmitArguments used for environment preparation.
+ * @param conf the Hadoop Configuration; this argument is only set in unit tests.
+ * @return a 4-tuple:
+ * (1) the arguments for the child process,
+ * (2) a list of classpath entries for the child,
+ * (3) a map of system properties, and
+ * (4) the main class for the child
+ *
* Exposed for testing.
*/
- private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
+ private[deploy] def prepareSubmitEnvironment(
+ args: SparkSubmitArguments,
+ conf: Option[HadoopConfiguration] = None)
: (Seq[String], Seq[String], Map[String, String], String) = {
// Return values
val childArgs = new ArrayBuffer[String]()
@@ -322,7 +328,7 @@ object SparkSubmit extends CommandLineUtils {
}
}
- val hadoopConf = new HadoopConfiguration()
+ val hadoopConf = conf.getOrElse(new HadoopConfiguration())
val targetDir = DependencyUtils.createTempDir()
// Resolve glob path for different resources.
@@ -332,19 +338,21 @@ object SparkSubmit extends CommandLineUtils {
args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull
// In client mode, download remote files.
+ var localPrimaryResource: String = null
+ var localJars: String = null
+ var localPyFiles: String = null
if (deployMode == CLIENT) {
- args.primaryResource = Option(args.primaryResource).map {
+ localPrimaryResource = Option(args.primaryResource).map {
downloadFile(_, targetDir, args.sparkProperties, hadoopConf)
}.orNull
- args.jars = Option(args.jars).map {
+ localJars = Option(args.jars).map {
downloadFileList(_, targetDir, args.sparkProperties, hadoopConf)
}.orNull
- args.pyFiles = Option(args.pyFiles).map {
+ localPyFiles = Option(args.pyFiles).map {
downloadFileList(_, targetDir, args.sparkProperties, hadoopConf)
}.orNull
}
-
// If we're running a python app, set the main class to our specific python runner
if (args.isPython && deployMode == CLIENT) {
if (args.primaryResource == PYSPARK_SHELL) {
@@ -353,7 +361,7 @@ object SparkSubmit extends CommandLineUtils {
// If a python file is provided, add it to the child arguments and list of files to deploy.
// Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
args.mainClass = "org.apache.spark.deploy.PythonRunner"
- args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
+ args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
if (clusterManager != YARN) {
// The YARN backend distributes the primary file differently, so don't merge it.
args.files = mergeFileLists(args.files, args.primaryResource)
@@ -363,8 +371,8 @@ object SparkSubmit extends CommandLineUtils {
// The YARN backend handles python files differently, so don't merge the lists.
args.files = mergeFileLists(args.files, args.pyFiles)
}
- if (args.pyFiles != null) {
- sysProps("spark.submit.pyFiles") = args.pyFiles
+ if (localPyFiles != null) {
+ sysProps("spark.submit.pyFiles") = localPyFiles
}
}
@@ -418,7 +426,7 @@ object SparkSubmit extends CommandLineUtils {
// If an R file is provided, add it to the child arguments and list of files to deploy.
// Usage: RRunner <main R file> [app arguments]
args.mainClass = "org.apache.spark.deploy.RRunner"
- args.childArgs = ArrayBuffer(args.primaryResource) ++ args.childArgs
+ args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
args.files = mergeFileLists(args.files, args.primaryResource)
}
}
@@ -463,6 +471,7 @@ object SparkSubmit extends CommandLineUtils {
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.instances"),
+ OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.pyFiles"),
OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),
OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),
@@ -486,15 +495,28 @@ object SparkSubmit extends CommandLineUtils {
sysProp = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.supervise"),
- OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
+ OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
+
+ // An internal option used only by spark-shell to add user jars to the repl's classloader.
+ // Previously it used "spark.jars" or "spark.yarn.dist.jars", which may now point to remote
+ // jars, so this adds a new option that carries only local jars for spark-shell.
+ OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.repl.local.jars")
)
// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
- // Also add the main application jar and any added jars to classpath in case YARN client
- // requires these jars.
- if (deployMode == CLIENT || isYarnCluster) {
+ if (deployMode == CLIENT) {
childMainClass = args.mainClass
+ if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
+ childClasspath += localPrimaryResource
+ }
+ if (localJars != null) { childClasspath ++= localJars.split(",") }
+ }
+ // Add the main application jar and any added jars to the classpath in case the YARN client
+ // requires these jars.
+ // This assumes both primaryResource and the user jars are local; otherwise they will not
+ // be added to the classpath of the YARN client.
+ if (isYarnCluster) {
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
}
@@ -551,10 +573,6 @@ object SparkSubmit extends CommandLineUtils {
if (args.isPython) {
sysProps.put("spark.yarn.isPython", "true")
}
-
- if (args.pyFiles != null) {
- sysProps("spark.submit.pyFiles") = args.pyFiles
- }
}
// assure a keytab is available from any place in a JVM
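The new optional `conf` parameter above exists so callers (in practice, tests) can inject a Hadoop Configuration instead of letting `prepareSubmitEnvironment` construct its own. A minimal sketch of the call, assuming an `appArgs: SparkSubmitArguments` is in scope:

    import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}

    val hadoopConf = new HadoopConfiguration()
    val (childArgs, childClasspath, sysProps, childMainClass) =
      SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))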
http://git-wip-us.apache.org/repos/asf/spark/blob/1813c4a8/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 0457a66..0d3769a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -87,7 +87,7 @@ package object config {
.intConf
.createOptional
- private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles")
+ private[spark] val PY_FILES = ConfigBuilder("spark.yarn.dist.pyFiles")
.internal()
.stringConf
.toSequence
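Only the key string changes here; consumers keep reading the same `PY_FILES` constant. A sketch of the read path, assuming `private[spark]` access to a `sparkConf: SparkConf` and that the entry resolves to a `Seq[String]`:

    import org.apache.spark.internal.config._

    // Now backed by "spark.yarn.dist.pyFiles" instead of "spark.submit.pyFiles".
    val pyFiles: Seq[String] = sparkConf.get(PY_FILES)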
http://git-wip-us.apache.org/repos/asf/spark/blob/1813c4a8/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 900a619..3dce76c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2601,18 +2601,23 @@ private[spark] object Utils extends Logging {
}
/**
- * In YARN mode this method returns a union of the jar files pointed by "spark.jars" and the
- * "spark.yarn.dist.jars" properties, while in other modes it returns the jar files pointed by
- * only the "spark.jars" property.
+ * Return the jar files pointed to by the "spark.jars" property. Spark distributes these jars
+ * internally through its file server. In YARN mode this returns an empty list, since YARN
+ * has its own mechanism to distribute jars.
*/
- def getUserJars(conf: SparkConf, isShell: Boolean = false): Seq[String] = {
+ def getUserJars(conf: SparkConf): Seq[String] = {
val sparkJars = conf.getOption("spark.jars")
- if (conf.get("spark.master") == "yarn" && isShell) {
- val yarnJars = conf.getOption("spark.yarn.dist.jars")
- unionFileLists(sparkJars, yarnJars).toSeq
- } else {
- sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
- }
+ sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
+ }
+
+ /**
+ * Return the local jar files which will be added to the REPL's classpath. These jars are
+ * specified by --jars (spark.jars) or --packages; remote jars are first downloaded to local
+ * paths by SparkSubmit.
+ */
+ def getLocalUserJarsForShell(conf: SparkConf): Seq[String] = {
+ val localJars = conf.getOption("spark.repl.local.jars")
+ localJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
}
private[spark] val REDACTION_REPLACEMENT_TEXT = "*********(redacted)"
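A minimal sketch (with hypothetical property values) of how the two helpers now differ:

    val conf = new SparkConf()
      .set("spark.jars", "s3a://bucket/a.jar,s3a://bucket/b.jar")       // hypothetical
      .set("spark.repl.local.jars", "file:/tmp/a.jar,file:/tmp/b.jar")  // hypothetical

    Utils.getUserJars(conf)              // Seq("s3a://bucket/a.jar", "s3a://bucket/b.jar")
    Utils.getLocalUserJarsForShell(conf) // Seq("file:/tmp/a.jar", "file:/tmp/b.jar")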
http://git-wip-us.apache.org/repos/asf/spark/blob/1813c4a8/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 08ba41f..95137c8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -29,7 +29,7 @@ import scala.io.Source
import com.google.common.io.ByteStreams
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
@@ -762,7 +762,7 @@ class SparkSubmitSuite
(Set(jar1.toURI.toString, jar2.toURI.toString))
sysProps("spark.yarn.dist.files").split(",").toSet should be
(Set(file1.toURI.toString, file2.toURI.toString))
- sysProps("spark.submit.pyFiles").split(",").toSet should be
+ sysProps("spark.yarn.dist.pyFiles").split(",").toSet should be
(Set(pyFile1.getAbsolutePath, pyFile2.getAbsolutePath))
sysProps("spark.yarn.dist.archives").split(",").toSet should be
(Set(archive1.toURI.toString, archive2.toURI.toString))
@@ -802,10 +802,7 @@ class SparkSubmitSuite
test("downloadFile - file doesn't exist") {
val hadoopConf = new Configuration()
val tmpDir = Utils.createTempDir()
- // Set s3a implementation to local file system for testing.
- hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
- // Disable file system impl cache to make sure the test file system is picked up.
- hadoopConf.set("fs.s3a.impl.disable.cache", "true")
+ updateConfWithFakeS3Fs(hadoopConf)
intercept[FileNotFoundException] {
SparkSubmit.downloadFile("s3a:/no/such/file", tmpDir, mutable.Map.empty, hadoopConf)
}
@@ -826,10 +823,7 @@ class SparkSubmitSuite
FileUtils.write(jarFile, content)
val hadoopConf = new Configuration()
val tmpDir = Files.createTempDirectory("tmp").toFile
- // Set s3a implementation to local file system for testing.
- hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
- // Disable file system impl cache to make sure the test file system is picked up.
- hadoopConf.set("fs.s3a.impl.disable.cache", "true")
+ updateConfWithFakeS3Fs(hadoopConf)
val sourcePath = s"s3a://${jarFile.getAbsolutePath}"
val outputPath =
SparkSubmit.downloadFile(sourcePath, tmpDir, mutable.Map.empty, hadoopConf)
@@ -844,10 +838,7 @@ class SparkSubmitSuite
FileUtils.write(jarFile, content)
val hadoopConf = new Configuration()
val tmpDir = Files.createTempDirectory("tmp").toFile
- // Set s3a implementation to local file system for testing.
- hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
- // Disable file system impl cache to make sure the test file system is picked up.
- hadoopConf.set("fs.s3a.impl.disable.cache", "true")
+ updateConfWithFakeS3Fs(hadoopConf)
val sourcePaths = Seq("/local/file", s"s3a://${jarFile.getAbsolutePath}")
val outputPaths = SparkSubmit.downloadFileList(
sourcePaths.mkString(","), tmpDir, mutable.Map.empty, hadoopConf).split(",")
@@ -859,6 +850,43 @@ class SparkSubmitSuite
}
}
+ test("Avoid re-upload remote resources in yarn client mode") {
+ val hadoopConf = new Configuration()
+ updateConfWithFakeS3Fs(hadoopConf)
+
+ val tmpDir = Utils.createTempDir()
+ val file = File.createTempFile("tmpFile", "", tmpDir)
+ val pyFile = File.createTempFile("tmpPy", ".egg", tmpDir)
+ val mainResource = File.createTempFile("tmpPy", ".py", tmpDir)
+ val tmpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
+ val tmpJarPath = s"s3a://${new File(tmpJar.toURI).getAbsolutePath}"
+
+ val args = Seq(
+ "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+ "--name", "testApp",
+ "--master", "yarn",
+ "--deploy-mode", "client",
+ "--jars", tmpJarPath,
+ "--files", s"s3a://${file.getAbsolutePath}",
+ "--py-files", s"s3a://${pyFile.getAbsolutePath}",
+ s"s3a://$mainResource"
+ )
+
+ val appArgs = new SparkSubmitArguments(args)
+ val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3
+
+ // All the resources should still be remote paths, so that the YARN client will not upload them again.
+ sysProps("spark.yarn.dist.jars") should be (tmpJarPath)
+ sysProps("spark.yarn.dist.files") should be (s"s3a://${file.getAbsolutePath}")
+ sysProps("spark.yarn.dist.pyFiles") should be (s"s3a://${pyFile.getAbsolutePath}")
+
+ // Local repl jars should be a local path.
+ sysProps("spark.repl.local.jars") should (startWith("file:"))
+
+ // Local py files should not be in URI format.
+ sysProps("spark.submit.pyFiles") should (startWith("/"))
+ }
+
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
private def runSparkSubmit(args: Seq[String]): Unit = {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
@@ -898,6 +926,11 @@ class SparkSubmitSuite
Utils.deleteRecursively(tmpDir)
}
}
+
+ private def updateConfWithFakeS3Fs(conf: Configuration): Unit = {
+ conf.set("fs.s3a.impl", classOf[TestFileSystem].getCanonicalName)
+ conf.set("fs.s3a.impl.disable.cache", "true")
+ }
}
object JarCreationTest extends Logging {
@@ -967,4 +1000,13 @@ class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem {
// Ignore the scheme for testing.
super.copyToLocalFile(new Path(src.toUri.getPath), dst)
}
+
+ override def globStatus(pathPattern: Path): Array[FileStatus] = {
+ val newPath = new Path(pathPattern.toUri.getPath)
+ super.globStatus(newPath).map { status =>
+ val path = s"s3a://${status.getPath.toUri.getPath}"
+ status.setPath(new Path(path))
+ status
+ }
+ }
}
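The suite's trick is to map the `s3a` scheme onto a local `FileSystem` implementation; a short sketch of how Hadoop then resolves a path, using the same conf keys as `updateConfWithFakeS3Fs` above:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    val hadoopConf = new Configuration()
    hadoopConf.set("fs.s3a.impl", classOf[TestFileSystem].getCanonicalName)
    hadoopConf.set("fs.s3a.impl.disable.cache", "true") // bypass the FS cache so the fake impl is picked up

    // Resolves to TestFileSystem, which delegates to the local file system.
    val fs = new Path("s3a:///tmp/whatever").getFileSystem(hadoopConf)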
http://git-wip-us.apache.org/repos/asf/spark/blob/1813c4a8/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 9702a1e..0b16e1b 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -57,7 +57,7 @@ object Main extends Logging {
// Visible for testing
private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
interp = _interp
- val jars = Utils.getUserJars(conf, isShell = true)
+ val jars = Utils.getLocalUserJarsForShell(conf)
// Remove file:///, file:// or file:/ scheme if exists for each jar
.map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x }
.mkString(File.pathSeparator)
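For reference, the scheme-stripping step above in isolation, with hypothetical input:

    import java.io.File
    import java.net.URI

    Seq("file:/tmp/a.jar", "/tmp/b.jar").map { x =>
      if (x.startsWith("file:")) new File(new URI(x)).getPath else x
    }
    // => List("/tmp/a.jar", "/tmp/b.jar")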
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org