You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/07/08 20:26:19 UTC
spark git commit: [SPARK-8657] [YARN] [HOTFIX] Fail to upload resource to viewfs
Repository: spark
Updated Branches:
refs/heads/branch-1.4 e91d87e66 -> e4313db38
[SPARK-8657] [YARN] [HOTFIX] Fail to upload resource to viewfs
Fail to upload resource to viewfs in spark-1.4
JIRA Link: https://issues.apache.org/jira/browse/SPARK-8657
Author: Tao Li <li...@sogou-inc.com>
Closes #7125 from litao-buptsse/SPARK-8657-for-master and squashes the following commits:
65b13f4 [Tao Li] [SPARK-8657] [YARN] Fail to upload resource to viewfs
(cherry picked from commit 26d9b6b8cae9ac6593f78ab98dd45a25d03cf71c)
Signed-off-by: Sean Owen <so...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4313db3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4313db3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4313db3
Branch: refs/heads/branch-1.4
Commit: e4313db38e81f6288f1704c22e17d0c6e81b4d75
Parents: e91d87e
Author: Tao Li <li...@sogou-inc.com>
Authored: Wed Jul 8 19:02:24 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Jul 8 19:23:29 2015 +0100
----------------------------------------------------------------------
.../org/apache/spark/deploy/yarn/Client.scala | 57 ++------------------
1 file changed, 4 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e4313db3/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 38e5926..cc0aa45 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -304,57 +304,6 @@ private[spark] class Client(
}
/**
- * Distribute a file to the cluster.
- *
- * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied
- * to HDFS (if not already there) and added to the application's distributed cache.
- *
- * @param path URI of the file to distribute.
- * @param resType Type of resource being distributed.
- * @param destName Name of the file in the distributed cache.
- * @param targetDir Subdirectory where to place the file.
- * @param appMasterOnly Whether to distribute only to the AM.
- * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the
- * localized path for non-local paths, or the input `path` for local paths.
- * The localized path will be null if the URI has already been added to the cache.
- */
- def distribute(
- path: String,
- resType: LocalResourceType = LocalResourceType.FILE,
- destName: Option[String] = None,
- targetDir: Option[String] = None,
- appMasterOnly: Boolean = false): (Boolean, String) = {
- val localURI = new URI(path.trim())
- if (localURI.getScheme != LOCAL_SCHEME) {
- if (addDistributedUri(localURI)) {
- val localPath = getQualifiedLocalPath(localURI, hadoopConf)
- val linkname = targetDir.map(_ + "/").getOrElse("") +
- destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
- val destPath = copyFileToRemote(dst, localPath, replication)
- distCacheMgr.addResource(
- fs, hadoopConf, destPath, localResources, resType, linkname, statCache,
- appMasterOnly = appMasterOnly)
- (false, linkname)
- } else {
- (false, null)
- }
- } else {
- (true, path.trim())
- }
- }
-
- // If we passed in a keytab, make sure we copy the keytab to the staging directory on
- // HDFS, and setup the relevant environment vars, so the AM can login again.
- if (loginFromKeytab) {
- logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
- " via the YARN Secure Distributed Cache.")
- val (_, localizedPath) = distribute(args.keytab,
- destName = Some(sparkConf.get("spark.yarn.keytab")),
- appMasterOnly = true)
- require(localizedPath != null, "Keytab file already distributed.")
- }
-
- /**
* Copy the given main resource to the distributed cache if the scheme is not "local".
* Otherwise, set the corresponding key in our SparkConf to handle it downstream.
* Each resource is represented by a 3-tuple of:
@@ -389,7 +338,8 @@ private[spark] class Client(
createConfArchive().foreach { file =>
require(addDistributedUri(file.toURI()))
val destPath = copyFileToRemote(dst, new Path(file.toURI()), replication)
- distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE,
+ val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
+ distCacheMgr.addResource(destFs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE,
LOCALIZED_HADOOP_CONF_DIR, statCache, appMasterOnly = true)
}
@@ -414,8 +364,9 @@ private[spark] class Client(
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyFileToRemote(dst, localPath, replication)
+ val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
distCacheMgr.addResource(
- fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
+ destFs, hadoopConf, destPath, localResources, resType, linkname, statCache)
if (addToClasspath) {
cachedSecondaryJarLinks += linkname
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org