Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:31:39 UTC
svn commit: r1077029 - in
/hadoop/common/branches/branch-0.20-security-patches: bin/
src/docs/src/documentation/content/xdocs/ src/test/org/apache/hadoop/fs/
src/tools/org/apache/hadoop/tools/
Author: omalley
Date: Fri Mar 4 03:31:39 2011
New Revision: 1077029
URL: http://svn.apache.org/viewvc?rev=1077029&view=rev
Log:
commit 4100ce7b39e1282691fcaf0a00d5107c2ecb904f
Author: Mahadev Konar <ma...@cdev6022.inktomisearch.com>
Date: Wed Oct 21 01:39:01 2009 +0000
MAPREDUCE-739 from https://issues.apache.org/jira/secure/attachment/12422759/MAPREDUCE-739.yhadoop.patch
+++ b/YAHOO-CHANGES.txt
+
+ MAPREDUCE-739. Allow relative paths to be created inside archives.
+ (mahadev)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/bin/hadoop
hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/hadoop_archives.xml
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/TestHarFileSystem.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/HadoopArchives.java
Modified: hadoop/common/branches/branch-0.20-security-patches/bin/hadoop
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/bin/hadoop?rev=1077029&r1=1077028&r2=1077029&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/bin/hadoop (original)
+++ hadoop/common/branches/branch-0.20-security-patches/bin/hadoop Fri Mar 4 03:31:39 2011
@@ -72,7 +72,7 @@ if [ $# = 0 ]; then
echo " version print the version"
echo " jar <jar> run a jar file"
echo " distcp <srcurl> <desturl> copy file or directories recursively"
- echo " archive -archiveName NAME <src>* <dest> create a hadoop archive"
+ echo " archive -archiveName NAME -p <parent path> <src>* <dest> create a hadoop archive"
echo " classpath prints the class path needed to get the"
echo " Hadoop jar and the required libraries"
echo " daemonlog get/set the log level for each daemon"
Modified: hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/hadoop_archives.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/hadoop_archives.xml?rev=1077029&r1=1077028&r2=1077029&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/hadoop_archives.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/hadoop_archives.xml Fri Mar 4 03:31:39 2011
@@ -31,26 +31,25 @@
within the part files.
</p>
</section>
+
<section>
<title> How to create an archive? </title>
<p>
- <code>Usage: hadoop archive -archiveName name <src>* <dest></code>
+ <code>Usage: hadoop archive -archiveName name -p <parent> <src>* <dest></code>
</p>
<p>
-archiveName is the name of the archive you would like to create.
An example would be foo.har. The name should have a *.har extension.
- The inputs are file system pathnames which work as usual with regular
- expressions. The destination directory would contain the archive.
+ The parent argument specifies the relative path to which the files should be
+ archived. For example:
+ </p><p><code> -p /foo/bar a/b/c e/f/g </code></p><p>
+ Here /foo/bar is the parent path and a/b/c, e/f/g are paths relative to the parent.
Note that this is a Map/Reduce job that creates the archives. You would
- need a map reduce cluster to run this. The following is an example:</p>
- <p>
- <code>hadoop archive -archiveName foo.har /user/hadoop/dir1 /user/hadoop/dir2 /user/zoo/</code>
- </p><p>
- In the above example /user/hadoop/dir1 and /user/hadoop/dir2 will be
- archived in the following file system directory -- /user/zoo/foo.har.
- The sources are not changed or removed when an archive is created.
- </p>
+ need a map reduce cluster to run this. For a detailed example see the later sections. </p>
+ <p> If you just want to archive a single directory /foo/bar then you can use </p>
+ <p><code> hadoop archive -archiveName zoo.har -p /foo/bar /outputdir </code></p>
</section>
+
<section>
<title> How to look up files in archives? </title>
<p>
@@ -60,20 +59,58 @@
an error. URI for Hadoop Archives is
</p><p><code>har://scheme-hostname:port/archivepath/fileinarchive</code></p><p>
If no scheme is provided it assumes the underlying filesystem.
- In that case the URI would look like
- </p><p><code>
- har:///archivepath/fileinarchive</code></p>
+ In that case the URI would look like </p>
+ <p><code>har:///archivepath/fileinarchive</code></p>
+ </section>
+
+ <section>
+ <title> Example on creating and looking up archives </title>
+ <p><code>hadoop archive -archiveName foo.har -p /user/hadoop dir1 dir2 /user/zoo </code></p>
<p>
- Here is an example of archive. The input to the archives is /dir. The directory dir contains
- files filea, fileb. To archive /dir to /user/hadoop/foo.har, the command is
+ The above example creates an archive using /user/hadoop as the relative archive directory.
+ The directories /user/hadoop/dir1 and /user/hadoop/dir2 will be
+ archived in the following file system directory -- /user/zoo/foo.har. Archiving does not delete the input
+ files. If you want to delete the input files after creating the archives (to reduce namespace), you
+ will have to do it on your own.
</p>
- <p><code>hadoop archive -archiveName foo.har /dir /user/hadoop</code>
- </p><p>
- To get file listing for files in the created archive
- </p>
- <p><code>hadoop dfs -lsr har:///user/hadoop/foo.har</code></p>
- <p>To cat filea in archive -
- </p><p><code>hadoop dfs -cat har:///user/hadoop/foo.har/dir/filea</code></p>
+
+ <section>
+ <title> Looking up files and understanding the -p option </title>
+ <p> Looking up files in hadoop archives is as easy as doing an ls on the filesystem. After you have
+ archived the directories /user/hadoop/dir1 and /user/hadoop/dir2 as in the example above, to see all
+ the files in the archives you can just run: </p>
+ <p><code>hadoop dfs -lsr har:///user/zoo/foo.har/</code></p>
+ <p> To understand the significance of the -p argument, let's go through the above example again. If you just do
+ an ls (not lsr) on the hadoop archive using </p>
+ <p><code>hadoop dfs -ls har:///user/zoo/foo.har</code></p>
+ <p>The output should be:</p>
+ <source>
+har:///user/zoo/foo.har/dir1
+har:///user/zoo/foo.har/dir2
+ </source>
+ <p> As you can recall, the archive was created with the following command </p>
+ <p><code>hadoop archive -archiveName foo.har -p /user/hadoop dir1 dir2 /user/zoo </code></p>
+ <p> If we were to change the command to: </p>
+ <p><code>hadoop archive -archiveName foo.har -p /user/ hadoop/dir1 hadoop/dir2 /user/zoo </code></p>
+ <p> then an ls on the hadoop archive using </p>
+ <p><code>hadoop dfs -ls har:///user/zoo/foo.har</code></p>
+ <p>would give you</p>
+ <source>
+har:///user/zoo/foo.har/hadoop/dir1
+har:///user/zoo/foo.har/hadoop/dir2
+ </source>
+ <p>
+ Notice that the archived files have been archived relative to /user/ rather than /user/hadoop.
+ </p>
+ </section>
+ </section>
+
+ <section>
+ <title> Using Hadoop Archives with Map Reduce </title>
+ <p>Using Hadoop Archives in Map Reduce is as easy as specifying a different input filesystem than the default file system.
+ If you have a hadoop archive stored in HDFS at /user/zoo/foo.har, then to use this archive for Map Reduce input, all
+ you need to do is specify the input directory as har:///user/zoo/foo.har. Since a Hadoop Archive is exposed as a file system,
+ Map Reduce will be able to use all the logical input files in the Hadoop Archive as input.</p>
</section>
- </body>
+ </body>
</document>
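To go with the new "Using Hadoop Archives with Map Reduce" section above, here is a minimal driver sketch against the 0.20 mapred API. It assumes foo.har already exists under /user/zoo as in the documentation examples; the class name and output path are made up for illustration.

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.FileInputFormat;
  import org.apache.hadoop.mapred.FileOutputFormat;
  import org.apache.hadoop.mapred.JobClient;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.TextInputFormat;
  import org.apache.hadoop.mapred.TextOutputFormat;

  public class HarInputExample {
    public static void main(String[] args) throws Exception {
      JobConf conf = new JobConf(HarInputExample.class);
      conf.setJobName("har-input-example");
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
      // the archive is addressed through the har:// filesystem, so the logical
      // files inside it are used directly as job input
      FileInputFormat.setInputPaths(conf, new Path("har:///user/zoo/foo.har/dir1"));
      FileOutputFormat.setOutputPath(conf, new Path("/user/zoo/har-example-out"));
      // identity mapper and reducer are the defaults, so this simply copies the records
      JobClient.runJob(conf);
    }
  }

Since the archive is read-only, the job output still goes to the default filesystem.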
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/TestHarFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/TestHarFileSystem.java?rev=1077029&r1=1077028&r2=1077029&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/TestHarFileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/TestHarFileSystem.java Fri Mar 4 03:31:39 2011
@@ -18,34 +18,26 @@
package org.apache.hadoop.fs;
-
import java.io.IOException;
+import java.net.URI;
import java.util.Iterator;
+import junit.framework.TestCase;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.*;
import org.apache.hadoop.tools.HadoopArchives;
import org.apache.hadoop.util.ToolRunner;
-import junit.framework.TestCase;
-
/**
* test the har file system
* create a har filesystem
@@ -53,7 +45,7 @@ import junit.framework.TestCase;
* and then run a map reduce job
*/
public class TestHarFileSystem extends TestCase {
- private Path inputPath;
+ private Path inputPath, inputrelPath;
private MiniDFSCluster dfscluster;
private MiniMRCluster mapred;
private FileSystem fs;
@@ -62,17 +54,32 @@ public class TestHarFileSystem extends T
protected void setUp() throws Exception {
super.setUp();
- dfscluster = new MiniDFSCluster(new JobConf(), 2, true, null);
+ dfscluster = new MiniDFSCluster(new Configuration(), 2, true, null);
fs = dfscluster.getFileSystem();
mapred = new MiniMRCluster(2, fs.getUri().toString(), 1);
inputPath = new Path(fs.getHomeDirectory(), "test");
+ inputrelPath = new Path(fs.getHomeDirectory().toUri().
+ getPath().substring(1), "test");
filea = new Path(inputPath,"a");
fileb = new Path(inputPath,"b");
filec = new Path(inputPath,"c");
- // check for har containing escape worthy characters
- // in there name
filed = new Path(inputPath, "d%d");
+ // check for har containing escape worthy
+ // characters in their names
archivePath = new Path(fs.getHomeDirectory(), "tmp");
+ fs.mkdirs(inputPath);
+ FSDataOutputStream out = fs.create(filea);
+ out.write("a".getBytes());
+ out.close();
+ out = fs.create(fileb);
+ out.write("b".getBytes());
+ out.close();
+ out = fs.create(filec);
+ out.write("c".getBytes());
+ out.close();
+ out = fs.create(filed);
+ out.write("d".getBytes());
+ out.close();
}
protected void tearDown() throws Exception {
@@ -112,52 +119,100 @@ public class TestHarFileSystem extends T
}
}
- public void testArchives() throws Exception {
- fs.mkdirs(inputPath);
-
- FSDataOutputStream out = fs.create(filea);
- out.write("a".getBytes());
- out.close();
- out = fs.create(fileb);
- out.write("b".getBytes());
- out.close();
- out = fs.create(filec);
- out.write("c".getBytes());
- out.close();
- out = fs.create(filed);
- out.write("d".getBytes());
- out.close();
+ // test archives with a -p option
+ public void testRelativeArchives() throws Exception {
+ fs.delete(archivePath,true);
+ Configuration conf = mapred.createJobConf();
+ HadoopArchives har = new HadoopArchives(conf);
+ String[] args = new String[6];
+ args[0] = "-archiveName";
+ args[1] = "foo.har";
+ args[2] = "-p";
+ args[3] = fs.getHomeDirectory().toString();
+ args[4] = "test";
+ args[5] = archivePath.toString();
+ int ret = ToolRunner.run(har, args);
+ assertTrue("failed test", ret == 0);
+ Path finalPath = new Path(archivePath, "foo.har");
+ Path fsPath = new Path(inputPath.toUri().getPath());
+ Path filePath = new Path(finalPath, "test");
+ //make it a har path
+ Path harPath = new Path("har://" + filePath.toUri().getPath());
+ assertTrue(fs.exists(new Path(finalPath, "_index")));
+ assertTrue(fs.exists(new Path(finalPath, "_masterindex")));
+ assertTrue(!fs.exists(new Path(finalPath, "_logs")));
+ args = new String[2];
+ args[0] = "-ls";
+ args[1] = harPath.toString();
+ FsShell shell = new FsShell(conf);
+ ret = ToolRunner.run(shell, args);
+ // fileb and filec
+ assertTrue(ret == 0);
+ Path harFilea = new Path(harPath, "a");
+ Path harFileb = new Path(harPath, "b");
+ Path harFilec = new Path(harPath, "c");
+ Path harFiled = new Path(harPath, "d%d");
+ FileSystem harFs = harFilea.getFileSystem(conf);
+ FSDataInputStream fin = harFs.open(harFilea);
+ byte[] b = new byte[4];
+ int readBytes = fin.read(b);
+ fin.close();
+ assertTrue("strings are equal ", (b[0] == "a".getBytes()[0]));
+ fin = harFs.open(harFileb);
+ fin.read(b);
+ fin.close();
+ assertTrue("strings are equal ", (b[0] == "b".getBytes()[0]));
+ fin = harFs.open(harFilec);
+ fin.read(b);
+ fin.close();
+ assertTrue("strings are equal ", (b[0] == "c".getBytes()[0]));
+ fin = harFs.open(harFiled);
+ fin.read(b);
+ fin.close();
+ assertTrue("strings are equal ", (b[0] == "d".getBytes()[0]));
+ }
+
+
+ public void testArchivesWithMapred() throws Exception {
+ //one minor check
+ // check to see if fs.har.impl.disable.cache is set
Configuration conf = mapred.createJobConf();
- // check to see if fs.har.impl.disable.cache is true
boolean archivecaching = conf.getBoolean("fs.har.impl.disable.cache", false);
assertTrue(archivecaching);
+ fs.delete(archivePath, true);
HadoopArchives har = new HadoopArchives(conf);
- String[] args = new String[3];
+ String[] args = new String[4];
+
//check for destination not specfied
args[0] = "-archiveName";
args[1] = "foo.har";
- args[2] = inputPath.toString();
+ args[2] = "-p";
+ args[3] = "/";
int ret = ToolRunner.run(har, args);
assertTrue(ret != 0);
- args = new String[4];
+ args = new String[6];
//check for wrong archiveName
args[0] = "-archiveName";
args[1] = "/d/foo.har";
- args[2] = inputPath.toString();
- args[3] = archivePath.toString();
+ args[2] = "-p";
+ args[3] = "/";
+ args[4] = inputrelPath.toString();
+ args[5] = archivePath.toString();
ret = ToolRunner.run(har, args);
assertTrue(ret != 0);
-// se if dest is a file
+ // see if dest is a file
args[1] = "foo.har";
- args[3] = filec.toString();
+ args[5] = filec.toString();
ret = ToolRunner.run(har, args);
assertTrue(ret != 0);
//this is a valid run
args[0] = "-archiveName";
args[1] = "foo.har";
- args[2] = inputPath.toString();
- args[3] = archivePath.toString();
+ args[2] = "-p";
+ args[3] = "/";
+ args[4] = inputrelPath.toString();
+ args[5] = archivePath.toString();
ret = ToolRunner.run(har, args);
//checl for the existenece of the archive
assertTrue(ret == 0);
@@ -170,13 +225,16 @@ public class TestHarFileSystem extends T
String relative = fsPath.toString().substring(1);
Path filePath = new Path(finalPath, relative);
//make it a har path
- Path harPath = new Path("har://" + filePath.toUri().getPath());
+ URI uri = fs.getUri();
+ Path harPath = new Path("har://" + "hdfs-" + uri.getHost() +":" +
+ uri.getPort() + filePath.toUri().getPath());
assertTrue(fs.exists(new Path(finalPath, "_index")));
assertTrue(fs.exists(new Path(finalPath, "_masterindex")));
assertTrue(!fs.exists(new Path(finalPath, "_logs")));
//creation tested
//check if the archive is same
// do ls and cat on all the files
+
FsShell shell = new FsShell(conf);
args = new String[2];
args[0] = "-ls";
@@ -194,21 +252,19 @@ public class TestHarFileSystem extends T
FSDataInputStream fin = harFs.open(harFilea);
byte[] b = new byte[4];
int readBytes = fin.read(b);
+ assertTrue("Empty read.", readBytes > 0);
fin.close();
assertTrue("strings are equal ", (b[0] == "a".getBytes()[0]));
fin = harFs.open(harFileb);
- fin.read(b);
+ readBytes = fin.read(b);
+ assertTrue("Empty read.", readBytes > 0);
fin.close();
assertTrue("strings are equal ", (b[0] == "b".getBytes()[0]));
fin = harFs.open(harFilec);
- fin.read(b);
+ readBytes = fin.read(b);
+ assertTrue("Empty read.", readBytes > 0);
fin.close();
assertTrue("strings are equal ", (b[0] == "c".getBytes()[0]));
- fin = harFs.open(harFiled);
- fin.read(b);
- fin.close();
- assertTrue("strings are equal ", (b[0] == "d".getBytes()[0]));
-
// ok all files match
// run a map reduce job
Path outdir = new Path(fs.getHomeDirectory(), "mapout");
@@ -230,7 +286,8 @@ public class TestHarFileSystem extends T
Path reduceFile = status[0].getPath();
FSDataInputStream reduceIn = fs.open(reduceFile);
b = new byte[8];
- reduceIn.read(b);
+ readBytes = reduceIn.read(b);
+ assertTrue("Should read 8 bytes.", readBytes == 8);
//assuming all the 8 bytes were read.
Text readTxt = new Text(b);
assertTrue("a\nb\nc\nd\n".equals(readTxt.toString()));
Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/HadoopArchives.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/HadoopArchives.java?rev=1077029&r1=1077028&r2=1077029&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/HadoopArchives.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/HadoopArchives.java Fri Mar 4 03:31:39 2011
@@ -60,6 +60,7 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+
/**
* a archive creation utility.
* This class provides methods that can be used
@@ -77,12 +78,13 @@ public class HadoopArchives implements T
static final String SRC_COUNT_LABEL = NAME + ".src.count";
static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
static final String DST_HAR_LABEL = NAME + ".archive.name";
+ static final String SRC_PARENT_LABEL = NAME + ".parent.path";
// size of each part file
// its fixed for now.
static final long partSize = 2 * 1024 * 1024 * 1024l;
private static final String usage = "archive"
- + " -archiveName NAME <src>* <dest>" +
+ + " -archiveName NAME -p <parent path> <src>* <dest>" +
"\n";
@@ -228,24 +230,53 @@ public class HadoopArchives implements T
return deepest;
}
- // this method is tricky. This method writes
- // the top level directories in such a way so that
- // the output only contains valid directoreis in archives.
- // so for an input path specified by the user
- // as /user/hadoop
- // we need to index
- // / as the root
- // /user as a directory
- // /user/hadoop as a directory
- // so for multiple input paths it makes sure that it
- // does the right thing.
- // so if the user specifies the input directories as
- // /user/harry and /user/hadoop
- // we need to write / and user as its child
- // and /user and harry and hadoop as its children
+ /**
+ * truncate the prefix root from the full path
+ * @param fullPath the full path
+ * @param root the prefix root to be truncated
+ * @return the relative path
+ */
+ private Path relPathToRoot(Path fullPath, Path root) {
+ // just take some effort to do it
+ // rather than just using substring
+ // so that we do not break sometime later
+ Path justRoot = new Path(Path.SEPARATOR);
+ if (fullPath.depth() == root.depth()) {
+ return justRoot;
+ }
+ else if (fullPath.depth() > root.depth()) {
+ Path retPath = new Path(fullPath.getName());
+ Path parent = fullPath.getParent();
+ for (int i=0; i < (fullPath.depth() - root.depth() -1); i++) {
+ retPath = new Path(parent.getName(), retPath);
+ parent = parent.getParent();
+ }
+ return new Path(justRoot, retPath);
+ }
+ return null;
+ }
+
+ /**
+ * this method writes all the valid top level directories
+ * into the srcWriter for indexing. This method is a little
+ * tricky. example-
+ * for an input with parent path /home/user/ and sources
+ * as /home/user/source/dir1, /home/user/source/dir2 - this
+ * will output <source, dir, dir1, dir2> (dir means that source is a dir
+ * with dir1 and dir2 as children) and <source/dir1, file, null>
+ * and <source/dir2, file, null>
+ * @param srcWriter the sequence file writer to write the
+ * directories to
+ * @param paths the source paths provided by the user. They
+ * are glob free and have full path (not relative paths)
+ * @param parentPath the parent path that you want the archives
+ * to be relative to. example - /home/user/dir1 can be archived with
+ * parent as /home or /home/user.
+ * @throws IOException
+ */
private void writeTopLevelDirs(SequenceFile.Writer srcWriter,
- List<Path> paths) throws IOException {
- //these are qualified paths
+ List<Path> paths, Path parentPath) throws IOException {
+ //add all the directories
List<Path> justDirs = new ArrayList<Path>();
for (Path p: paths) {
if (!p.getFileSystem(getConf()).isFile(p)) {
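A few worked cases for the relPathToRoot helper added above (values traced by hand from the code; the paths are illustrative):

  relPathToRoot(new Path("/user/hadoop/dir1"), new Path("/user/hadoop"))  returns  /dir1
  relPathToRoot(new Path("/user/hadoop/dir1"), new Path("/user"))         returns  /hadoop/dir1
  relPathToRoot(new Path("/user/hadoop"), new Path("/user/hadoop"))       returns  /
  relPathToRoot(new Path("/user"), new Path("/user/hadoop"))              returns  null (full path shallower than the root)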
@@ -255,17 +286,23 @@ public class HadoopArchives implements T
justDirs.add(new Path(p.getParent().toUri().getPath()));
}
}
-
- //get the largest depth path
- // this is tricky
- TreeMap<String, HashSet<String>> allpaths = new TreeMap<String, HashSet<String>>();
+ /* find all the common parents of paths that are valid archive
+ * paths. The below is done so that we do not add a common path
+ * twice and also so that we only add valid children of paths that
+ * are specified by the user.
+ */
+ TreeMap<String, HashSet<String>> allpaths = new TreeMap<String,
+ HashSet<String>>();
+ /* the largest depth of paths. the max number of times
+ * we need to iterate
+ */
Path deepest = largestDepth(paths);
Path root = new Path(Path.SEPARATOR);
- for (int i = 0; i < deepest.depth(); i++) {
+ for (int i = parentPath.depth(); i < deepest.depth(); i++) {
List<Path> parents = new ArrayList<Path>();
for (Path p: justDirs) {
if (p.compareTo(root) == 0){
- //don nothing
+ //do nothing
}
else {
Path parent = p.getParent();
@@ -285,34 +322,40 @@ public class HadoopArchives implements T
}
Set<Map.Entry<String, HashSet<String>>> keyVals = allpaths.entrySet();
for (Map.Entry<String, HashSet<String>> entry : keyVals) {
- HashSet<String> children = entry.getValue();
- String toWrite = entry.getKey() + " dir ";
- StringBuffer sbuff = new StringBuffer();
- sbuff.append(toWrite);
- for (String child: children) {
- sbuff.append(child + " ");
+ Path relPath = relPathToRoot(new Path(entry.getKey()), parentPath);
+ if (relPath != null) {
+ String toWrite = relPath + " dir ";
+ HashSet<String> children = entry.getValue();
+ StringBuffer sbuff = new StringBuffer();
+ sbuff.append(toWrite);
+ for (String child: children) {
+ sbuff.append(child + " ");
+ }
+ toWrite = sbuff.toString();
+ srcWriter.append(new LongWritable(0L), new Text(toWrite));
}
- toWrite = sbuff.toString();
- srcWriter.append(new LongWritable(0L), new Text(toWrite));
}
}
/**archive the given source paths into
* the dest
+ * @param parentPath the parent path of all the source paths
* @param srcPaths the src paths to be archived
* @param dest the dest dir that will contain the archive
*/
- public void archive(List<Path> srcPaths, String archiveName, Path dest)
- throws IOException {
+ void archive(Path parentPath, List<Path> srcPaths,
+ String archiveName, Path dest) throws IOException {
checkPaths(conf, srcPaths);
int numFiles = 0;
long totalSize = 0;
+ FileSystem fs = parentPath.getFileSystem(conf);
conf.set(DST_HAR_LABEL, archiveName);
+ conf.set(SRC_PARENT_LABEL, parentPath.makeQualified(fs).toString());
Path outputPath = new Path(dest, archiveName);
FileOutputFormat.setOutputPath(conf, outputPath);
FileSystem outFs = outputPath.getFileSystem(conf);
if (outFs.exists(outputPath) || outFs.isFile(dest)) {
- throw new IOException("Invalid Output.");
+ throw new IOException("Invalid Output: " + outputPath);
}
conf.set(DST_DIR_LABEL, outputPath.toString());
final String randomId = DistCp.getRandomId();
@@ -331,7 +374,7 @@ public class HadoopArchives implements T
// create single list of files and dirs
try {
// write the top level dirs in first
- writeTopLevelDirs(srcWriter, srcPaths);
+ writeTopLevelDirs(srcWriter, srcPaths, parentPath);
srcWriter.sync();
// these are the input paths passed
// from the command line
@@ -339,14 +382,13 @@ public class HadoopArchives implements T
// and then write them to the input file
// one at a time
for (Path src: srcPaths) {
- FileSystem fs = src.getFileSystem(conf);
ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>();
recursivels(fs, src, allFiles);
for (FileStatus stat: allFiles) {
String toWrite = "";
long len = stat.isDir()? 0:stat.getLen();
if (stat.isDir()) {
- toWrite = "" + fs.makeQualified(stat.getPath()) + " dir ";
+ toWrite = "" + relPathToRoot(stat.getPath(), parentPath) + " dir ";
//get the children
FileStatus[] list = fs.listStatus(stat.getPath());
StringBuffer sbuff = new StringBuffer();
@@ -357,7 +399,7 @@ public class HadoopArchives implements T
toWrite = sbuff.toString();
}
else {
- toWrite += fs.makeQualified(stat.getPath()) + " file ";
+ toWrite += relPathToRoot(stat.getPath(), parentPath) + " file ";
}
srcWriter.append(new LongWritable(len), new
Text(toWrite));
@@ -403,6 +445,7 @@ public class HadoopArchives implements T
Path tmpOutputDir = null;
Path tmpOutput = null;
String partname = null;
+ Path rootPath = null;
FSDataOutputStream partStream = null;
FileSystem destFs = null;
byte[] buffer;
@@ -425,6 +468,12 @@ public class HadoopArchives implements T
// directory
partname = "part-" + partId;
tmpOutput = new Path(tmpOutputDir, partname);
+ rootPath = (conf.get(SRC_PARENT_LABEL, null) == null) ? null :
+ new Path(conf.get(SRC_PARENT_LABEL));
+ if (rootPath == null) {
+ throw new RuntimeException("Unable to read parent " +
+ "path for har from config");
+ }
try {
destFs = tmpOutput.getFileSystem(conf);
//this was a stale copy
@@ -450,16 +499,7 @@ public class HadoopArchives implements T
fsin.close();
}
}
-
- // the relative path of p. basically
- // getting rid of schema. Parsing and doing
- // string manipulation is not good - so
- // just use the path api to do it.
- private Path makeRelative(Path p) {
- Path retPath = new Path(p.toUri().getPath());
- return retPath;
- }
-
+
static class MapStat {
private String pathname;
private boolean isDir;
@@ -481,6 +521,20 @@ public class HadoopArchives implements T
}
}
}
+
+ /**
+ * get rid of / in the beginning of path
+ * @param p the path
+ * @return return path without /
+ */
+ private Path realPath(Path p, Path parent) {
+ Path rootPath = new Path(Path.SEPARATOR);
+ if (rootPath.compareTo(p) == 0) {
+ return parent;
+ }
+ return new Path(parent, new Path(p.toString().substring(1)));
+ }
+
// read files from the split input
// and write it onto the part files.
// also output hash(name) and string
@@ -491,10 +545,10 @@ public class HadoopArchives implements T
Reporter reporter) throws IOException {
String line = value.toString();
MapStat mstat = new MapStat(line);
- Path srcPath = new Path(mstat.pathname);
- String towrite = null;
- Path relPath = makeRelative(srcPath);
+ Path relPath = new Path(mstat.pathname);
int hash = HarFileSystem.getHarHash(relPath);
+ String towrite = null;
+ Path srcPath = realPath(relPath, rootPath);
long startPos = partStream.getPos();
if (mstat.isDir) {
towrite = relPath.toString() + " " + "dir none " + 0 + " " + 0 + " ";
@@ -609,27 +663,26 @@ public class HadoopArchives implements T
outStream.close();
indexStream.close();
// try increasing the replication
- fs.setReplication(index, (short) 10);
- fs.setReplication(masterIndex, (short) 10);
+ fs.setReplication(index, (short) 5);
+ fs.setReplication(masterIndex, (short) 5);
}
}
/** the main driver for creating the archives
- * it takes at least two command line parameters. The src and the
- * dest. It does an lsr on the source paths.
+ * it takes at least three command line parameters: the parent path,
+ * the src and the dest. It does an lsr on the source paths.
* The mapper created archuves and the reducer creates
* the archive index.
*/
public int run(String[] args) throws Exception {
try {
+ Path parentPath = null;
List<Path> srcPaths = new ArrayList<Path>();
Path destPath = null;
- // check we were supposed to archive or
- // unarchive
String archiveName = null;
- if (args.length < 4) {
+ if (args.length < 5) {
System.out.println(usage);
throw new IOException("Invalid usage.");
}
@@ -642,17 +695,34 @@ public class HadoopArchives implements T
System.out.println(usage);
throw new IOException("Invalid name for archives. " + archiveName);
}
- for (int i = 2; i < args.length; i++) {
+ int i = 2;
+ //check to see if relative parent has been provided or not
+ //this is a required parameter.
+ if (! "-p".equals(args[i])) {
+ System.out.println(usage);
+ throw new IOException("Parent path not specified.");
+ }
+ parentPath = new Path(args[i+1]);
+ i+=2;
+ //read the rest of the paths
+ for (; i < args.length; i++) {
if (i == (args.length - 1)) {
destPath = new Path(args[i]);
}
else {
- srcPaths.add(new Path(args[i]));
+ Path argPath = new Path(args[i]);
+ if (argPath.isAbsolute()) {
+ System.out.println(usage);
+ throw new IOException("source path " + argPath +
+ " is not relative to "+ parentPath);
+ }
+ srcPaths.add(new Path(parentPath, argPath));
}
}
if (srcPaths.size() == 0) {
- System.out.println(usage);
- throw new IOException("Invalid Usage: No input sources specified.");
+ // if the user does not specify any source paths,
+ // the whole parent directory needs to be archived.
+ srcPaths.add(parentPath);
}
// do a glob on the srcPaths and then pass it on
List<Path> globPaths = new ArrayList<Path>();
@@ -663,7 +733,7 @@ public class HadoopArchives implements T
globPaths.add(fs.makeQualified(status.getPath()));
}
}
- archive(globPaths, archiveName, destPath);
+ archive(parentPath, globPaths, archiveName, destPath);
} catch(IOException ie) {
System.err.println(ie.getLocalizedMessage());
return -1;
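To round out the new argument handling in run(), an illustrative set of invocations (paths made up; the error text is quoted approximately from the code above):

  hadoop archive -archiveName foo.har -p /user/hadoop dir1 dir2 /user/zoo
      # sources are resolved against the -p parent, so /user/hadoop/dir1 and /user/hadoop/dir2 are archived into /user/zoo/foo.har

  hadoop archive -archiveName foo.har -p /user/hadoop /user/hadoop/dir1 /user/zoo
      # rejected with "source path /user/hadoop/dir1 is not relative to /user/hadoop" because absolute sources are no longer accepted

  hadoop archive -archiveName foo.har -p /user/hadoop /user/zoo
      # no sources given, so the whole parent directory /user/hadoop is archived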