You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2014/03/20 03:48:48 UTC

svn commit: r1579516 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/...

Author: vinodkv
Date: Thu Mar 20 02:48:48 2014
New Revision: 1579516

URL: http://svn.apache.org/r1579516
Log:
MAPREDUCE-2349. Modified FileInputFormat to be able to issue file and block location calls in parallel. Contributed by Siddharth Seth.
svn merge --ignore-ancestry -c 1579515 ../../trunk/

Added:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
      - copied unchanged from r1579515, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1579516&r1=1579515&r2=1579516&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu Mar 20 02:48:48 2014
@@ -57,6 +57,9 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-4052. Improved MapReduce clients to use NodeManagers' ability to
     handle cross platform application submissions. (Jian He via vinodkv)
 
+    MAPREDUCE-2349. Modified FileInputFormat to be able to issue file and block
+    location calls in parallel. (Siddharth Seth via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=1579516&r1=1579515&r2=1579516&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java Thu Mar 20 02:48:48 2014
@@ -47,6 +47,9 @@ import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterables;
+
 /** 
  * A base class for file-based {@link InputFormat}.
  * 
@@ -203,10 +206,7 @@ public abstract class FileInputFormat<K,
     
     // Whether we need to recursive look into the directory structure
     boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);
-    
-    List<FileStatus> result = new ArrayList<FileStatus>();
-    List<IOException> errors = new ArrayList<IOException>();
-    
+
     // creates a MultiPathFilter with the hiddenFileFilter and the
     // user provided one (if any).
     List<PathFilter> filters = new ArrayList<PathFilter>();
@@ -217,6 +217,41 @@ public abstract class FileInputFormat<K,
     }
     PathFilter inputFilter = new MultiPathFilter(filters);
 
+    FileStatus[] result;
+    int numThreads = job
+        .getInt(
+            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
+            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
+    
+    Stopwatch sw = new Stopwatch().start();
+    if (numThreads == 1) {
+      List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive); 
+      result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]);
+    } else {
+      Iterable<FileStatus> locatedFiles = null;
+      try {
+        
+        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
+            job, dirs, recursive, inputFilter, false);
+        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while getting file statuses");
+      }
+      result = Iterables.toArray(locatedFiles, FileStatus.class);
+    }
+
+    sw.stop();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
+    }
+    LOG.info("Total input paths to process : " + result.length);
+    return result;
+  }
+  
+  private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
+      PathFilter inputFilter, boolean recursive) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    List<IOException> errors = new ArrayList<IOException>();
     for (Path p: dirs) {
       FileSystem fs = p.getFileSystem(job); 
       FileStatus[] matches = fs.globStatus(p, inputFilter);
@@ -246,12 +281,10 @@ public abstract class FileInputFormat<K,
         }
       }
     }
-
     if (!errors.isEmpty()) {
       throw new InvalidInputException(errors);
     }
-    LOG.info("Total input paths to process : " + result.size()); 
-    return result.toArray(new FileStatus[result.size()]);
+    return result;
   }
 
   /**
@@ -267,6 +300,7 @@ public abstract class FileInputFormat<K,
    * they're too big.*/ 
   public InputSplit[] getSplits(JobConf job, int numSplits)
     throws IOException {
+    Stopwatch sw = new Stopwatch().start();
     FileStatus[] files = listStatus(job);
     
     // Save the number of input files for metrics/loadgen
@@ -325,7 +359,11 @@ public abstract class FileInputFormat<K,
         splits.add(makeSplit(path, 0, length, new String[0]));
       }
     }
