Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/07/09 20:55:52 UTC
svn commit: r1501462 - in /hadoop/common/branches/branch-1-win:
CHANGES.branch-1-win.txt src/mapred/mapred-default.xml
src/mapred/org/apache/hadoop/mapred/JobClient.java
src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java
Author: cnauroth
Date: Tue Jul 9 18:55:51 2013
New Revision: 1501462
URL: http://svn.apache.org/r1501462
Log:
MAPREDUCE-5278. Distributed cache is broken when JT staging dir is not on the default FS. Contributed by Xi Fang.
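For context, the pattern this change targets can be sketched in a few lines. This is a minimal illustration only, not part of the commit: the staging-dir property name, host names, and the "hdfs" scheme are assumptions chosen for the example, while JobClient.CLIENT_ACCESSIBLE_REMOTE_SCHEMES_KEY and "mapreduce.client.accessible.remote.schemes" are the additions made by this patch.

// Sketch: a distributed cache entry that already lives on a filesystem
// reachable from every node, so JobClient can hand the original path to
// the cluster instead of copying it into the JT staging dir.
// The staging-dir property, URIs, and the "hdfs" scheme are illustrative
// assumptions; only the accessible-schemes key comes from this patch.
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class RemoteCacheSubmissionSketch {
  public static void configure(JobConf conf) {
    // Hypothetical setup: JT staging dir on a filesystem other than
    // fs.default.name.
    conf.set("mapreduce.jobtracker.staging.root.dir",
        "hdfs://staging-nn:8020/user");
    // Declare "hdfs" paths reachable from all nodes, so copyRemoteFiles()
    // returns the original path unchanged.
    conf.set(JobClient.CLIENT_ACCESSIBLE_REMOTE_SCHEMES_KEY, "hdfs");
    // This entry is no longer copied to the staging dir before submission.
    DistributedCache.addCacheFile(
        new Path("hdfs://data-nn:8020/libs/lookup.dat").toUri(), conf);
  }
}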
Modified:
hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml
hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java
Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt?rev=1501462&r1=1501461&r2=1501462&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt (original)
+++ hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt Tue Jul 9 18:55:51 2013
@@ -298,6 +298,9 @@ Branch-hadoop-1-win (branched from branc
MAPREDUCE-5371. TestProxyUserFromEnv#testProxyUserFromEnvironment failed
caused by domains of windows users. (Xi Fang via cnauroth)
+ MAPREDUCE-5278. Distributed cache is broken when JT staging dir is not on the
+ default FS. (Xi Fang via cnauroth)
+
Merged from branch-1
HDFS-385. Backport: Add support for an experimental API that allows a
Modified: hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml?rev=1501462&r1=1501461&r2=1501462&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml Tue Jul 9 18:55:51 2013
@@ -415,6 +415,15 @@
</property>
<property>
+ <name>mapreduce.client.accessible.remote.schemes</name>
+ <value></value>
+ <description>The schemes of the file systems that are accessible from
+ all the nodes in the cluster. Used by the job client to avoid copying
+ distributed cache entries to the job staging dir if path is accessible.
+ </description>
+</property>
+
+<property>
<name>mapred.tasktracker.reduce.tasks.maximum</name>
<value>2</value>
<description>The maximum number of reduce tasks that will be run
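The property added above is read as a comma-separated list of URI schemes via Configuration.getStrings(); its shipped default is empty, so no scheme is treated as cluster-accessible unless configured. A minimal sketch of setting and reading it programmatically (the "hdfs" and "s3n" values are illustrative assumptions, not defaults shipped by the patch):

// Sketch: the value is a comma-separated list of filesystem schemes.
// Scheme names below are examples only; the shipped default is empty.
import org.apache.hadoop.conf.Configuration;

public class AccessibleSchemesExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("mapreduce.client.accessible.remote.schemes", "hdfs,s3n");
    // getStrings() splits on commas; JobClient compares each entry
    // case-insensitively against the cache path's filesystem scheme.
    for (String scheme : conf.getStrings(
        "mapreduce.client.accessible.remote.schemes")) {
      System.out.println(scheme);
    }
  }
}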
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1501462&r1=1501461&r2=1501462&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java Tue Jul 9 18:55:51 2013
@@ -47,6 +47,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
@@ -450,6 +451,9 @@ public class JobClient extends Configure
private static final String TASKLOG_PULL_TIMEOUT_KEY =
"mapreduce.client.tasklog.timeout";
private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;
+ @Private
+ public static final String CLIENT_ACCESSIBLE_REMOTE_SCHEMES_KEY =
+ "mapreduce.client.accessible.remote.schemes";
static int tasklogtimeout;
public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
@@ -688,18 +692,33 @@ public class JobClient extends Configure
private Path copyRemoteFiles(FileSystem jtFs, Path parentDir,
final Path originalPath, final JobConf job, short replication)
throws IOException, InterruptedException {
- //check if we do not need to copy the files
- // is jt using the same file system.
- // just checking for uri strings... doing no dns lookups
- // to see if the filesystems are the same. This is not optimal.
- // but avoids name resolution.
FileSystem remoteFs = null;
remoteFs = originalPath.getFileSystem(job);
+
+ // Check if we do not need to copy the files
+ // Check whether the given path is accessible from all the
+ // nodes in the cluster in which case we skip the copy operation
+ // altogether and pass the original path.
+ String remoteFsScheme = remoteFs.getUri().getScheme();
+ String [] accessibleSchemes = job.getStrings(
+ CLIENT_ACCESSIBLE_REMOTE_SCHEMES_KEY, null);
+ if (accessibleSchemes != null) {
+ for (String s : accessibleSchemes) {
+ if (remoteFsScheme.equalsIgnoreCase(s)) {
+ return originalPath;
+ }
+ }
+ }
+ // Check whether jt is using the same file system.
+ // just checking for uri strings... doing no dns lookups
+ // to see if the filesystems are the same. This is not optimal.
+ // but avoids name resolution.
if (compareFs(remoteFs, jtFs)) {
return originalPath;
}
+
// this might have name collisions. copy will throw an exception
//parse the original path to create new path
Path newPath = new Path(parentDir, originalPath.getName());
@@ -808,7 +827,7 @@ public class JobClient extends Configure
Path tmp = new Path(tmpjars);
Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
DistributedCache.addArchiveToClassPath
- (new Path(newPath.toUri().getPath()), job, fs);
+ (new Path(newPath.toUri().getPath()), job, newPath.getFileSystem(job));
}
}
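Restated outside the diff, the decision added to copyRemoteFiles() reduces to a scheme comparison: no DNS lookup or connectivity probe is made, so the configured list is trusted as-is. A standalone sketch of that check, with class and method names invented here for illustration:

// Standalone restatement of the skip-copy decision shown in the diff.
// Only the URI scheme is compared, case-insensitively; no DNS lookups
// or reachability checks are performed.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class SchemeCheckSketch {
  static boolean isClusterAccessible(Path path, Configuration conf) {
    String scheme = path.toUri().getScheme();
    if (scheme == null) {
      // Relative or default-FS path: fall back to the compareFs()/copy logic.
      return false;
    }
    String[] accessible = conf.getStrings(
        "mapreduce.client.accessible.remote.schemes", (String[]) null);
    if (accessible == null) {
      return false;
    }
    for (String s : accessible) {
      if (scheme.equalsIgnoreCase(s)) {
        // Original path can be handed to the cluster without copying.
        return true;
      }
    }
    return false;
  }
}

In the committed code the scheme is taken from the resolved FileSystem (remoteFs.getUri().getScheme()) rather than from the raw path, which also covers relative paths on the default filesystem.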
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java?rev=1501462&r1=1501461&r2=1501462&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java Tue Jul 9 18:55:51 2013
@@ -36,10 +36,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
@@ -179,6 +181,96 @@ public class TestMRWithDistributedCache
}
+ /**
+ * Tests setting job tracker's staging dir to a nondefault filesystem.
+ * It validates that distributed cache entries are not copied to the staging
+ * dir for schemes defined in "mapreduce.client.accessible.remote.schemes"
+ * (denoted by JobClient.CLIENT_ACCESSIBLE_REMOTE_SCHEMES_KEY).
+ */
+ public void testJTStagingOnNondefaultFS() throws Exception {
+ MiniDFSCluster dfs = null;
+ MiniMRCluster mr = null;
+ try {
+ dfs = new MiniDFSCluster(conf, 1, true, null);
+ FileSystem fileSys = dfs.getFileSystem();
+ mr = new MiniMRCluster(1, fileSys.getUri().toString(), 1);
+ runWithConfJTStagingOnNondefaultFS(mr.createJobConf());
+ } finally {
+ if (mr != null) {
+ mr.shutdown();
+ }
+ if (dfs != null) {
+ dfs.shutdown();
+ }
+ }
+ }
+
+ private void runWithConfJTStagingOnNondefaultFS(JobConf conf)
+ throws IOException, InterruptedException,
+ ClassNotFoundException, URISyntaxException {
+ // Create a temporary file of length 1.
+ Path first = createTempFile("distributed.first", "x")
+ .makeQualified(localFs);
+ // Create two jars with a single file inside them.
+ Path second =
+ makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2)
+ .makeQualified(localFs);
+ Path third =
+ makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3)
+ .makeQualified(localFs);
+ // Set configuration properties for this job
+ conf.set("fs.default.name", "file:///");
+ conf.set("tmpfiles", first.toString());
+ conf.set("tmpjars", second.toString());
+ conf.set("tmparchives", third.toString());
+ conf.set(JobClient.CLIENT_ACCESSIBLE_REMOTE_SCHEMES_KEY,
+ localFs.getUri().getScheme());
+ conf.setMaxMapAttempts(1); // speed up failures
+ // Submit job
+ Job job = new Job(conf);
+ job.setMapperClass(DistributedCacheCheckerJTStagingOnNondefaultFS.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ FileInputFormat.setInputPaths(job, first);
+ job.submit();
+ // Check if the job is successful
+ assertTrue(job.waitForCompletion(false));
+ }
+
+ public static class DistributedCacheCheckerJTStagingOnNondefaultFS extends
+ Mapper<LongWritable, Text, NullWritable, NullWritable> {
+ @Override
+ public void setup(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ Path[] files = DistributedCache.getLocalCacheFiles(conf);
+ Path[] archives = DistributedCache.getLocalCacheArchives(conf);
+ // Check that 1 file and 2 archives are present
+ assertEquals(1, files.length);
+ assertEquals(2, archives.length);
+ // Check lengths of the file
+ assertEquals(1, localFs.getFileStatus(files[0]).getLen());
+ // Check the existence of the archives
+ assertTrue(localFs
+ .exists(new Path(archives[0], "distributed.jar.inside2")));
+ assertTrue(localFs
+ .exists(new Path(archives[1], "distributed.jar.inside3")));
+ // Check the schemes of the files/archives specified in
+ // "mapred.cache.archives" and "mapred.cache.files". The schemes
+ // must be "file" if we do not copy the files/archives to jt's staging
+ // dir
+ String[] arxSources = conf.getStrings("mapred.cache.archives");
+ String[] fileSources = conf.getStrings("mapred.cache.files");
+ assertEquals("file", (new Path(fileSources[0])).toUri().getScheme());
+ assertEquals("file", (new Path(arxSources[0])).toUri().getScheme());
+ assertEquals("file", (new Path(arxSources[1])).toUri().getScheme());
+ // Check the class loaders
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ // The archives added by "tmpjars" should be reachable via
+ // the class loader.
+ TestCase.assertNotNull(cl.getResource("distributed.jar.inside2"));
+ TestCase.assertNull(cl.getResource("distributed.jar.inside3"));
+ }
+ }
+
private Path createTempFile(String filename, String contents)
throws IOException {
Path path = new Path(TEST_ROOT_DIR, filename);