Posted to mapreduce-commits@hadoop.apache.org by dh...@apache.org on 2010/03/04 03:50:52 UTC

svn commit: r918828 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/FileInputFormat.java src/test/mapred/org/apache/hadoop/mapred/TestFileInputFormat.java

Author: dhruba
Date: Thu Mar  4 02:50:51 2010
New Revision: 918828

URL: http://svn.apache.org/viewvc?rev=918828&view=rev
Log:
MAPREDUCE-1501. FileInputFormat supports multi-level, recursive 
directory listing.  (Zheng Shao via dhruba)
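
For illustration, a minimal sketch of how a job opts in to the new behavior.
The class name and input path below are hypothetical; the config key
"mapred.input.dir.recursive" and the FileInputFormat/JobConf calls are the
ones used in the diff below.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class RecursiveInputExample {          // hypothetical name
      public static JobConf configureJob() {
        JobConf job = new JobConf();
        // Off by default; enable recursive listing of nested input dirs.
        job.setBoolean("mapred.input.dir.recursive", true);
        // "/data/logs" is a hypothetical input root with subdirectories.
        FileInputFormat.setInputPaths(job, new Path("/data/logs"));
        return job;
      }
    }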


Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileInputFormat.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=918828&r1=918827&r2=918828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Mar  4 02:50:51 2010
@@ -215,6 +215,9 @@
     MAPREDUCE-1467. Add a --verbose flag to Sqoop.
     (Aaron Kimball via tomwhite)
 
+    MAPREDUCE-1501. FileInputFormat supports multi-level, recursive 
+    directory listing.  (Zheng Shao via dhruba)
+
   BUG FIXES
 
     MAPREDUCE-1258. Fix fair scheduler event log not logging job info.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=918828&r1=918827&r2=918828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java Thu Mar  4 02:50:51 2010
@@ -141,6 +141,30 @@
         ReflectionUtils.newInstance(filterClass, conf) : null;
   }
 
+  /**
+   * Add files in the input path recursively into the results.
+   * @param result
+   *          The List to store all files.
+   * @param fs
+   *          The FileSystem.
+   * @param path
+   *          The input path.
+   * @param inputFilter
+   *          The input filter that can be used to filter files/dirs. 
+   * @throws IOException
+   */
+  protected void addInputPathRecursively(List<FileStatus> result,
+      FileSystem fs, Path path, PathFilter inputFilter) 
+      throws IOException {
+    for(FileStatus stat: fs.listStatus(path, inputFilter)) {
+      if (stat.isDir()) {
+        addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+      } else {
+        result.add(stat);
+      }
+    }          
+  }
+  
   /** List input directories.
    * Subclasses may override to, e.g., select only files matching a regular
    * expression. 
@@ -158,6 +182,9 @@
     // get tokens for all the required FileSystems..
     TokenCache.obtainTokensForNamenodes(dirs, job);
     
+    // Whether we need to look recursively into the directory structure
+    boolean recursive = job.getBoolean("mapred.input.dir.recursive", false);
+    
     List<FileStatus> result = new ArrayList<FileStatus>();
     List<IOException> errors = new ArrayList<IOException>();
     
@@ -183,7 +210,11 @@
           if (globStat.isDir()) {
             for(FileStatus stat: fs.listStatus(globStat.getPath(),
                 inputFilter)) {
-              result.add(stat);
+              if (recursive && stat.isDir()) {
+                addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+              } else {
+                result.add(stat);
+              }
             }          
           } else {
             result.add(globStat);
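
Subclasses can also drive the new protected hook directly. A minimal sketch,
assuming a format that always lists inputs recursively regardless of the
config key (the subclass name and the hidden-file filter are hypothetical;
addInputPathRecursively and getInputPaths come from FileInputFormat):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class AlwaysRecursiveTextInputFormat extends TextInputFormat {
      // Skip hidden files/dirs, mirroring FileInputFormat's default filter.
      private static final PathFilter VISIBLE = new PathFilter() {
        public boolean accept(Path p) {
          String name = p.getName();
          return !name.startsWith("_") && !name.startsWith(".");
        }
      };

      @Override
      protected FileStatus[] listStatus(JobConf job) throws IOException {
        List<FileStatus> result = new ArrayList<FileStatus>();
        for (Path dir : getInputPaths(job)) {
          FileSystem fs = dir.getFileSystem(job);
          // Walks the tree depth-first, collecting plain files only.
          addInputPathRecursively(result, fs, dir, VISIBLE);
        }
        return result.toArray(new FileStatus[result.size()]);
      }
    }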

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=918828&r1=918827&r2=918828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileInputFormat.java Thu Mar  4 02:50:51 2010
@@ -18,14 +18,22 @@
 package org.apache.hadoop.mapred;
 
 import java.io.DataOutputStream;
+import java.io.IOException;
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.Text;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class TestFileInputFormat extends TestCase {
 
@@ -86,4 +94,93 @@
     }
   }
 
+  
+  final Path root = new Path("/TestFileInputFormat");
+  final Path file1 = new Path(root, "file1");
+  final Path dir1 = new Path(root, "dir1");
+  final Path file2 = new Path(dir1, "file2");
+
+  static final int BLOCKSIZE = 1024;
+  static final byte[] databuf = new byte[BLOCKSIZE];
+
+  private static final Log LOG = LogFactory.getLog(TestFileInputFormat.class);
+  
+  private static final String rack1[] = new String[] {
+    "/r1"
+  };
+  private static final String hosts1[] = new String[] {
+    "host1.rack1.com"
+  };
+  
+  /** Dummy class extending FileInputFormat. */
+  private class DummyFileInputFormat extends FileInputFormat<Text, Text> {
+    @Override
+    public RecordReader<Text, Text> getRecordReader(InputSplit split,
+        JobConf job, Reporter reporter) throws IOException {
+      return null;
+    }
+  }
+
+  public void testMultiLevelInput() throws IOException {
+    MiniDFSCluster dfs = null;
+    FileSystem fileSys = null;
+    try {
+      JobConf conf = new JobConf();
+      
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
+      dfs.waitActive();
+
+      fileSys = dfs.getFileSystem();
+      if (!fileSys.mkdirs(dir1)) {
+        throw new IOException("Mkdirs failed to create " + dir1.toString());
+      }
+      writeFile(conf, file1, (short)1, 1);
+      writeFile(conf, file2, (short)1, 1);
+
+      // split it using the dummy FileInputFormat
+      DummyFileInputFormat inFormat = new DummyFileInputFormat();
+      inFormat.setInputPaths(conf, root);
+
+      // By default, we don't allow multi-level/recursive inputs
+      boolean exceptionThrown = false;
+      try {
+        inFormat.getSplits(conf, 1);
+      } catch (Exception e) {
+        exceptionThrown = true;
+      }
+      assertTrue("Exception should be thrown by default for scanning a "
+          + "directory with directories inside.", exceptionThrown);
+
+      // Enable multi-level/recursive inputs
+      conf.setBoolean("mapred.input.dir.recursive", true);
+      InputSplit[] splits = inFormat.getSplits(conf, 1);
+      assertEquals(2, splits.length);
+      
+    } finally {
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+
+  static void writeFile(Configuration conf, Path name,
+      short replication, int numBlocks) throws IOException {
+    FileSystem fileSys = FileSystem.get(conf);
+
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            conf.getInt("io.file.buffer.size", 4096),
+                                            replication, (long)BLOCKSIZE);
+    for (int i = 0; i < numBlocks; i++) {
+      stm.write(databuf);
+    }
+    stm.close();
+    DFSTestUtil.waitReplication(fileSys, name, replication);
+  }
+
+  
 }