-    LOG.debug("Total # of splits: " + splits.size());
+    sw.stop();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+          + ", TimeTaken: " + sw.elapsedMillis());
+    }
     return splits.toArray(new FileSplit[splits.size()]);
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1579516&r1=1579515&r2=1579516&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Thu Mar 20 02:48:48 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -43,6 +44,9 @@ import org.apache.hadoop.mapreduce.secur
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
 /** 
  * A base class for file-based {@link InputFormat}s.
  * 
@@ -68,6 +72,9 @@ public abstract class FileInputFormat<K,
     "mapreduce.input.fileinputformat.numinputfiles";
   public static final String INPUT_DIR_RECURSIVE =
     "mapreduce.input.fileinputformat.input.dir.recursive";
+  public static final String LIST_STATUS_NUM_THREADS =
+      "mapreduce.input.fileinputformat.list-status.num-threads";
+  public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
 
   private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
 
@@ -225,7 +232,6 @@ public abstract class FileInputFormat<K,
    */
   protected List<FileStatus> listStatus(JobContext job
                                         ) throws IOException {
-    List<FileStatus> result = new ArrayList<FileStatus>();
     Path[] dirs = getInputPaths(job);
     if (dirs.length == 0) {
       throw new IOException("No input paths specified in job");
@@ -237,9 +243,7 @@ public abstract class FileInputFormat<K,
 
     // Whether we need to recursive look into the directory structure
     boolean recursive = getInputDirRecursive(job);
-    
-    List<IOException> errors = new ArrayList<IOException>();
-    
+
     // creates a MultiPathFilter with the hiddenFileFilter and the
     // user provided one (if any).
     List<PathFilter> filters = new ArrayList<PathFilter>();
@@ -250,6 +254,37 @@ public abstract class FileInputFormat<K,
     }
     PathFilter inputFilter = new MultiPathFilter(filters);
     
+    List<FileStatus> result = null;
+
+    int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
+        DEFAULT_LIST_STATUS_NUM_THREADS);
+    Stopwatch sw = new Stopwatch().start();
+    if (numThreads == 1) {
+      result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
+    } else {
+      Iterable<FileStatus> locatedFiles = null;
+      try {
+        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
+            job.getConfiguration(), dirs, recursive, inputFilter, true);
+        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while getting file statuses");
+      }
+      result = Lists.newArrayList(locatedFiles);
+    }
+    
+    sw.stop();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
+    }
+    LOG.info("Total input paths to process : " + result.size()); 
+    return result;
+  }
+
+  private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
+      PathFilter inputFilter, boolean recursive) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    List<IOException> errors = new ArrayList<IOException>();
     for (int i=0; i < dirs.length; ++i) {
       Path p = dirs[i];
       FileSystem fs = p.getFileSystem(job.getConfiguration()); 
@@ -284,7 +319,6 @@ public abstract class FileInputFormat<K,
     if (!errors.isEmpty()) {
       throw new InvalidInputException(errors);
     }
-    LOG.info("Total input paths to process : " + result.size()); 
     return result;
   }
   
@@ -332,6 +366,7 @@ public abstract class FileInputFormat<K,
    * @throws IOException
    */
   public List<InputSplit> getSplits(JobContext job) throws IOException {
+    Stopwatch sw = new Stopwatch().start();
     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
     long maxSize = getMaxSplitSize(job);
 
@@ -376,7 +411,11 @@ public abstract class FileInputFormat<K,
     }
     // Save the number of input files for metrics/loadgen
     job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
-    LOG.debug("Total # of splits: " + splits.size());
+    sw.stop();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+          + ", TimeTaken: " + sw.elapsedMillis());
+    }
     return splits;
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1579516&r1=1579515&r2=1579516&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Mar 20 02:48:48 2014
@@ -695,6 +695,15 @@
 </property>
 
 <property>
