Posted to mapreduce-commits@hadoop.apache.org by dh...@apache.org on 2010/03/04 03:50:52 UTC
svn commit: r918828 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/FileInputFormat.java
src/test/mapred/org/apache/hadoop/mapred/TestFileInputFormat.java
Author: dhruba
Date: Thu Mar 4 02:50:51 2010
New Revision: 918828
URL: http://svn.apache.org/viewvc?rev=918828&view=rev
Log:
MAPREDUCE-1501. FileInputFormat supports multi-level, recursive
directory listing. (Zheng Shao via dhruba)
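
The new behavior is off by default and is gated by the mapred.input.dir.recursive
job property; a minimal sketch of a job opting in (the input path below is
hypothetical):

    JobConf conf = new JobConf();
    // hypothetical input root containing nested subdirectories
    FileInputFormat.setInputPaths(conf, new Path("/data/logs"));
    // multi-level listing is disabled by default; enable it explicitly
    conf.setBoolean("mapred.input.dir.recursive", true);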
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileInputFormat.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=918828&r1=918827&r2=918828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Mar 4 02:50:51 2010
@@ -215,6 +215,9 @@
MAPREDUCE-1467. Add a --verbose flag to Sqoop.
(Aaron Kimball via tomwhite)
+ MAPREDUCE-1501. FileInputFormat supports multi-level, recursive
+ directory listing. (Zheng Shao via dhruba)
+
BUG FIXES
MAPREDUCE-1258. Fix fair scheduler event log not logging job info.
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=918828&r1=918827&r2=918828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java Thu Mar 4 02:50:51 2010
@@ -141,6 +141,30 @@
ReflectionUtils.newInstance(filterClass, conf) : null;
}
+ /**
+ * Add files under the input path recursively to the result list.
+ * @param result
+ * The List to store all files.
+ * @param fs
+ * The FileSystem.
+ * @param path
+ * The input path.
+ * @param inputFilter
+ * The input filter that can be used to filter files/dirs.
+ * @throws IOException
+ */
+ protected void addInputPathRecursively(List<FileStatus> result,
+ FileSystem fs, Path path, PathFilter inputFilter)
+ throws IOException {
+ for(FileStatus stat: fs.listStatus(path, inputFilter)) {
+ if (stat.isDir()) {
+ addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+ } else {
+ result.add(stat);
+ }
+ }
+ }
+
/** List input directories.
* Subclasses may override to, e.g., select only files matching a regular
* expression.
@@ -158,6 +182,9 @@
// get tokens for all the required FileSystems..
TokenCache.obtainTokensForNamenodes(dirs, job);
+ // Whether we need to recursively look into the directory structure
+ boolean recursive = job.getBoolean("mapred.input.dir.recursive", false);
+
List<FileStatus> result = new ArrayList<FileStatus>();
List<IOException> errors = new ArrayList<IOException>();
@@ -183,7 +210,11 @@
if (globStat.isDir()) {
for(FileStatus stat: fs.listStatus(globStat.getPath(),
inputFilter)) {
- result.add(stat);
+ if (recursive && stat.isDir()) {
+ addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+ } else {
+ result.add(stat);
+ }
}
} else {
result.add(globStat);
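
Because addInputPathRecursively is protected, subclasses of FileInputFormat can
also reuse it directly, independent of the job property. A minimal sketch of
such a subclass (the class name and the accept-all filter are illustrative;
glob expansion and hidden-file filtering from the stock listStatus are omitted):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    /** Illustrative input format that always recurses into nested
     *  directories, regardless of mapred.input.dir.recursive. */
    public class RecursiveTextInputFormat extends TextInputFormat {
      @Override
      protected FileStatus[] listStatus(JobConf job) throws IOException {
        List<FileStatus> result = new ArrayList<FileStatus>();
        // accept every path; the stock listStatus also filters hidden files
        PathFilter acceptAll = new PathFilter() {
          public boolean accept(Path p) {
            return true;
          }
        };
        for (Path dir : getInputPaths(job)) {
          FileSystem fs = dir.getFileSystem(job);
          // reuse the protected helper added by this patch; plain files
          // are added as-is, directories are walked depth-first
          addInputPathRecursively(result, fs, dir, acceptAll);
        }
        return result.toArray(new FileStatus[result.size()]);
      }
    }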
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=918828&r1=918827&r2=918828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileInputFormat.java Thu Mar 4 02:50:51 2010
@@ -18,14 +18,22 @@
package org.apache.hadoop.mapred;
import java.io.DataOutputStream;
+import java.io.IOException;
import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.Text;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
public class TestFileInputFormat extends TestCase {
@@ -86,4 +94,93 @@
}
}
+
+ final Path root = new Path("/TestFileInputFormat");
+ final Path file1 = new Path(root, "file1");
+ final Path dir1 = new Path(root, "dir1");
+ final Path file2 = new Path(dir1, "file2");
+
+ static final int BLOCKSIZE = 1024;
+ static final byte[] databuf = new byte[BLOCKSIZE];
+
+ private static final Log LOG = LogFactory.getLog(TestFileInputFormat.class);
+
+ private static final String rack1[] = new String[] {
+ "/r1"
+ };
+ private static final String hosts1[] = new String[] {
+ "host1.rack1.com"
+ };
+
+ /** Dummy class to extend FileInputFormat */
+ private class DummyFileInputFormat extends FileInputFormat<Text, Text> {
+ @Override
+ public RecordReader<Text, Text> getRecordReader(InputSplit split,
+ JobConf job, Reporter reporter) throws IOException {
+ return null;
+ }
+ }
+
+ public void testMultiLevelInput() throws IOException {
+ MiniDFSCluster dfs = null;
+ FileSystem fileSys = null;
+ try {
+ JobConf conf = new JobConf();
+
+ conf.setBoolean("dfs.replication.considerLoad", false);
+ dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
+ dfs.waitActive();
+
+ fileSys = dfs.getFileSystem();
+ if (!fileSys.mkdirs(dir1)) {
+ throw new IOException("Mkdirs failed to create " + dir1.toString());
+ }
+ writeFile(conf, file1, (short)1, 1);
+ writeFile(conf, file2, (short)1, 1);
+
+ // split it using the dummy FileInputFormat
+ DummyFileInputFormat inFormat = new DummyFileInputFormat();
+ inFormat.setInputPaths(conf, root);
+
+ // By default, we don't allow multi-level/recursive inputs
+ boolean exceptionThrown = false;
+ try {
+ inFormat.getSplits(conf, 1);
+ } catch (Exception e) {
+ exceptionThrown = true;
+ }
+ assertTrue("Exception should be thrown by default for scanning a "
+ + "directory with directories inside.", exceptionThrown);
+
+ // Enable multi-level/recursive inputs
+ conf.setBoolean("mapred.input.dir.recursive", true);
+ InputSplit[] splits = inFormat.getSplits(conf, 1);
+ assertEquals(2, splits.length);
+
+ } finally {
+ if (dfs != null) {
+ dfs.shutdown();
+ }
+ }
+ }
+
+ static void writeFile(Configuration conf, Path name,
+ short replication, int numBlocks) throws IOException {
+ FileSystem fileSys = FileSystem.get(conf);
+
+ FSDataOutputStream stm = fileSys.create(name, true,
+ conf.getInt("io.file.buffer.size", 4096),
+ replication, (long)BLOCKSIZE);
+ for (int i = 0; i < numBlocks; i++) {
+ stm.write(databuf);
+ }
+ stm.close();
+ DFSTestUtil.waitReplication(fileSys, name, replication);
+ }
+
}