Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2016/06/21 18:26:05 UTC
hadoop git commit: MAPREDUCE-6719. The list of -libjars archives
should be replaced with a wildcard in the distributed cache to reduce the
application footprint in the state store (Daniel Templeton via sjlee)
Repository: hadoop
Updated Branches:
refs/heads/trunk e15cd4336 -> 605b4b613
MAPREDUCE-6719. The list of -libjars archives should be replaced with a wildcard in the distributed cache to reduce the application footprint in the state store (Daniel Templeton via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/605b4b61
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/605b4b61
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/605b4b61
Branch: refs/heads/trunk
Commit: 605b4b61364781fc99ed27035c793153a20d8f71
Parents: e15cd43
Author: Sangjin Lee <sj...@twitter.com>
Authored: Tue Jun 21 11:25:11 2016 -0700
Committer: Sangjin Lee <sj...@twitter.com>
Committed: Tue Jun 21 11:25:11 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/mapreduce/v2/util/MRApps.java | 70 +++++++--
.../java/org/apache/hadoop/mapreduce/Job.java | 7 +-
.../hadoop/mapreduce/JobResourceUploader.java | 20 ++-
.../hadoop/mapreduce/JobSubmissionFiles.java | 4 +-
.../apache/hadoop/mapreduce/JobSubmitter.java | 6 +-
.../ClientDistributedCacheManager.java | 31 +++-
.../mapreduce/filecache/DistributedCache.java | 76 ++++++++--
.../src/main/resources/mapred-default.xml | 18 +++
.../TestClientDistributedCacheManager.java | 151 ++++++++++++++++---
.../filecache/TestDistributedCache.java | 132 ++++++++++++++++
.../hadoop/mapred/TestLocalJobSubmission.java | 34 +++--
.../apache/hadoop/mapreduce/v2/TestMRJobs.java | 48 ++++--
12 files changed, 510 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/605b4b61/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
index 31e4c0f..b800d31 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
@@ -300,12 +300,36 @@ public class MRApps extends Apps {
for (URI u: withLinks) {
Path p = new Path(u);
FileSystem remoteFS = p.getFileSystem(conf);
+ String name = p.getName();
+ String wildcard = null;
+
+ // If the path is wildcarded, resolve its parent directory instead
+ if (name.equals(DistributedCache.WILDCARD)) {
+ wildcard = name;
+ p = p.getParent();
+ }
+
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
- String name = (null == u.getFragment())
- ? p.getName() : u.getFragment();
+
+ if ((wildcard != null) && (u.getFragment() != null)) {
+ throw new IOException("Invalid path URI: " + p + " - cannot "
+ + "contain both a URI fragment and a wildcard");
+ } else if (wildcard != null) {
+ name = p.getName() + Path.SEPARATOR + wildcard;
+ } else if (u.getFragment() != null) {
+ name = u.getFragment();
+ }
+
+ // If it's not a JAR, add it to the link lookup.
if (!StringUtils.toLowerCase(name).endsWith(".jar")) {
- linkLookup.put(p, name);
+ String old = linkLookup.put(p, name);
+
+ if ((old != null) && !name.equals(old)) {
+ LOG.warn("The same path is included more than once "
+ + "with different links or wildcards: " + p + " [" +
+ name + ", " + old + "]");
+ }
}
}
}
@@ -559,16 +583,42 @@ public class MRApps extends Apps {
URI u = uris[i];
Path p = new Path(u);
FileSystem remoteFS = p.getFileSystem(conf);
+ String linkName = null;
+
+ if (p.getName().equals(DistributedCache.WILDCARD)) {
+ p = p.getParent();
+ linkName = p.getName() + Path.SEPARATOR + DistributedCache.WILDCARD;
+ }
+
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
- // Add URI fragment or just the filename
- Path name = new Path((null == u.getFragment())
- ? p.getName()
- : u.getFragment());
- if (name.isAbsolute()) {
- throw new IllegalArgumentException("Resource name must be relative");
+
+ // If there's no wildcard, try using the fragment for the link
+ if (linkName == null) {
+ linkName = u.getFragment();
+
+ // Because we don't know what's in the fragment, we have to handle
+ // it with care.
+ if (linkName != null) {
+ Path linkPath = new Path(linkName);
+
+ if (linkPath.isAbsolute()) {
+ throw new IllegalArgumentException("Resource name must be "
+ + "relative");
+ }
+
+ linkName = linkPath.toUri().getPath();
+ }
+ } else if (u.getFragment() != null) {
+ throw new IllegalArgumentException("Invalid path URI: " + p +
+ " - cannot contain both a URI fragment and a wildcard");
}
- String linkName = name.toUri().getPath();
+
+ // If there's no wildcard or fragment, just link to the file name
+ if (linkName == null) {
+ linkName = p.getName();
+ }
+
LocalResource orig = localResources.get(linkName);
if(orig != null && !orig.getResource().equals(URL.fromURI(p.toUri()))) {
throw new InvalidJobConfException(
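For orientation, here is a minimal client-side sketch of the cache URI forms the updated MRApps logic distinguishes; the host and paths are hypothetical, and the comments paraphrase the checks in the hunks above rather than reproduce them:

    import java.net.URI;
    import java.net.URISyntaxException;

    public class CacheUriRules {
      public static void main(String[] args) throws URISyntaxException {
        // Plain file: localized and linked into the task working directory
        // under its own file name.
        URI plain = new URI("hdfs://nn:8020/libs/guava.jar");
        // Fragment: the fragment ("g.jar") becomes the link name instead.
        URI fragment = new URI("hdfs://nn:8020/libs/guava.jar#g.jar");
        // Wildcard: the parent directory ("/libs") is localized and every
        // file in it is linked from the task working directory.
        URI wildcard = new URI("hdfs://nn:8020/libs/*");
        // Wildcard plus fragment is rejected by the checks above with
        // "cannot contain both a URI fragment and a wildcard".
        URI rejected = new URI("hdfs://nn:8020/libs/*#lib");
        System.out.println(plain + "\n" + fragment + "\n" + wildcard + "\n" + rejected);
      }
    }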
http://git-wip-us.apache.org/repos/asf/hadoop/blob/605b4b61/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
index 481107f..33e820b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
@@ -95,10 +95,13 @@ public class Job extends JobContextImpl implements JobContext {
static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;
public static final String USED_GENERIC_PARSER =
- "mapreduce.client.genericoptionsparser.used";
+ "mapreduce.client.genericoptionsparser.used";
public static final String SUBMIT_REPLICATION =
- "mapreduce.client.submit.file.replication";
+ "mapreduce.client.submit.file.replication";
public static final int DEFAULT_SUBMIT_REPLICATION = 10;
+ public static final String USE_WILDCARD_FOR_LIBJARS =
+ "mapreduce.client.libjars.wildcard";
+ public static final boolean DEFAULT_USE_WILDCARD_FOR_LIBJARS = true;
@InterfaceStability.Evolving
public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
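A minimal sketch, assuming only the constants added above, of how client code could restore the pre-wildcard behaviour for a single job:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class LibJarsWildcardOptOut {
      public static Job newJob() throws IOException {
        Configuration conf = new Configuration();
        // Wildcarded libjars handling is on by default
        // (DEFAULT_USE_WILDCARD_FOR_LIBJARS); a job that depends on the exact
        // per-jar localization details can switch it off before submission.
        conf.setBoolean(Job.USE_WILDCARD_FOR_LIBJARS, false);
        return Job.getInstance(conf);
      }
    }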
http://git-wip-us.apache.org/repos/asf/hadoop/blob/605b4b61/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
index f3e4d2f..90e1fba 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
@@ -19,10 +19,8 @@ package org.apache.hadoop.mapreduce;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
-import java.net.UnknownHostException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -40,10 +38,12 @@ import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@InterfaceStability.Unstable
class JobResourceUploader {
protected static final Log LOG = LogFactory.getLog(JobResourceUploader.class);
- private FileSystem jtFs;
+ private final boolean useWildcard;
+ private final FileSystem jtFs;
- JobResourceUploader(FileSystem submitFs) {
+ JobResourceUploader(FileSystem submitFs, boolean useWildcard) {
this.jtFs = submitFs;
+ this.useWildcard = useWildcard;
}
/**
@@ -126,8 +126,18 @@ class JobResourceUploader {
for (String tmpjars : libjarsArr) {
Path tmp = new Path(tmpjars);
Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
+
+ // Add each file to the classpath
DistributedCache.addFileToClassPath(
- new Path(newPath.toUri().getPath()), conf, jtFs);
+ new Path(newPath.toUri().getPath()), conf, jtFs, !useWildcard);
+ }
+
+ if (useWildcard) {
+ // Add the whole directory to the cache
+ Path libJarsDirWildcard =
+ jtFs.makeQualified(new Path(libjarsDir, DistributedCache.WILDCARD));
+
+ DistributedCache.addCacheFile(libJarsDirWildcard.toUri(), conf);
}
}
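The effect of the uploader change can be sketched with a hypothetical helper that records one wildcarded cache entry for the whole libjars directory, using the same calls as the code above, instead of one entry per jar; fewer cache entries is what reduces the job data kept in the state store:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.filecache.DistributedCache;

    public class LibJarsWildcardSketch {
      // Hypothetical helper mirroring the uploader change above: a single
      // cache entry for "<libjars dir>/*" replaces one entry per jar.
      static void cacheWholeLibJarsDir(FileSystem fs, Path libjarsDir,
          Configuration conf) {
        Path wildcard =
            fs.makeQualified(new Path(libjarsDir, DistributedCache.WILDCARD));
        DistributedCache.addCacheFile(wildcard.toUri(), conf);
      }
    }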
http://git-wip-us.apache.org/repos/asf/hadoop/blob/605b4b61/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
index 7125077..c4adadf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
@@ -41,10 +41,10 @@ public class JobSubmissionFiles {
// job submission directory is private!
final public static FsPermission JOB_DIR_PERMISSION =
- FsPermission.createImmutable((short) 0700); // rwx--------
+ FsPermission.createImmutable((short) 0700); // rwx------
//job files are world-wide readable and owner writable
final public static FsPermission JOB_FILE_PERMISSION =
- FsPermission.createImmutable((short) 0644); // rw-r--r--
+ FsPermission.createImmutable((short) 0644); // rw-r--r--
public static Path getJobSplitFile(Path jobSubmissionDir) {
return new Path(jobSubmissionDir, "job.split");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/605b4b61/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
index 497b0ed..22874e1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
@@ -94,7 +94,11 @@ class JobSubmitter {
*/
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
throws IOException {
- JobResourceUploader rUploader = new JobResourceUploader(jtFs);
+ Configuration conf = job.getConfiguration();
+ boolean useWildcards = conf.getBoolean(Job.USE_WILDCARD_FOR_LIBJARS,
+ Job.DEFAULT_USE_WILDCARD_FOR_LIBJARS);
+ JobResourceUploader rUploader = new JobResourceUploader(jtFs, useWildcards);
+
rUploader.uploadFiles(job, jobSubmitDir);
// Get the working directory. If not set, sets it to filesystem working dir
http://git-wip-us.apache.org/repos/asf/hadoop/blob/605b4b61/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java
index c15e647..19470e8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java
@@ -227,21 +227,27 @@ public class ClientDistributedCacheManager {
/**
* Returns a boolean to denote whether a cache file is visible to all(public)
* or not
- * @param conf
- * @param uri
+ * @param conf the configuration
+ * @param uri the URI to test
* @return true if the path in the uri is visible to all, false otherwise
- * @throws IOException
+ * @throws IOException thrown if a file system operation fails
*/
static boolean isPublic(Configuration conf, URI uri,
Map<URI, FileStatus> statCache) throws IOException {
+ boolean isPublic = true;
FileSystem fs = FileSystem.get(uri, conf);
Path current = new Path(uri.getPath());
current = fs.makeQualified(current);
- //the leaf level file should be readable by others
- if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
- return false;
+
+ // If we're looking at a wildcarded path, we only need to check that the
+ // ancestors allow execution. Otherwise, look for read permissions in
+ // addition to the ancestors' permissions.
+ if (!current.getName().equals(DistributedCache.WILDCARD)) {
+ isPublic = checkPermissionOfOther(fs, current, FsAction.READ, statCache);
}
- return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache);
+
+ return isPublic &&
+ ancestorsHaveExecutePermissions(fs, current.getParent(), statCache);
}
/**
@@ -284,11 +290,20 @@ public class ClientDistributedCacheManager {
private static FileStatus getFileStatus(FileSystem fs, URI uri,
Map<URI, FileStatus> statCache) throws IOException {
+ Path path = new Path(uri);
+
+ if (path.getName().equals(DistributedCache.WILDCARD)) {
+ path = path.getParent();
+ uri = path.toUri();
+ }
+
FileStatus stat = statCache.get(uri);
+
if (stat == null) {
- stat = fs.getFileStatus(new Path(uri));
+ stat = fs.getFileStatus(path);
statCache.put(uri, stat);
}
+
return stat;
}
}
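A hedged sketch of what the relaxed visibility rule implies for a directory that is meant to be cached as a public wildcard entry; the 0755 mode and the loop are illustrative, not part of this change:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class SharedLibsDirSetup {
      // For a wildcarded entry such as "/apps/libs/*", isPublic() above only
      // requires the directory and its ancestors to be executable by "other";
      // the read bits of the individual jars are not consulted.
      static void makeAncestorsTraversable(Configuration conf, Path libsDir)
          throws IOException {
        FileSystem fs = libsDir.getFileSystem(conf);
        for (Path p = fs.makeQualified(libsDir); !p.isRoot(); p = p.getParent()) {
          fs.setPermission(p, new FsPermission((short) 0755));
        }
      }
    }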
http://git-wip-us.apache.org/repos/asf/hadoop/blob/605b4b61/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
index 51fe69a..d4d6c6e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
@@ -126,12 +126,14 @@ import java.net.URI;
* as well as methods intended for use by the MapReduce framework
* (e.g., {@link org.apache.hadoop.mapred.JobClient}).
*
+ * @see org.apache.hadoop.mapreduce.Job
* @see org.apache.hadoop.mapred.JobConf
* @see org.apache.hadoop.mapred.JobClient
*/
@Deprecated
@InterfaceAudience.Private
public class DistributedCache {
+ public static final String WILDCARD = "*";
/**
* Set the configuration with the given set of archives. Intended
@@ -139,6 +141,7 @@ public class DistributedCache {
* @param archives The list of archives that need to be localized
* @param conf Configuration which will be changed
* @deprecated Use {@link Job#setCacheArchives(URI[])} instead
+ * @see Job#setCacheArchives(URI[])
*/
@Deprecated
public static void setCacheArchives(URI[] archives, Configuration conf) {
@@ -152,6 +155,7 @@ public class DistributedCache {
* @param files The list of files that need to be localized
* @param conf Configuration which will be changed
* @deprecated Use {@link Job#setCacheFiles(URI[])} instead
+ * @see Job#setCacheFiles(URI[])
*/
@Deprecated
public static void setCacheFiles(URI[] files, Configuration conf) {
@@ -166,6 +170,7 @@ public class DistributedCache {
* @return A URI array of the caches set in the Configuration
* @throws IOException
* @deprecated Use {@link JobContext#getCacheArchives()} instead
+ * @see JobContext#getCacheArchives()
*/
@Deprecated
public static URI[] getCacheArchives(Configuration conf) throws IOException {
@@ -179,6 +184,7 @@ public class DistributedCache {
* @return A URI array of the files set in the Configuration
* @throws IOException
* @deprecated Use {@link JobContext#getCacheFiles()} instead
+ * @see JobContext#getCacheFiles()
*/
@Deprecated
public static URI[] getCacheFiles(Configuration conf) throws IOException {
@@ -192,6 +198,7 @@ public class DistributedCache {
* @return A path array of localized caches
* @throws IOException
* @deprecated Use {@link JobContext#getLocalCacheArchives()} instead
+ * @see JobContext#getLocalCacheArchives()
*/
@Deprecated
public static Path[] getLocalCacheArchives(Configuration conf)
@@ -207,6 +214,7 @@ public class DistributedCache {
* @return A path array of localized files
* @throws IOException
* @deprecated Use {@link JobContext#getLocalCacheFiles()} instead
+ * @see JobContext#getLocalCacheFiles()
*/
@Deprecated
public static Path[] getLocalCacheFiles(Configuration conf)
@@ -236,6 +244,7 @@ public class DistributedCache {
* @param conf The configuration which stored the timestamps
* @return a long array of timestamps
* @deprecated Use {@link JobContext#getArchiveTimestamps()} instead
+ * @see JobContext#getArchiveTimestamps()
*/
@Deprecated
public static long[] getArchiveTimestamps(Configuration conf) {
@@ -250,6 +259,7 @@ public class DistributedCache {
* @param conf The configuration which stored the timestamps
* @return a long array of timestamps
* @deprecated Use {@link JobContext#getFileTimestamps()} instead
+ * @see JobContext#getFileTimestamps()
*/
@Deprecated
public static long[] getFileTimestamps(Configuration conf) {
@@ -263,6 +273,7 @@ public class DistributedCache {
* @param uri The uri of the cache to be localized
* @param conf Configuration to add the cache to
* @deprecated Use {@link Job#addCacheArchive(URI)} instead
+ * @see Job#addCacheArchive(URI)
*/
@Deprecated
public static void addCacheArchive(URI uri, Configuration conf) {
@@ -272,11 +283,27 @@ public class DistributedCache {
}
/**
- * Add a file to be localized to the conf. Intended
- * to be used by user code.
+ * Add a file to be localized to the conf. The localized file will be
+ * downloaded to the execution node(s), and a link will be created to the
+ * file from the job's working directory. If the last part of URI's path name
+ * is "*", then the entire parent directory will be localized and links
+ * will be created from the job's working directory to each file in the
+ * parent directory.
+ *
+ * The access permissions of the file will determine whether the localized
+ * file will be shared across jobs. If the file is not readable by other or
+ * if any of its parent directories is not executable by other, then the
+ * file will not be shared. In the case of a path that ends in "/*",
+ * sharing of the localized files will be determined solely from the
+ * access permissions of the parent directories. The access permissions of
+ * the individual files will be ignored.
+ *
+ * Intended to be used by user code.
+ *
* @param uri The uri of the cache to be localized
* @param conf Configuration to add the cache to
* @deprecated Use {@link Job#addCacheFile(URI)} instead
+ * @see Job#addCacheFile(URI)
*/
@Deprecated
public static void addCacheFile(URI uri, Configuration conf) {
@@ -286,12 +313,14 @@ public class DistributedCache {
}
/**
- * Add an file path to the current set of classpath entries It adds the file
- * to cache as well. Intended to be used by user code.
+ * Add a file path to the current set of classpath entries. The file will
+ * also be added to the cache. Intended to be used by user code.
*
* @param file Path of the file to be added
* @param conf Configuration that contains the classpath setting
* @deprecated Use {@link Job#addFileToClassPath(Path)} instead
+ * @see #addCacheFile(URI, Configuration)
+ * @see Job#addFileToClassPath(Path)
*/
@Deprecated
public static void addFileToClassPath(Path file, Configuration conf)
@@ -300,22 +329,42 @@ public class DistributedCache {
}
/**
- * Add a file path to the current set of classpath entries. It adds the file
- * to cache as well. Intended to be used by user code.
+ * Add a file path to the current set of classpath entries. The file will
+ * also be added to the cache. Intended to be used by user code.
*
* @param file Path of the file to be added
* @param conf Configuration that contains the classpath setting
* @param fs FileSystem with respect to which {@code archivefile} should
* be interpreted.
+ * @see #addCacheFile(URI, Configuration)
*/
- public static void addFileToClassPath
- (Path file, Configuration conf, FileSystem fs)
- throws IOException {
+ public static void addFileToClassPath(Path file, Configuration conf,
+ FileSystem fs) {
+ addFileToClassPath(file, conf, fs, true);
+ }
+
+ /**
+ * Add a file path to the current set of classpath entries. The file will
+ * also be added to the cache if {@code addToCache} is true. Used by
+ * internal DistributedCache code.
+ *
+ * @param file Path of the file to be added
+ * @param conf Configuration that contains the classpath setting
+ * @param fs FileSystem with respect to which {@code archivefile} should
+ * be interpreted.
+ * @param addToCache whether the file should also be added to the cache list
+ * @see #addCacheFile(URI, Configuration)
+ */
+ public static void addFileToClassPath(Path file, Configuration conf,
+ FileSystem fs, boolean addToCache) {
String classpath = conf.get(MRJobConfig.CLASSPATH_FILES);
conf.set(MRJobConfig.CLASSPATH_FILES, classpath == null ? file.toString()
: classpath + "," + file.toString());
- URI uri = fs.makeQualified(file).toUri();
- addCacheFile(uri, conf);
+
+ if (addToCache) {
+ URI uri = fs.makeQualified(file).toUri();
+ addCacheFile(uri, conf);
+ }
}
/**
@@ -323,7 +372,8 @@ public class DistributedCache {
* Used by internal DistributedCache code.
*
* @param conf Configuration that contains the classpath setting
- * @deprecated Use {@link JobContext#getFileClassPaths()} instead
+ * @deprecated Use {@link JobContext#getFileClassPaths()} instead
+ * @see JobContext#getFileClassPaths()
*/
@Deprecated
public static Path[] getFileClassPaths(Configuration conf) {
@@ -346,6 +396,7 @@ public class DistributedCache {
* @param archive Path of the archive to be added
* @param conf Configuration that contains the classpath setting
* @deprecated Use {@link Job#addArchiveToClassPath(Path)} instead
+ * @see Job#addArchiveToClassPath(Path)
*/
@Deprecated
public static void addArchiveToClassPath(Path archive, Configuration conf)
@@ -378,6 +429,7 @@ public class DistributedCache {
*
* @param conf Configuration that contains the classpath setting
* @deprecated Use {@link JobContext#getArchiveClassPaths()} instead
+ * @see JobContext#getArchiveClassPaths()
*/
@Deprecated
public static Path[] getArchiveClassPaths(Configuration conf) {
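Because DistributedCache itself is deprecated in favour of the Job API, an end user would typically request the wildcard behaviour described in the new addCacheFile() Javadoc through Job#addCacheFile; a small sketch with a made-up path:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class WildcardCacheFile {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        // Per the addCacheFile() Javadoc added above, a URI whose last path
        // component is "*" localizes the whole parent directory and links
        // every file in it from the task working directory; whether it is
        // shared across jobs then depends only on the parent directories'
        // permissions.
        job.addCacheFile(new URI("hdfs://nn:8020/apps/shared-libs/*"));
      }
    }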
http://git-wip-us.apache.org/repos/asf/hadoop/blob/605b4b61/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 7634331..d973bd4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -842,6 +842,24 @@
</description>
</property>
+ <property>
+ <name>mapreduce.client.libjars.wildcard</name>
+ <value>true</value>
+ <description>
+ Whether the libjars cache files should be localized using
+ a wildcarded directory instead of naming each archive independently.
+ Using wildcards reduces the space needed for storing the job
+ information in the case of a highly available resource manager
+ configuration.
+ This property should only be set to false for specific
+ jobs which are highly sensitive to the details of the archive
+ localization. Having this property set to true will cause the archives
+ to all be localized to the same local cache location. If false, each
+ archive will be localized to its own local cache location. In both
+ cases a symbolic link will be created to every archive from the job's
+ working directory.
+ </description>
+ </property>
<property>
<name>mapreduce.task.profile</name>
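As a usage note for the property above, a sketch of turning the wildcard off at submission time; SleepJob is the stock Tool driven by TestLocalJobSubmission further down in this change, and the -libjars path is made up:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.SleepJob;
    import org.apache.hadoop.util.ToolRunner;

    public class SubmitLibJarsWithoutWildcard {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same effect as passing -Dmapreduce.client.libjars.wildcard=false on
        // the command line of any GenericOptionsParser-aware tool.
        conf.setBoolean("mapreduce.client.libjars.wildcard", false);
        System.exit(ToolRunner.run(conf, new SleepJob(),
            new String[] {"-libjars", "/tmp/extra.jar", "-m", "1", "-r", "1"}));
      }
    }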
http://git-wip-us.apache.org/repos/asf/hadoop/blob/605b4b61/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/filecache/TestClientDistributedCacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/filecache/TestClientDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/filecache/TestClientDistributedCacheManager.java
index b5f45e6..5212b9f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/filecache/TestClientDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/filecache/TestClientDistributedCacheManager.java
@@ -17,11 +17,12 @@
*/
package org.apache.hadoop.mapreduce.filecache;
-import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,6 +38,8 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.junit.After;
import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
@@ -55,22 +58,22 @@ public class TestClientDistributedCacheManager {
private static final Path TEST_VISIBILITY_CHILD_DIR =
new Path(TEST_VISIBILITY_PARENT_DIR, "TestCacheVisibility_Child");
+ private static final String FIRST_CACHE_FILE = "firstcachefile";
+ private static final String SECOND_CACHE_FILE = "secondcachefile";
+
private FileSystem fs;
private Path firstCacheFile;
private Path secondCacheFile;
- private Path thirdCacheFile;
private Configuration conf;
@Before
public void setup() throws IOException {
conf = new Configuration();
fs = FileSystem.get(conf);
- firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
- secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile");
- thirdCacheFile = new Path(TEST_VISIBILITY_CHILD_DIR,"thirdCachefile");
+ firstCacheFile = new Path(TEST_VISIBILITY_PARENT_DIR, FIRST_CACHE_FILE);
+ secondCacheFile = new Path(TEST_VISIBILITY_CHILD_DIR, SECOND_CACHE_FILE);
createTempFile(firstCacheFile, conf);
createTempFile(secondCacheFile, conf);
- createTempFile(thirdCacheFile, conf);
}
@After
@@ -88,37 +91,147 @@ public class TestClientDistributedCacheManager {
job.addCacheFile(secondCacheFile.toUri());
Configuration jobConf = job.getConfiguration();
- Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
+ Map<URI, FileStatus> statCache = new HashMap<>();
ClientDistributedCacheManager.determineTimestamps(jobConf, statCache);
FileStatus firstStatus = statCache.get(firstCacheFile.toUri());
FileStatus secondStatus = statCache.get(secondCacheFile.toUri());
- Assert.assertNotNull(firstStatus);
- Assert.assertNotNull(secondStatus);
- Assert.assertEquals(2, statCache.size());
+ Assert.assertNotNull(firstCacheFile + " was not found in the stats cache",
+ firstStatus);
+ Assert.assertNotNull(secondCacheFile + " was not found in the stats cache",
+ secondStatus);
+ Assert.assertEquals("Missing/extra entries found in the stas cache",
+ 2, statCache.size());
String expected = firstStatus.getModificationTime() + ","
+ secondStatus.getModificationTime();
Assert.assertEquals(expected, jobConf.get(MRJobConfig.CACHE_FILE_TIMESTAMPS));
+
+ job = Job.getInstance(conf);
+ job.addCacheFile(new Path(TEST_VISIBILITY_CHILD_DIR, "*").toUri());
+ jobConf = job.getConfiguration();
+ statCache.clear();
+ ClientDistributedCacheManager.determineTimestamps(jobConf, statCache);
+
+ FileStatus thirdStatus = statCache.get(TEST_VISIBILITY_CHILD_DIR.toUri());
+
+ Assert.assertEquals("Missing/extra entries found in the stas cache",
+ 1, statCache.size());
+ Assert.assertNotNull(TEST_VISIBILITY_CHILD_DIR
+ + " was not found in the stats cache", thirdStatus);
+ expected = Long.toString(thirdStatus.getModificationTime());
+ Assert.assertEquals("Incorrect timestamp for " + TEST_VISIBILITY_CHILD_DIR,
+ expected, jobConf.get(MRJobConfig.CACHE_FILE_TIMESTAMPS));
}
@Test
public void testDetermineCacheVisibilities() throws IOException {
- fs.setWorkingDirectory(TEST_VISIBILITY_CHILD_DIR);
+ fs.setPermission(TEST_VISIBILITY_PARENT_DIR,
+ new FsPermission((short)00777));
fs.setPermission(TEST_VISIBILITY_CHILD_DIR,
new FsPermission((short)00777));
+ fs.setWorkingDirectory(TEST_VISIBILITY_CHILD_DIR);
+ Job job = Job.getInstance(conf);
+ Path relativePath = new Path(SECOND_CACHE_FILE);
+ Path wildcardPath = new Path("*");
+ Map<URI, FileStatus> statCache = new HashMap<>();
+ Configuration jobConf;
+
+ job.addCacheFile(firstCacheFile.toUri());
+ job.addCacheFile(relativePath.toUri());
+ jobConf = job.getConfiguration();
+
+ ClientDistributedCacheManager.determineCacheVisibilities(jobConf,
+ statCache);
+ // We use get() instead of getBoolean() so we can tell the difference
+ // between wrong and missing
+ assertEquals("The file paths were not found to be publicly visible "
+ + "even though the full path is publicly accessible",
+ "true,true", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES));
+ checkCacheEntries(statCache, null, firstCacheFile, relativePath);
+
+ job = Job.getInstance(conf);
+ job.addCacheFile(wildcardPath.toUri());
+ jobConf = job.getConfiguration();
+ statCache.clear();
+
+ ClientDistributedCacheManager.determineCacheVisibilities(jobConf,
+ statCache);
+ // We use get() instead of getBoolean() so we can tell the difference
+ // between wrong and missing
+ assertEquals("The file path was not found to be publicly visible "
+ + "even though the full path is publicly accessible",
+ "true", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES));
+ checkCacheEntries(statCache, null, wildcardPath.getParent());
+
+ Path qualifiedParent = fs.makeQualified(TEST_VISIBILITY_PARENT_DIR);
fs.setPermission(TEST_VISIBILITY_PARENT_DIR,
new FsPermission((short)00700));
- Job job = Job.getInstance(conf);
- Path relativePath = new Path("thirdCachefile");
+ job = Job.getInstance(conf);
+ job.addCacheFile(firstCacheFile.toUri());
job.addCacheFile(relativePath.toUri());
- Configuration jobConf = job.getConfiguration();
+ jobConf = job.getConfiguration();
+ statCache.clear();
+
+ ClientDistributedCacheManager.determineCacheVisibilities(jobConf,
+ statCache);
+ // We use get() instead of getBoolean() so we can tell the difference
+ // between wrong and missing
+ assertEquals("The file paths were found to be publicly visible "
+ + "even though the parent directory is not publicly accessible",
+ "false,false", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES));
+ checkCacheEntries(statCache, qualifiedParent,
+ firstCacheFile, relativePath);
+
+ job = Job.getInstance(conf);
+ job.addCacheFile(wildcardPath.toUri());
+ jobConf = job.getConfiguration();
+ statCache.clear();
+
+ ClientDistributedCacheManager.determineCacheVisibilities(jobConf,
+ statCache);
+ // We use get() instead of getBoolean() so we can tell the difference
+ // between wrong and missing
+ assertEquals("The file path was found to be publicly visible "
+ + "even though the parent directory is not publicly accessible",
+ "false", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES));
+ checkCacheEntries(statCache, qualifiedParent, wildcardPath.getParent());
+ }
+
+ /**
+ * Validate that the file status cache contains all and only entries for a
+ * given set of paths up to a common parent.
+ *
+ * @param statCache the cache
+ * @param top the common parent at which to stop digging
+ * @param paths the paths to compare against the cache
+ */
+ private void checkCacheEntries(Map<URI, FileStatus> statCache, Path top,
+ Path... paths) {
+ Set<URI> expected = new HashSet<>();
+
+ for (Path path : paths) {
+ Path p = fs.makeQualified(path);
+
+ while (!p.isRoot() && !p.equals(top)) {
+ expected.add(p.toUri());
+ p = p.getParent();
+ }
+
+ expected.add(p.toUri());
+ }
+
+ Set<URI> uris = statCache.keySet();
+ Set<URI> missing = new HashSet<>(uris);
+ Set<URI> extra = new HashSet<>(expected);
+
+ missing.removeAll(expected);
+ extra.removeAll(uris);
- Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
- ClientDistributedCacheManager.
- determineCacheVisibilities(jobConf, statCache);
- Assert.assertFalse(jobConf.
- getBoolean(MRJobConfig.CACHE_FILE_VISIBILITIES,true));
+ assertTrue("File status cache does not contain an entries for " + missing,
+ missing.isEmpty());
+ assertTrue("File status cache contains extra extries: " + extra,
+ extra.isEmpty());
}
@SuppressWarnings("deprecation")
http://git-wip-us.apache.org/repos/asf/hadoop/blob/605b4b61/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/filecache/TestDistributedCache.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/filecache/TestDistributedCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/filecache/TestDistributedCache.java
new file mode 100644
index 0000000..14f4020
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/filecache/TestDistributedCache.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.filecache;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Test the {@link DistributedCache} class.
+ */
+public class TestDistributedCache {
+ /**
+ * Test of the addFileToClassPath method of the DistributedCache class.
+ */
+ @Test
+ public void testAddFileToClassPath() throws Exception {
+ Configuration conf = new Configuration(false);
+
+ // Test first with 2 args
+ try {
+ DistributedCache.addFileToClassPath(null, conf);
+ fail("Accepted null archives argument");
+ } catch (NullPointerException ex) {
+ // Expected
+ }
+
+ DistributedCache.addFileToClassPath(new Path("file:///a"), conf);
+ assertEquals("The mapreduce.job.classpath.files property was not "
+ + "set correctly", "file:/a", conf.get(MRJobConfig.CLASSPATH_FILES));
+ assertEquals("The mapreduce.job.cache.files property was not set "
+ + "correctly", "file:///a", conf.get(MRJobConfig.CACHE_FILES));
+
+ DistributedCache.addFileToClassPath(new Path("file:///b"), conf);
+ assertEquals("The mapreduce.job.classpath.files property was not "
+ + "set correctly", "file:/a,file:/b",
+ conf.get(MRJobConfig.CLASSPATH_FILES));
+ assertEquals("The mapreduce.job.cache.files property was not set "
+ + "correctly", "file:///a,file:///b",
+ conf.get(MRJobConfig.CACHE_FILES));
+
+ // Now test with 3 args
+ FileSystem fs = FileSystem.newInstance(conf);
+ conf.clear();
+
+ try {
+ DistributedCache.addFileToClassPath(null, conf, fs);
+ fail("Accepted null archives argument");
+ } catch (NullPointerException ex) {
+ // Expected
+ }
+
+ DistributedCache.addFileToClassPath(new Path("file:///a"), conf, fs);
+ assertEquals("The mapreduce.job.classpath.files property was not "
+ + "set correctly", "file:/a", conf.get(MRJobConfig.CLASSPATH_FILES));
+ assertEquals("The mapreduce.job.cache.files property was not set "
+ + "correctly", "file:///a", conf.get(MRJobConfig.CACHE_FILES));
+
+ DistributedCache.addFileToClassPath(new Path("file:///b"), conf, fs);
+ assertEquals("The mapreduce.job.classpath.files property was not "
+ + "set correctly", "file:/a,file:/b",
+ conf.get(MRJobConfig.CLASSPATH_FILES));
+ assertEquals("The mapreduce.job.cache.files property was not set "
+ + "correctly", "file:///a,file:///b",
+ conf.get(MRJobConfig.CACHE_FILES));
+
+ // Now test with 4th arg true
+ conf.clear();
+
+ try {
+ DistributedCache.addFileToClassPath(null, conf, fs, true);
+ fail("Accepted null archives argument");
+ } catch (NullPointerException ex) {
+ // Expected
+ }
+
+ DistributedCache.addFileToClassPath(new Path("file:///a"), conf, fs, true);
+ assertEquals("The mapreduce.job.classpath.files property was not "
+ + "set correctly", "file:/a", conf.get(MRJobConfig.CLASSPATH_FILES));
+ assertEquals("The mapreduce.job.cache.files property was not set "
+ + "correctly", "file:///a", conf.get(MRJobConfig.CACHE_FILES));
+
+ DistributedCache.addFileToClassPath(new Path("file:///b"), conf, fs, true);
+ assertEquals("The mapreduce.job.classpath.files property was not "
+ + "set correctly", "file:/a,file:/b",
+ conf.get(MRJobConfig.CLASSPATH_FILES));
+ assertEquals("The mapreduce.job.cache.files property was not set "
+ + "correctly", "file:///a,file:///b",
+ conf.get(MRJobConfig.CACHE_FILES));
+
+ // And finally with 4th arg false
+ conf.clear();
+
+ try {
+ DistributedCache.addFileToClassPath(null, conf, fs, false);
+ fail("Accepted null archives argument");
+ } catch (NullPointerException ex) {
+ // Expected
+ }
+
+ DistributedCache.addFileToClassPath(new Path("file:///a"), conf, fs, false);
+ assertEquals("The mapreduce.job.classpath.files property was not "
+ + "set correctly", "file:/a", conf.get(MRJobConfig.CLASSPATH_FILES));
+ assertEquals("The mapreduce.job.cache.files property was not set "
+ + "correctly", "", conf.get(MRJobConfig.CACHE_FILES, ""));
+
+ DistributedCache.addFileToClassPath(new Path("file:///b"), conf, fs, false);
+ assertEquals("The mapreduce.job.classpath.files property was not "
+ + "set correctly", "file:/a,file:/b",
+ conf.get(MRJobConfig.CLASSPATH_FILES));
+ assertEquals("The mapreduce.job.cache.files property was not set "
+ + "correctly", "", conf.get(MRJobConfig.CACHE_FILES, ""));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/605b4b61/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
index f7352f1..4a2b857 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
@@ -18,23 +18,20 @@
package org.apache.hadoop.mapred;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.net.URL;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.util.ToolRunner;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -47,24 +44,31 @@ public class TestLocalJobSubmission {
private static Path TEST_ROOT_DIR =
new Path(System.getProperty("test.build.data","/tmp"));
- @Before
- public void configure() throws Exception {
- }
+ /**
+ * Test the local job submission options of -jt local -libjars.
+ *
+ * @throws IOException thrown if there's an error creating the JAR file
+ */
+ @Test
+ public void testLocalJobLibjarsOption() throws IOException {
+ Configuration conf = new Configuration();
- @After
- public void cleanup() {
+ testLocalJobLibjarsOption(conf);
+
+ conf.setBoolean(Job.USE_WILDCARD_FOR_LIBJARS, false);
+ testLocalJobLibjarsOption(conf);
}
/**
- * test the local job submission options of
- * -jt local -libjars.
- * @throws IOException
+ * Test the local job submission options of -jt local -libjars.
+ *
+ * @param conf the {@link Configuration} to use
+ * @throws IOException thrown if there's an error creating the JAR file
*/
- @Test
- public void testLocalJobLibjarsOption() throws IOException {
+ private void testLocalJobLibjarsOption(Configuration conf)
+ throws IOException {
Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
- Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
conf.set(MRConfig.FRAMEWORK_NAME, "local");
final String[] args = {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/605b4b61/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
index 900bdeb..451ec57 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
@@ -911,7 +911,8 @@ public class TestMRJobs {
}
}
- public void _testDistributedCache(String jobJarPath) throws Exception {
+ private void testDistributedCache(String jobJarPath, boolean withWildcard)
+ throws Exception {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
@@ -920,7 +921,7 @@ public class TestMRJobs {
// Create a temporary file of length 1.
Path first = createTempFile("distributed.first", "x");
- // Create two jars with a single file inside them.
+ // Create three jars with a single file inside them.
Path second =
makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
Path third =
@@ -929,16 +930,28 @@ public class TestMRJobs {
makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
Job job = Job.getInstance(mrCluster.getConfig());
-
+
// Set the job jar to a new "dummy" jar so we can check that its extracted
// properly
job.setJar(jobJarPath);
- // Because the job jar is a "dummy" jar, we need to include the jar with
- // DistributedCacheChecker or it won't be able to find it
- Path distributedCacheCheckerJar = new Path(
- JarFinder.getJar(DistributedCacheChecker.class));
- job.addFileToClassPath(distributedCacheCheckerJar.makeQualified(
- localFs.getUri(), distributedCacheCheckerJar.getParent()));
+
+ if (withWildcard) {
+ // If testing with wildcards, upload the DistributedCacheChecker into HDFS
+ // and add the directory as a wildcard.
+ Path libs = new Path("testLibs");
+ Path wildcard = remoteFs.makeQualified(new Path(libs, "*"));
+
+ remoteFs.mkdirs(libs);
+ remoteFs.copyFromLocalFile(third, libs);
+ job.addCacheFile(wildcard.toUri());
+ } else {
+ // Otherwise add the DistributedCacheChecker directly to the classpath.
+ // Because the job jar is a "dummy" jar, we need to include the jar with
+ // DistributedCacheChecker or it won't be able to find it
+ Path distributedCacheCheckerJar = new Path(
+ JarFinder.getJar(DistributedCacheChecker.class));
+ job.addFileToClassPath(localFs.makeQualified(distributedCacheCheckerJar));
+ }
job.setMapperClass(DistributedCacheChecker.class);
job.setOutputFormatClass(NullOutputFormat.class);
@@ -964,11 +977,10 @@ public class TestMRJobs {
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
}
- @Test (timeout = 600000)
- public void testDistributedCache() throws Exception {
+ private void testDistributedCache(boolean withWildcard) throws Exception {
// Test with a local (file:///) Job Jar
Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString());
- _testDistributedCache(localJobJarPath.toUri().toString());
+ testDistributedCache(localJobJarPath.toUri().toString(), withWildcard);
// Test with a remote (hdfs://) Job Jar
Path remoteJobJarPath = new Path(remoteFs.getUri().toString() + "/",
@@ -978,7 +990,17 @@ public class TestMRJobs {
if (localJobJarFile.exists()) { // just to make sure
localJobJarFile.delete();
}
- _testDistributedCache(remoteJobJarPath.toUri().toString());
+ testDistributedCache(remoteJobJarPath.toUri().toString(), withWildcard);
+ }
+
+ @Test (timeout = 300000)
+ public void testDistributedCache() throws Exception {
+ testDistributedCache(false);
+ }
+
+ @Test (timeout = 300000)
+ public void testDistributedCacheWithWildcards() throws Exception {
+ testDistributedCache(true);
}
@Test(timeout = 120000)