+  <name>mapreduce.input.fileinputformat.list-status.num-threads</name>
+  <value>1</value>
+  <description>The number of threads to use to list and fetch block locations
+  for the specified input paths. Note: multiple threads should not be used
+  if a custom non-thread-safe path filter is used.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.jobtracker.maxtasks.perjob</name>
   <value>-1</value>
   <description>The maximum number of tasks for a single job.

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=1579516&r1=1579515&r2=1579516&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java Thu Mar 20 02:48:48 2014
@@ -19,7 +19,12 @@ package org.apache.hadoop.mapred;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -29,15 +34,58 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
-public class TestFileInputFormat {
+import com.google.common.collect.Lists;
 
+@RunWith(value = Parameterized.class)
+public class TestFileInputFormat {
+  
+  private static final Log LOG = LogFactory.getLog(TestFileInputFormat.class);
+  
+  private static String testTmpDir = System.getProperty("test.build.data", "/tmp");
+  private static final Path TEST_ROOT_DIR = new Path(testTmpDir, "TestFIF");
+  
+  private static FileSystem localFs;
+  
+  private int numThreads;
+  
+  public TestFileInputFormat(int numThreads) {
+    this.numThreads = numThreads;
+    LOG.info("Running with numThreads: " + numThreads);
+  }
+  
+  @Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] { { 1 }, { 5 }};
+    return Arrays.asList(data);
+  }
+  
+  @Before
+  public void setup() throws IOException {
+    LOG.info("Using Test Dir: " + TEST_ROOT_DIR);
+    localFs = FileSystem.getLocal(new Configuration());
+    localFs.delete(TEST_ROOT_DIR, true);
+    localFs.mkdirs(TEST_ROOT_DIR);
+  }
+  
+  @After
+  public void cleanup() throws IOException {
+    localFs.delete(TEST_ROOT_DIR, true);
+  }
+  
   @Test
   public void testListLocatedStatus() throws Exception {
     Configuration conf = getConfiguration();
     conf.setBoolean("fs.test.impl.disable.cache", false);
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
     conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
         "test:///a1/a2");
     MockFileSystem mockFs =
@@ -51,6 +99,82 @@ public class TestFileInputFormat {
     Assert.assertEquals("Input splits are not correct", 2, splits.length);
     Assert.assertEquals("listLocatedStatuss calls",
         1, mockFs.numListLocatedStatusCalls);
+    FileSystem.closeAll();
+  }
+  
+  @Test
+  public void testListStatusSimple() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+    List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+        .configureTestSimple(conf, localFs);
+
+    JobConf jobConf = new JobConf(conf);
+    TextInputFormat fif = new TextInputFormat();
+    fif.configure(jobConf);
+    FileStatus[] statuses = fif.listStatus(jobConf);
+
+    org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+        .verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses),
+            localFs);
+  }
+
+  @Test
+  public void testListStatusNestedRecursive() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+    List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+        .configureTestNestedRecursive(conf, localFs);
+    JobConf jobConf = new JobConf(conf);
+    TextInputFormat fif = new TextInputFormat();
+    fif.configure(jobConf);
+    FileStatus[] statuses = fif.listStatus(jobConf);
+
+    org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+        .verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses),
+            localFs);
+  }
+
+  @Test
+  public void testListStatusNestedNonRecursive() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+    List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+        .configureTestNestedNonRecursive(conf, localFs);
+    JobConf jobConf = new JobConf(conf);
+    TextInputFormat fif = new TextInputFormat();
+    fif.configure(jobConf);
+    FileStatus[] statuses = fif.listStatus(jobConf);
+
+    org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+        .verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses),
+            localFs);
+  }
+
+  @Test
+  public void testListStatusErrorOnNonExistantDir() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+    org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+        .configureTestErrorOnNonExistantDir(conf, localFs);
+    JobConf jobConf = new JobConf(conf);
+    TextInputFormat fif = new TextInputFormat();
+    fif.configure(jobConf);
+    try {
+      fif.listStatus(jobConf);
+      Assert.fail("Expecting an IOException for a missing Input path");
+    } catch (IOException e) {
+      Path expectedExceptionPath = new Path(TEST_ROOT_DIR, "input2");
+      expectedExceptionPath = localFs.makeQualified(expectedExceptionPath);
+      Assert.assertTrue(e instanceof InvalidInputException);
+      Assert.assertEquals(
+          "Input path does not exist: " + expectedExceptionPath.toString(),
+          e.getMessage());
+    }
   }
 
   private Configuration getConfiguration() {

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1579516&r1=1579515&r2=1579516&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Thu Mar 20 02:48:48 2014
@@ -19,10 +19,17 @@ package org.apache.hadoop.mapreduce.lib.
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.Set;
+
+import javax.annotation.Nullable;
 
 import junit.framework.Assert;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -34,55 +41,90 @@ import org.apache.hadoop.fs.RawLocalFile
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
+@RunWith(value = Parameterized.class)
 public class TestFileInputFormat {
+  
+  private static final Log LOG = LogFactory.getLog(TestFileInputFormat.class);
+  
+  private static String testTmpDir = System.getProperty("test.build.data", "/tmp");
+  private static final Path TEST_ROOT_DIR = new Path(testTmpDir, "TestFIF");
+  
+  private static FileSystem localFs;
+  
+  private int numThreads;
+  
+  public TestFileInputFormat(int numThreads) {
+    this.numThreads = numThreads;
+    LOG.info("Running with numThreads: " + numThreads);
+  }
+  
+  @Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] { { 1 }, { 5 }};
+    return Arrays.asList(data);
+  }
+  
+  @Before
+  public void setup() throws IOException {
+    LOG.info("Using Test Dir: " + TEST_ROOT_DIR);
+    localFs = FileSystem.getLocal(new Configuration());
+    localFs.delete(TEST_ROOT_DIR, true);
+    localFs.mkdirs(TEST_ROOT_DIR);
+  }
+  
+  @After
+  public void cleanup() throws IOException {
+    localFs.delete(TEST_ROOT_DIR, true);
+  }
 
   @Test
   public void testNumInputFilesRecursively() throws Exception {
     Configuration conf = getConfiguration();
     conf.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
     Job job = Job.getInstance(conf);
     FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
     List<InputSplit> splits = fileInputFormat.getSplits(job);
     Assert.assertEquals("Input splits are not correct", 3, splits.size());
-    Assert.assertEquals("test:/a1/a2/file2", ((FileSplit) splits.get(0))
-        .getPath().toString());
-    Assert.assertEquals("test:/a1/a2/file3", ((FileSplit) splits.get(1))
-        .getPath().toString());
-    Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(2)).getPath()
-        .toString());
-    
+    verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3",
+        "test:/a1/file1"), splits);
+
     // Using the deprecated configuration
     conf = getConfiguration();
     conf.set("mapred.input.dir.recursive", "true");
     job = Job.getInstance(conf);
     splits = fileInputFormat.getSplits(job);
