You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/02/14 19:33:38 UTC
spark git commit: [SPARK-19501][YARN] Reduce the number of HDFS RPCs
during YARN deployment
Repository: spark
Updated Branches:
refs/heads/master f776e3b42 -> ab9872db1
[SPARK-19501][YARN] Reduce the number of HDFS RPCs during YARN deployment
## What changes were proposed in this pull request?
As discussed in [JIRA](https://issues.apache.org/jira/browse/SPARK-19501), this patch addresses the problem where too many HDFS RPCs are made when there are many URIs specified in `spark.yarn.jars`, potentially adding hundreds of RTTs to the YARN deployment before the application launches. This becomes significant when submitting the application to a non-local YARN cluster (where the RTT may be on the order of 100 ms, for example). For each URI specified, the current implementation makes at least two HDFS RPCs, for:
- [Calling `getFileStatus()` before uploading each file to the distributed cache in `ClientDistributedCacheManager.addResource()`](https://github.com/apache/spark/blob/v2.1.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala#L71).
- [Resolving any symbolic links in each of the file URIs](https://github.com/apache/spark/blob/v2.1.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L377-L379), which repeatedly makes HDFS RPCs until all the symlinks are resolved. (See [`FileContext.resolve(Path)`](https://github.com/apache/hadoop/blob/release-2.7.1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java#L2189-L2195), [`FSLinkResolver.resolve(FileContext, Path)`](https://github.com/apache/hadoop/blob/release-2.7.1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSLinkResolver.java#L79-L112), and [`AbstractFileSystem.resolvePath()`](https://github.com/apache/hadoop/blob/release-2.7.1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java#L464-L468).)
The first `getFileStatus` RPC can be removed by using `statCache`, which is now populated with the file statuses retrieved by [the previous `globStatus` call](https://github.com/apache/spark/blob/v2.1.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L531).
The second one can be largely reduced by caching the symlink resolution results in a `mutable.HashMap`. This patch adds a local variable in `yarn.Client.prepareLocalResources()` and passes it as an additional parameter to `yarn.Client.copyFileToRemote`. [The symlink resolution code was added in 2013](https://github.com/apache/spark/commit/a35472e1dd2ea1b5a0b1fb6b382f5a98f5aeba5a#diff-b050df3f55b82065803d6e83453b9706R187) and has not changed since. I am assuming that it is still required; if it is not, we can remove `symlinkCache` and the symlink resolution altogether.
## How was this patch tested?
This patch is based off 8e8afb3, currently the latest YARN patch on master. All tests except a few in spark-hive passed with `./dev/run-tests` on my machine, using JDK 1.8.0_112 on macOS 10.12.3. I also tested this modified version of Spark 2.2.0-SNAPSHOT manually; it performed a normal deployment and execution on a YARN cluster without errors.
Author: Jong Wook Kim <jo...@nyu.edu>
Closes #16916 from jongwook/SPARK-19501.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab9872db
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab9872db
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab9872db
Branch: refs/heads/master
Commit: ab9872db1f9c0f289541ec5756d1a142d85545ce
Parents: f776e3b
Author: Jong Wook Kim <jo...@nyu.edu>
Authored: Tue Feb 14 11:33:31 2017 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Feb 14 11:33:31 2017 -0800
----------------------------------------------------------------------
.../org/apache/spark/deploy/yarn/Client.scala | 19 +++++++++++++------
.../yarn/ClientDistributedCacheManager.scala | 2 +-
.../apache/spark/deploy/yarn/ClientSuite.scala | 10 +++++-----
3 files changed, 19 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ab9872db/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 70826ed..f4f4518 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -326,6 +326,7 @@ private[spark] class Client(
destDir: Path,
srcPath: Path,
replication: Short,
+ symlinkCache: Map[URI, Path],
force: Boolean = false,
destName: Option[String] = None): Path = {
val destFs = destDir.getFileSystem(hadoopConf)
@@ -343,8 +344,12 @@ private[spark] class Client(
// Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
// version shows the specific version in the distributed cache configuration
val qualifiedDestPath = destFs.makeQualified(destPath)
- val fc = FileContext.getFileContext(qualifiedDestPath.toUri(), hadoopConf)
- fc.resolvePath(qualifiedDestPath)
+ val qualifiedDestDir = qualifiedDestPath.getParent
+ val resolvedDestDir = symlinkCache.getOrElseUpdate(qualifiedDestDir.toUri(), {
+ val fc = FileContext.getFileContext(qualifiedDestDir.toUri(), hadoopConf)
+ fc.resolvePath(qualifiedDestDir)
+ })
+ new Path(resolvedDestDir, qualifiedDestPath.getName())
}
/**
@@ -400,6 +405,7 @@ private[spark] class Client(
FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION))
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+ val symlinkCache: Map[URI, Path] = HashMap[URI, Path]()
def addDistributedUri(uri: URI): Boolean = {
val uriStr = uri.toString()
@@ -445,7 +451,7 @@ private[spark] class Client(
val localPath = getQualifiedLocalPath(localURI, hadoopConf)
val linkname = targetDir.map(_ + "/").getOrElse("") +
destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
- val destPath = copyFileToRemote(destDir, localPath, replication)
+ val destPath = copyFileToRemote(destDir, localPath, replication, symlinkCache)
val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
distCacheMgr.addResource(
destFs, hadoopConf, destPath, localResources, resType, linkname, statCache,
@@ -497,8 +503,9 @@ private[spark] class Client(
val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
val pathFs = FileSystem.get(path.toUri(), hadoopConf)
pathFs.globStatus(path).filter(_.isFile()).foreach { entry =>
- distribute(entry.getPath().toUri().toString(),
- targetDir = Some(LOCALIZED_LIB_DIR))
+ val uri = entry.getPath().toUri()
+ statCache.update(uri, entry)
+ distribute(uri.toString(), targetDir = Some(LOCALIZED_LIB_DIR))
}
} else {
localJars += jar
@@ -614,7 +621,7 @@ private[spark] class Client(
sparkConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString())
val localConfArchive = new Path(createConfArchive().toURI())
- copyFileToRemote(destDir, localConfArchive, replication, force = true,
+ copyFileToRemote(destDir, localConfArchive, replication, symlinkCache, force = true,
destName = Some(LOCALIZED_CONF_ARCHIVE))
// Manually add the config archive to the cache manager so that the AM is launched with
http://git-wip-us.apache.org/repos/asf/spark/blob/ab9872db/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index dcc2288..e6e0ea3 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -68,7 +68,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
link: String,
statCache: Map[URI, FileStatus],
appMasterOnly: Boolean = false): Unit = {
- val destStatus = fs.getFileStatus(destPath)
+ val destStatus = statCache.getOrElse(destPath.toUri(), fs.getFileStatus(destPath))
val amJarRsrc = Records.newRecord(classOf[LocalResource])
amJarRsrc.setType(resourceType)
val visibility = getVisibility(conf, destPath.toUri(), statCache)
http://git-wip-us.apache.org/repos/asf/spark/blob/ab9872db/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index dd2180a..3a11787 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -134,7 +134,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
.set("spark.yarn.dist.jars", ADDED)
val client = createClient(sparkConf, args = Array("--jar", USER))
doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
- any(classOf[Path]), anyShort(), anyBoolean(), any())
+ any(classOf[Path]), anyShort(), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
val tempDir = Utils.createTempDir()
try {
@@ -240,11 +240,11 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*")))
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort(),
- anyBoolean(), any())
+ any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort(),
- anyBoolean(), any())
+ any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort(),
- anyBoolean(), any())
+ any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
val cp = classpath(client)
cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
@@ -262,7 +262,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort(),
- anyBoolean(), any())
+ any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org