You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2009/11/27 11:50:45 UTC
svn commit: r884832 - in /hadoop/mapreduce/trunk: ./
src/docs/src/documentation/content/xdocs/
src/java/org/apache/hadoop/mapreduce/
src/java/org/apache/hadoop/mapreduce/filecache/
src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/testshell/
Author: sharad
Date: Fri Nov 27 10:50:44 2009
New Revision: 884832
URL: http://svn.apache.org/viewvc?rev=884832&view=rev
Log:
MAPREDUCE-787. Fix JobSubmitter to honor user given symlink path. Contributed by Amareshwari Sriramadasu.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/streaming.xml
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestCommandLineJobSubmission.java
hadoop/mapreduce/trunk/src/test/mapred/testshell/ExternalMapReduce.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=884832&r1=884831&r2=884832&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Nov 27 10:50:44 2009
@@ -921,3 +921,6 @@
MAPREDUCE-1239. Fix contrib components build dependencies.
(Giridharan Kesavan and omalley)
+ MAPREDUCE-787. Fix JobSubmitter to honor user given symlink path.
+ (Amareshwari Sriramadasu via sharad)
+
Modified: hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=884832&r1=884831&r2=884832&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Fri Nov 27 10:50:44 2009
@@ -607,17 +607,35 @@
would be present in the current working directory of the task
using the option <code>-files</code>. The <code>-libjars</code>
option allows applications to add jars to the classpaths of the maps
- and reduces. The <code>-archives</code> allows them to pass archives
- as arguments that are unzipped/unjarred and a link with name of the
- jar/zip are created in the current working directory of tasks. More
+ and reduces. The option <code>-archives</code> allows them to pass
+ a comma-separated list of archives as arguments. These archives are
+ unarchived and a link with name of the archive is created in
+ the current working directory of tasks. More
details about the command line options are available at
<a href="commands_manual.html"> Hadoop Commands Guide.</a></p>
<p>Running <code>wordcount</code> example with
- <code>-libjars</code> and <code>-files</code>:<br/>
+ <code>-libjars</code>, <code>-files</code> and <code>-archives</code>:
+ <br/>
<code> hadoop jar hadoop-examples.jar wordcount -files cachefile.txt
- -libjars mylib.jar input output </code>
- </p>
+ -libjars mylib.jar -archives myarchive.zip input output </code>
+ Here, myarchive.zip will be placed and unzipped into a directory
+ by the name "myarchive.zip"
+ </p>
+
+ <p> Users can specify a different symbolic name for
+ files and archives passed through -files and -archives option, using #.
+ </p>
+
+ <p> For example,
+ <code> hadoop jar hadoop-examples.jar wordcount
+ -files dir1/dict.txt#dict1,dir2/dict.txt#dict2
+ -archives mytar.tgz#tgzdir input output </code>
+ Here, the files dir1/dict.txt and dir2/dict.txt can be accessed by
+ tasks using the symbolic names dict1 and dict2 respectively.
+ And the archive mytar.tgz will be placed and unarchived into a
+ directory by the name tgzdir
+ </p>
</section>
<section>
Modified: hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/streaming.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/streaming.xml?rev=884832&r1=884831&r2=884832&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/streaming.xml (original)
+++ hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/streaming.xml Fri Nov 27 10:50:44 2009
@@ -322,6 +322,10 @@
<source>
-files hdfs://host:fs_port/user/testfile.txt
</source>
<p> Users can specify a different symlink name for -files using #. </p>
+<source>
+-files hdfs://host:fs_port/user/testfile.txt#testfile
+</source>
<p>
Multiple entries can be specified like this:
</p>
@@ -342,6 +346,10 @@
<source>
-archives hdfs://host:fs_port/user/testfile.jar
</source>
<p> Users can specify a different symlink name for -archives using #. </p>
+<source>
+-archives hdfs://host:fs_port/user/testfile.tgz#tgzdir
+</source>
<p>
In this example, the input.txt file has two lines specifying the names of the two files: cachedir.jar/cache.txt and cachedir.jar/cache2.txt.
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=884832&r1=884831&r2=884832&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java Fri Nov 27 10:50:44 2009
@@ -160,15 +160,20 @@
FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
String[] fileArr = files.split(",");
for (String tmpFile: fileArr) {
- Path tmp = new Path(tmpFile);
+ URI tmpURI = null;
+ try {
+ tmpURI = new URI(tmpFile);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ Path tmp = new Path(tmpURI);
Path newPath = copyRemoteFiles(filesDir, tmp, conf, replication);
try {
- URI pathURI = new URI(newPath.toUri().toString() + "#"
- + newPath.getName());
+ URI pathURI = getPathURI(newPath, tmpURI.getFragment());
DistributedCache.addCacheFile(pathURI, conf);
} catch(URISyntaxException ue) {
//should not throw a uri exception
- throw new IOException("Failed to create uri for " + tmpFile);
+ throw new IOException("Failed to create uri for " + tmpFile, ue);
}
DistributedCache.createSymlink(conf);
}
@@ -188,16 +193,21 @@
FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms);
String[] archivesArr = archives.split(",");
for (String tmpArchives: archivesArr) {
- Path tmp = new Path(tmpArchives);
+ URI tmpURI;
+ try {
+ tmpURI = new URI(tmpArchives);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ Path tmp = new Path(tmpURI);
Path newPath = copyRemoteFiles(archivesDir, tmp, conf,
replication);
try {
- URI pathURI = new URI(newPath.toUri().toString() + "#"
- + newPath.getName());
+ URI pathURI = getPathURI(newPath, tmpURI.getFragment());
DistributedCache.addCacheArchive(pathURI, conf);
} catch(URISyntaxException ue) {
//should not throw a uri exception
- throw new IOException("Failed to create uri for " + tmpArchives);
+ throw new IOException("Failed to create uri for " + tmpArchives, ue);
}
DistributedCache.createSymlink(conf);
}
@@ -207,6 +217,19 @@
TrackerDistributedCacheManager.determineTimestamps(conf);
}
+ private URI getPathURI(Path destPath, String fragment)
+ throws URISyntaxException {
+ URI pathURI = destPath.toUri();
+ if (pathURI.getFragment() == null) {
+ if (fragment == null) {
+ pathURI = new URI(pathURI.toString() + "#" + destPath.getName());
+ } else {
+ pathURI = new URI(pathURI.toString() + "#" + fragment);
+ }
+ }
+ return pathURI;
+ }
+
private void copyJar(Path originalJarPath, Path submitJarFile,
short replication) throws IOException {
jtFs.copyFromLocalFile(originalJarPath, submitJarFile);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=884832&r1=884831&r2=884832&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Fri Nov 27 10:50:44 2009
@@ -96,7 +96,7 @@
Map<String, Path> classPaths = new HashMap<String, Path>();
if (paths != null) {
for (Path p : paths) {
- classPaths.put(p.toString(), p);
+ classPaths.put(p.toUri().getPath().toString(), p);
}
}
for (int i = 0; i < uris.length; ++i) {
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestCommandLineJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestCommandLineJobSubmission.java?rev=884832&r1=884831&r2=884832&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestCommandLineJobSubmission.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestCommandLineJobSubmission.java Fri Nov 27 10:50:44 2009
@@ -19,11 +19,13 @@
import java.io.File;
import java.io.FileOutputStream;
+import java.io.IOException;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -60,17 +62,59 @@
FileOutputStream fstream = new FileOutputStream(f);
fstream.write("somestrings".getBytes());
fstream.close();
- String[] args = new String[8];
+ File f1 = new File(thisbuildDir, "files_tmp1");
+ fstream = new FileOutputStream(f1);
+ fstream.write("somestrings".getBytes());
+ fstream.close();
+
+ // copy files to dfs
+ Path cachePath = new Path("/cacheDir");
+ if (!fs.mkdirs(cachePath)) {
+ throw new IOException(
+ "Mkdirs failed to create " + cachePath.toString());
+ }
+ Path localCachePath = new Path(System.getProperty("test.cache.data"));
+ Path txtPath = new Path(localCachePath, new Path("test.txt"));
+ Path jarPath = new Path(localCachePath, new Path("test.jar"));
+ Path zipPath = new Path(localCachePath, new Path("test.zip"));
+ Path tarPath = new Path(localCachePath, new Path("test.tar"));
+ Path tgzPath = new Path(localCachePath, new Path("test.tgz"));
+ fs.copyFromLocalFile(txtPath, cachePath);
+ fs.copyFromLocalFile(jarPath, cachePath);
+ fs.copyFromLocalFile(zipPath, cachePath);
+
+ // construct options for -files
+ String[] files = new String[3];
+ files[0] = f.toString();
+ files[1] = f1.toString() + "#localfilelink";
+ files[2] =
+ fs.getUri().resolve(cachePath + "/test.txt#dfsfilelink").toString();
+
+ // construct options for -libjars
+ String[] libjars = new String[2];
+ libjars[0] = "build/test/mapred/testjar/testjob.jar";
+ libjars[1] = fs.getUri().resolve(cachePath + "/test.jar").toString();
+
+ // construct options for archives
+ String[] archives = new String[3];
+ archives[0] = tgzPath.toString();
+ archives[1] = tarPath + "#tarlink";
+ archives[2] =
+ fs.getUri().resolve(cachePath + "/test.zip#ziplink").toString();
+
+ String[] args = new String[10];
args[0] = "-files";
- args[1] = f.toString();
+ args[1] = StringUtils.arrayToString(files);
args[2] = "-libjars";
// the testjob.jar as a temporary jar file
// rather than creating its own
- args[3] = "build/test/mapred/testjar/testjob.jar";
- args[4] = "-D";
- args[5] = "mapred.output.committer.class=testjar.CustomOutputCommitter";
- args[6] = input.toString();
- args[7] = output.toString();
+ args[3] = StringUtils.arrayToString(libjars);
+ args[4] = "-archives";
+ args[5] = StringUtils.arrayToString(archives);
+ args[6] = "-D";
+ args[7] = "mapred.output.committer.class=testjar.CustomOutputCommitter";
+ args[8] = input.toString();
+ args[9] = output.toString();
JobConf jobConf = mr.createJobConf();
//before running the job, verify that libjar is not in client classpath
Modified: hadoop/mapreduce/trunk/src/test/mapred/testshell/ExternalMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/testshell/ExternalMapReduce.java?rev=884832&r1=884831&r2=884832&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/testshell/ExternalMapReduce.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/testshell/ExternalMapReduce.java Fri Nov 27 10:50:44 2009
@@ -66,12 +66,21 @@
if (classpath.indexOf("testjob.jar") == -1) {
throw new IOException("failed to find in the library " + classpath);
}
+ if (classpath.indexOf("test.jar") == -1) {
+ throw new IOException("failed to find the library test.jar in"
+ + classpath);
+ }
//fork off ls to see if the file exists.
// java file.exists() will not work on
// cygwin since it is a symlink
- String[] argv = new String[2];
+ String[] argv = new String[7];
argv[0] = "ls";
argv[1] = "files_tmp";
+ argv[2] = "localfilelink";
+ argv[3] = "dfsfilelink";
+ argv[4] = "tarlink";
+ argv[5] = "ziplink";
+ argv[6] = "test.tgz";
Process p = Runtime.getRuntime().exec(argv);
int ret = -1;
try {