-    Assert.assertEquals("Input splits are not correct", 3, splits.size());
-    Assert.assertEquals("test:/a1/a2/file2", ((FileSplit) splits.get(0))
-        .getPath().toString());
-    Assert.assertEquals("test:/a1/a2/file3", ((FileSplit) splits.get(1))
-        .getPath().toString());
-    Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(2)).getPath()
-        .toString());
+    verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3",
+        "test:/a1/file1"), splits);
   }
 
   @Test
   public void testNumInputFilesWithoutRecursively() throws Exception {
     Configuration conf = getConfiguration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
     Job job = Job.getInstance(conf);
     FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
     List<InputSplit> splits = fileInputFormat.getSplits(job);
     Assert.assertEquals("Input splits are not correct", 2, splits.size());
-    Assert.assertEquals("test:/a1/a2", ((FileSplit) splits.get(0)).getPath()
-        .toString());
-    Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(1)).getPath()
-        .toString());
+    verifySplits(Lists.newArrayList("test:/a1/a2", "test:/a1/file1"), splits);
   }
 
   @Test
   public void testListLocatedStatus() throws Exception {
     Configuration conf = getConfiguration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
     conf.setBoolean("fs.test.impl.disable.cache", false);
     conf.set(FileInputFormat.INPUT_DIR, "test:///a1/a2");
     MockFileSystem mockFs =
@@ -95,8 +137,226 @@ public class TestFileInputFormat {
     Assert.assertEquals("Input splits are not correct", 2, splits.size());
     Assert.assertEquals("listLocatedStatuss calls",
         1, mockFs.numListLocatedStatusCalls);
+    FileSystem.closeAll();
+  }
+
+  @Test
+  public void testListStatusSimple() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+    List<Path> expectedPaths = configureTestSimple(conf, localFs);
+    
+    Job job  = Job.getInstance(conf);
+    FileInputFormat<?, ?> fif = new TextInputFormat();
+    List<FileStatus> statuses = fif.listStatus(job);
+
+    verifyFileStatuses(expectedPaths, statuses, localFs);
+  }
+
+  @Test
+  public void testListStatusNestedRecursive() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+    List<Path> expectedPaths = configureTestNestedRecursive(conf, localFs);
+    Job job  = Job.getInstance(conf);
+    FileInputFormat<?, ?> fif = new TextInputFormat();
+    List<FileStatus> statuses = fif.listStatus(job);
+
+    verifyFileStatuses(expectedPaths, statuses, localFs);
+  }
+
+
+  @Test
+  public void testListStatusNestedNonRecursive() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+    List<Path> expectedPaths = configureTestNestedNonRecursive(conf, localFs);
+    Job job  = Job.getInstance(conf);
+    FileInputFormat<?, ?> fif = new TextInputFormat();
+    List<FileStatus> statuses = fif.listStatus(job);
+
+    verifyFileStatuses(expectedPaths, statuses, localFs);
+  }
+
+  @Test
+  public void testListStatusErrorOnNonExistantDir() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+    configureTestErrorOnNonExistantDir(conf, localFs);
+    Job job  = Job.getInstance(conf);
+    FileInputFormat<?, ?> fif = new TextInputFormat();
+    try {
+      fif.listStatus(job);
+      Assert.fail("Expecting an IOException for a missing Input path");
+    } catch (IOException e) {
+      Path expectedExceptionPath = new Path(TEST_ROOT_DIR, "input2");
+      expectedExceptionPath = localFs.makeQualified(expectedExceptionPath);
+      Assert.assertTrue(e instanceof InvalidInputException);
+      Assert.assertEquals(
+          "Input path does not exist: " + expectedExceptionPath.toString(),
+          e.getMessage());
+    }
+  }
+
+  public static List<Path> configureTestSimple(Configuration conf, FileSystem localFs)
+      throws IOException {
+    Path base1 = new Path(TEST_ROOT_DIR, "input1");
+    Path base2 = new Path(TEST_ROOT_DIR, "input2");
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        localFs.makeQualified(base1) + "," + localFs.makeQualified(base2));
+    localFs.mkdirs(base1);
+    localFs.mkdirs(base2);
+
+    Path in1File1 = new Path(base1, "file1");
+    Path in1File2 = new Path(base1, "file2");
+    localFs.createNewFile(in1File1);
+    localFs.createNewFile(in1File2);
+
+    Path in2File1 = new Path(base2, "file1");
+    Path in2File2 = new Path(base2, "file2");
+    localFs.createNewFile(in2File1);
+    localFs.createNewFile(in2File2);
+    List<Path> expectedPaths = Lists.newArrayList(in1File1, in1File2, in2File1,
+        in2File2);
+    return expectedPaths;
+  }
+
+  public static List<Path> configureTestNestedRecursive(Configuration conf,
+      FileSystem localFs) throws IOException {
+    Path base1 = new Path(TEST_ROOT_DIR, "input1");
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        localFs.makeQualified(base1).toString());
+    conf.setBoolean(
+        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE,
+        true);
+    localFs.mkdirs(base1);
+
+    Path inDir1 = new Path(base1, "dir1");
+    Path inDir2 = new Path(base1, "dir2");
+    Path inFile1 = new Path(base1, "file1");
+
+    Path dir1File1 = new Path(inDir1, "file1");
+    Path dir1File2 = new Path(inDir1, "file2");
+
+    Path dir2File1 = new Path(inDir2, "file1");
+    Path dir2File2 = new Path(inDir2, "file2");
+
+    localFs.mkdirs(inDir1);
+    localFs.mkdirs(inDir2);
+
+    localFs.createNewFile(inFile1);
+    localFs.createNewFile(dir1File1);
+    localFs.createNewFile(dir1File2);
+    localFs.createNewFile(dir2File1);
+    localFs.createNewFile(dir2File2);
+
+    List<Path> expectedPaths = Lists.newArrayList(inFile1, dir1File1,
+        dir1File2, dir2File1, dir2File2);
+    return expectedPaths;
+  }
+
+  public static List<Path> configureTestNestedNonRecursive(Configuration conf,
+      FileSystem localFs) throws IOException {
+    Path base1 = new Path(TEST_ROOT_DIR, "input1");
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        localFs.makeQualified(base1).toString());
+    conf.setBoolean(
+        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE,
+        false);
+    localFs.mkdirs(base1);
+
+    Path inDir1 = new Path(base1, "dir1");
+    Path inDir2 = new Path(base1, "dir2");
+    Path inFile1 = new Path(base1, "file1");
+
+    Path dir1File1 = new Path(inDir1, "file1");
+    Path dir1File2 = new Path(inDir1, "file2");
+
+    Path dir2File1 = new Path(inDir2, "file1");
+    Path dir2File2 = new Path(inDir2, "file2");
+
+    localFs.mkdirs(inDir1);
+    localFs.mkdirs(inDir2);
+
+    localFs.createNewFile(inFile1);
+    localFs.createNewFile(dir1File1);
+    localFs.createNewFile(dir1File2);
+    localFs.createNewFile(dir2File1);
+    localFs.createNewFile(dir2File2);
+
+    List<Path> expectedPaths = Lists.newArrayList(inFile1, inDir1, inDir2);
+    return expectedPaths;
   }
 
+  public static List<Path> configureTestErrorOnNonExistantDir(Configuration conf,
+      FileSystem localFs) throws IOException {
+    Path base1 = new Path(TEST_ROOT_DIR, "input1");
+    Path base2 = new Path(TEST_ROOT_DIR, "input2");
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        localFs.makeQualified(base1) + "," + localFs.makeQualified(base2));
+    conf.setBoolean(
+        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE,
+        true);
+    localFs.mkdirs(base1);
+
+    Path inFile1 = new Path(base1, "file1");
+    Path inFile2 = new Path(base1, "file2");
+
+    localFs.createNewFile(inFile1);
+    localFs.createNewFile(inFile2);
+
+    List<Path> expectedPaths = Lists.newArrayList();
+    return expectedPaths;
+  }
+
+  public static void verifyFileStatuses(List<Path> expectedPaths,
+      List<FileStatus> fetchedStatuses, final FileSystem localFs) {
+    Assert.assertEquals(expectedPaths.size(), fetchedStatuses.size());
+
+    Iterable<Path> fqExpectedPaths = Iterables.transform(expectedPaths,
+        new Function<Path, Path>() {
+          @Override
+          public Path apply(Path input) {
+            return localFs.makeQualified(input);
+          }
+        });
+
+    Set<Path> expectedPathSet = Sets.newHashSet(fqExpectedPaths);
+    for (FileStatus fileStatus : fetchedStatuses) {
+      if (!expectedPathSet.remove(localFs.makeQualified(fileStatus.getPath()))) {
+        Assert.fail("Found extra fetched status: " + fileStatus.getPath());
+      }
+    }
+    Assert.assertEquals(
+        "Not all expectedPaths matched: " + expectedPathSet.toString(), 0,
+        expectedPathSet.size());
+  }
+
+
+  private void verifySplits(List<String> expected, List<InputSplit> splits) {
+    Iterable<String> pathsFromSplits = Iterables.transform(splits,
+        new Function<InputSplit, String>() {
+          @Override
+          public String apply(@Nullable InputSplit input) {
+            return ((FileSplit) input).getPath().toString();
+          }
+        });
+
+    Set<String> expectedSet = Sets.newHashSet(expected);
+    for (String splitPathString : pathsFromSplits) {
+      if (!expectedSet.remove(splitPathString)) {
+        Assert.fail("Found extra split: " + splitPathString);
+      }
+    }
+    Assert.assertEquals(
+        "Not all expectedPaths matched: " + expectedSet.toString(), 0,
+        expectedSet.size());
+  }
+  
   private Configuration getConfiguration() {
     Configuration conf = new Configuration();
     conf.set("fs.test.impl.disable.cache", "true");