You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2014/03/20 03:48:48 UTC
svn commit: r1579516 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/...
Author: vinodkv
Date: Thu Mar 20 02:48:48 2014
New Revision: 1579516
URL: http://svn.apache.org/r1579516
Log:
MAPREDUCE-2349. Modified FileInputFormat to be able to issue file and block location calls in parallel. Contributed by Siddharth Seth.
svn merge --ignore-ancestry -c 1579515 ../../trunk/
Added:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
- copied unchanged from r1579515, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1579516&r1=1579515&r2=1579516&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu Mar 20 02:48:48 2014
@@ -57,6 +57,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-4052. Improved MapReduce clients to use NodeManagers' ability to
handle cross platform application submissions. (Jian He via vinodkv)
+ MAPREDUCE-2349. Modified FileInputFormat to be able to issue file and block
+ location calls in parallel. (Siddharth Seth via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=1579516&r1=1579515&r2=1579516&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java Thu Mar 20 02:48:48 2014
@@ -47,6 +47,9 @@ import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterables;
+
/**
* A base class for file-based {@link InputFormat}.
*
@@ -203,10 +206,7 @@ public abstract class FileInputFormat<K,
// Whether we need to recursively look into the directory structure
boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);
-
- List<FileStatus> result = new ArrayList<FileStatus>();
- List<IOException> errors = new ArrayList<IOException>();
-
+
// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
@@ -217,6 +217,41 @@ public abstract class FileInputFormat<K,
}
PathFilter inputFilter = new MultiPathFilter(filters);
+ FileStatus[] result;
+ int numThreads = job
+ .getInt(
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
+
+ Stopwatch sw = new Stopwatch().start();
+ if (numThreads == 1) {
+ List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive);
+ result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]);
+ } else {
+ Iterable<FileStatus> locatedFiles = null;
+ try {
+
+ LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
+ job, dirs, recursive, inputFilter, false);
+ locatedFiles = locatedFileStatusFetcher.getFileStatuses();
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while getting file statuses");
+ }
+ result = Iterables.toArray(locatedFiles, FileStatus.class);
+ }
+
+ sw.stop();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
+ }
+ LOG.info("Total input paths to process : " + result.length);
+ return result;
+ }
+
+ private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
+ PathFilter inputFilter, boolean recursive) throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ List<IOException> errors = new ArrayList<IOException>();
for (Path p: dirs) {
FileSystem fs = p.getFileSystem(job);
FileStatus[] matches = fs.globStatus(p, inputFilter);
@@ -246,12 +281,10 @@ public abstract class FileInputFormat<K,
}
}
}
-
if (!errors.isEmpty()) {
throw new InvalidInputException(errors);
}
- LOG.info("Total input paths to process : " + result.size());
- return result.toArray(new FileStatus[result.size()]);
+ return result;
}
/**
@@ -267,6 +300,7 @@ public abstract class FileInputFormat<K,
* they're too big.*/
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
+ Stopwatch sw = new Stopwatch().start();
FileStatus[] files = listStatus(job);
// Save the number of input files for metrics/loadgen
@@ -325,7 +359,11 @@ public abstract class FileInputFormat<K,
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
- LOG.debug("Total # of splits: " + splits.size());
+ sw.stop();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ + ", TimeTaken: " + sw.elapsedMillis());
+ }
return splits.toArray(new FileSplit[splits.size()]);
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1579516&r1=1579515&r2=1579516&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Thu Mar 20 02:48:48 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -43,6 +44,9 @@ import org.apache.hadoop.mapreduce.secur
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
/**
* A base class for file-based {@link InputFormat}s.
*
@@ -68,6 +72,9 @@ public abstract class FileInputFormat<K,
"mapreduce.input.fileinputformat.numinputfiles";
public static final String INPUT_DIR_RECURSIVE =
"mapreduce.input.fileinputformat.input.dir.recursive";
+ public static final String LIST_STATUS_NUM_THREADS =
+ "mapreduce.input.fileinputformat.list-status.num-threads";
+ public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
@@ -225,7 +232,6 @@ public abstract class FileInputFormat<K,
*/
protected List<FileStatus> listStatus(JobContext job
) throws IOException {
- List<FileStatus> result = new ArrayList<FileStatus>();
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
@@ -237,9 +243,7 @@ public abstract class FileInputFormat<K,
// Whether we need to recursively look into the directory structure
boolean recursive = getInputDirRecursive(job);
-
- List<IOException> errors = new ArrayList<IOException>();
-
+
// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
@@ -250,6 +254,37 @@ public abstract class FileInputFormat<K,
}
PathFilter inputFilter = new MultiPathFilter(filters);
+ List<FileStatus> result = null;
+
+ int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
+ DEFAULT_LIST_STATUS_NUM_THREADS);
+ Stopwatch sw = new Stopwatch().start();
+ if (numThreads == 1) {
+ result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
+ } else {
+ Iterable<FileStatus> locatedFiles = null;
+ try {
+ LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
+ job.getConfiguration(), dirs, recursive, inputFilter, true);
+ locatedFiles = locatedFileStatusFetcher.getFileStatuses();
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while getting file statuses");
+ }
+ result = Lists.newArrayList(locatedFiles);
+ }
+
+ sw.stop();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
+ }
+ LOG.info("Total input paths to process : " + result.size());
+ return result;
+ }
+
+ private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
+ PathFilter inputFilter, boolean recursive) throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ List<IOException> errors = new ArrayList<IOException>();
for (int i=0; i < dirs.length; ++i) {
Path p = dirs[i];
FileSystem fs = p.getFileSystem(job.getConfiguration());
@@ -284,7 +319,6 @@ public abstract class FileInputFormat<K,
if (!errors.isEmpty()) {
throw new InvalidInputException(errors);
}
- LOG.info("Total input paths to process : " + result.size());
return result;
}
@@ -332,6 +366,7 @@ public abstract class FileInputFormat<K,
* @throws IOException
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
+ Stopwatch sw = new Stopwatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
@@ -376,7 +411,11 @@ public abstract class FileInputFormat<K,
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
- LOG.debug("Total # of splits: " + splits.size());
+ sw.stop();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ + ", TimeTaken: " + sw.elapsedMillis());
+ }
return splits;
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1579516&r1=1579515&r2=1579516&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Mar 20 02:48:48 2014
@@ -695,6 +695,15 @@
</property>
<property>
+ <name>mapreduce.input.fileinputformat.list-status.num-threads</name>
+ <value>1</value>
+ <description>The number of threads to use to list and fetch block locations
+ for the specified input paths. Note: multiple threads should not be used
+ if a custom non-thread-safe path filter is used.
+ </description>
+</property>
+
+<property>
<name>mapreduce.jobtracker.maxtasks.perjob</name>
<value>-1</value>
<description>The maximum number of tasks for a single job.
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=1579516&r1=1579515&r2=1579516&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java Thu Mar 20 02:48:48 2014
@@ -19,7 +19,12 @@ package org.apache.hadoop.mapred;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
@@ -29,15 +34,58 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
-public class TestFileInputFormat {
+import com.google.common.collect.Lists;
+@RunWith(value = Parameterized.class)
+public class TestFileInputFormat {
+
+ private static final Log LOG = LogFactory.getLog(TestFileInputFormat.class);
+
+ private static String testTmpDir = System.getProperty("test.build.data", "/tmp");
+ private static final Path TEST_ROOT_DIR = new Path(testTmpDir, "TestFIF");
+
+ private static FileSystem localFs;
+
+ private int numThreads;
+
+ public TestFileInputFormat(int numThreads) {
+ this.numThreads = numThreads;
+ LOG.info("Running with numThreads: " + numThreads);
+ }
+
+ @Parameters
+ public static Collection<Object[]> data() {
+ Object[][] data = new Object[][] { { 1 }, { 5 }};
+ return Arrays.asList(data);
+ }
+
+ @Before
+ public void setup() throws IOException {
+ LOG.info("Using Test Dir: " + TEST_ROOT_DIR);
+ localFs = FileSystem.getLocal(new Configuration());
+ localFs.delete(TEST_ROOT_DIR, true);
+ localFs.mkdirs(TEST_ROOT_DIR);
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ localFs.delete(TEST_ROOT_DIR, true);
+ }
+
@Test
public void testListLocatedStatus() throws Exception {
Configuration conf = getConfiguration();
conf.setBoolean("fs.test.impl.disable.cache", false);
+ conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
"test:///a1/a2");
MockFileSystem mockFs =
@@ -51,6 +99,82 @@ public class TestFileInputFormat {
Assert.assertEquals("Input splits are not correct", 2, splits.length);
Assert.assertEquals("listLocatedStatuss calls",
1, mockFs.numListLocatedStatusCalls);
+ FileSystem.closeAll();
+ }
+
+ @Test
+ public void testListStatusSimple() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+ List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+ .configureTestSimple(conf, localFs);
+
+ JobConf jobConf = new JobConf(conf);
+ TextInputFormat fif = new TextInputFormat();
+ fif.configure(jobConf);
+ FileStatus[] statuses = fif.listStatus(jobConf);
+
+ org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+ .verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses),
+ localFs);
+ }
+
+ @Test
+ public void testListStatusNestedRecursive() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+ List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+ .configureTestNestedRecursive(conf, localFs);
+ JobConf jobConf = new JobConf(conf);
+ TextInputFormat fif = new TextInputFormat();
+ fif.configure(jobConf);
+ FileStatus[] statuses = fif.listStatus(jobConf);
+
+ org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+ .verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses),
+ localFs);
+ }
+
+ @Test
+ public void testListStatusNestedNonRecursive() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+ List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+ .configureTestNestedNonRecursive(conf, localFs);
+ JobConf jobConf = new JobConf(conf);
+ TextInputFormat fif = new TextInputFormat();
+ fif.configure(jobConf);
+ FileStatus[] statuses = fif.listStatus(jobConf);
+
+ org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+ .verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses),
+ localFs);
+ }
+
+ @Test
+ public void testListStatusErrorOnNonExistantDir() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+ org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+ .configureTestErrorOnNonExistantDir(conf, localFs);
+ JobConf jobConf = new JobConf(conf);
+ TextInputFormat fif = new TextInputFormat();
+ fif.configure(jobConf);
+ try {
+ fif.listStatus(jobConf);
+ Assert.fail("Expecting an IOException for a missing Input path");
+ } catch (IOException e) {
+ Path expectedExceptionPath = new Path(TEST_ROOT_DIR, "input2");
+ expectedExceptionPath = localFs.makeQualified(expectedExceptionPath);
+ Assert.assertTrue(e instanceof InvalidInputException);
+ Assert.assertEquals(
+ "Input path does not exist: " + expectedExceptionPath.toString(),
+ e.getMessage());
+ }
}
private Configuration getConfiguration() {
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1579516&r1=1579515&r2=1579516&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Thu Mar 20 02:48:48 2014
@@ -19,10 +19,17 @@ package org.apache.hadoop.mapreduce.lib.
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
+import java.util.Set;
+
+import javax.annotation.Nullable;
import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
@@ -34,55 +41,90 @@ import org.apache.hadoop.fs.RawLocalFile
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+@RunWith(value = Parameterized.class)
public class TestFileInputFormat {
+
+ private static final Log LOG = LogFactory.getLog(TestFileInputFormat.class);
+
+ private static String testTmpDir = System.getProperty("test.build.data", "/tmp");
+ private static final Path TEST_ROOT_DIR = new Path(testTmpDir, "TestFIF");
+
+ private static FileSystem localFs;
+
+ private int numThreads;
+
+ public TestFileInputFormat(int numThreads) {
+ this.numThreads = numThreads;
+ LOG.info("Running with numThreads: " + numThreads);
+ }
+
+ @Parameters
+ public static Collection<Object[]> data() {
+ Object[][] data = new Object[][] { { 1 }, { 5 }};
+ return Arrays.asList(data);
+ }
+
+ @Before
+ public void setup() throws IOException {
+ LOG.info("Using Test Dir: " + TEST_ROOT_DIR);
+ localFs = FileSystem.getLocal(new Configuration());
+ localFs.delete(TEST_ROOT_DIR, true);
+ localFs.mkdirs(TEST_ROOT_DIR);
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ localFs.delete(TEST_ROOT_DIR, true);
+ }
@Test
public void testNumInputFilesRecursively() throws Exception {
Configuration conf = getConfiguration();
conf.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
+ conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
Job job = Job.getInstance(conf);
FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
List<InputSplit> splits = fileInputFormat.getSplits(job);
Assert.assertEquals("Input splits are not correct", 3, splits.size());
- Assert.assertEquals("test:/a1/a2/file2", ((FileSplit) splits.get(0))
- .getPath().toString());
- Assert.assertEquals("test:/a1/a2/file3", ((FileSplit) splits.get(1))
- .getPath().toString());
- Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(2)).getPath()
- .toString());
-
+ verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3",
+ "test:/a1/file1"), splits);
+
// Using the deprecated configuration
conf = getConfiguration();
conf.set("mapred.input.dir.recursive", "true");
job = Job.getInstance(conf);
splits = fileInputFormat.getSplits(job);
- Assert.assertEquals("Input splits are not correct", 3, splits.size());
- Assert.assertEquals("test:/a1/a2/file2", ((FileSplit) splits.get(0))
- .getPath().toString());
- Assert.assertEquals("test:/a1/a2/file3", ((FileSplit) splits.get(1))
- .getPath().toString());
- Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(2)).getPath()
- .toString());
+ verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3",
+ "test:/a1/file1"), splits);
}
@Test
public void testNumInputFilesWithoutRecursively() throws Exception {
Configuration conf = getConfiguration();
+ conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
Job job = Job.getInstance(conf);
FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
List<InputSplit> splits = fileInputFormat.getSplits(job);
Assert.assertEquals("Input splits are not correct", 2, splits.size());
- Assert.assertEquals("test:/a1/a2", ((FileSplit) splits.get(0)).getPath()
- .toString());
- Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(1)).getPath()
- .toString());
+ verifySplits(Lists.newArrayList("test:/a1/a2", "test:/a1/file1"), splits);
}
@Test
public void testListLocatedStatus() throws Exception {
Configuration conf = getConfiguration();
+ conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
conf.setBoolean("fs.test.impl.disable.cache", false);
conf.set(FileInputFormat.INPUT_DIR, "test:///a1/a2");
MockFileSystem mockFs =
@@ -95,8 +137,226 @@ public class TestFileInputFormat {
Assert.assertEquals("Input splits are not correct", 2, splits.size());
Assert.assertEquals("listLocatedStatuss calls",
1, mockFs.numListLocatedStatusCalls);
+ FileSystem.closeAll();
+ }
+
+ @Test
+ public void testListStatusSimple() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+ List<Path> expectedPaths = configureTestSimple(conf, localFs);
+
+ Job job = Job.getInstance(conf);
+ FileInputFormat<?, ?> fif = new TextInputFormat();
+ List<FileStatus> statuses = fif.listStatus(job);
+
+ verifyFileStatuses(expectedPaths, statuses, localFs);
+ }
+
+ @Test
+ public void testListStatusNestedRecursive() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+ List<Path> expectedPaths = configureTestNestedRecursive(conf, localFs);
+ Job job = Job.getInstance(conf);
+ FileInputFormat<?, ?> fif = new TextInputFormat();
+ List<FileStatus> statuses = fif.listStatus(job);
+
+ verifyFileStatuses(expectedPaths, statuses, localFs);
+ }
+
+
+ @Test
+ public void testListStatusNestedNonRecursive() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+ List<Path> expectedPaths = configureTestNestedNonRecursive(conf, localFs);
+ Job job = Job.getInstance(conf);
+ FileInputFormat<?, ?> fif = new TextInputFormat();
+ List<FileStatus> statuses = fif.listStatus(job);
+
+ verifyFileStatuses(expectedPaths, statuses, localFs);
+ }
+
+ @Test
+ public void testListStatusErrorOnNonExistantDir() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+ configureTestErrorOnNonExistantDir(conf, localFs);
+ Job job = Job.getInstance(conf);
+ FileInputFormat<?, ?> fif = new TextInputFormat();
+ try {
+ fif.listStatus(job);
+ Assert.fail("Expecting an IOException for a missing Input path");
+ } catch (IOException e) {
+ Path expectedExceptionPath = new Path(TEST_ROOT_DIR, "input2");
+ expectedExceptionPath = localFs.makeQualified(expectedExceptionPath);
+ Assert.assertTrue(e instanceof InvalidInputException);
+ Assert.assertEquals(
+ "Input path does not exist: " + expectedExceptionPath.toString(),
+ e.getMessage());
+ }
+ }
+
+ public static List<Path> configureTestSimple(Configuration conf, FileSystem localFs)
+ throws IOException {
+ Path base1 = new Path(TEST_ROOT_DIR, "input1");
+ Path base2 = new Path(TEST_ROOT_DIR, "input2");
+ conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+ localFs.makeQualified(base1) + "," + localFs.makeQualified(base2));
+ localFs.mkdirs(base1);
+ localFs.mkdirs(base2);
+
+ Path in1File1 = new Path(base1, "file1");
+ Path in1File2 = new Path(base1, "file2");
+ localFs.createNewFile(in1File1);
+ localFs.createNewFile(in1File2);
+
+ Path in2File1 = new Path(base2, "file1");
+ Path in2File2 = new Path(base2, "file2");
+ localFs.createNewFile(in2File1);
+ localFs.createNewFile(in2File2);
+ List<Path> expectedPaths = Lists.newArrayList(in1File1, in1File2, in2File1,
+ in2File2);
+ return expectedPaths;
+ }
+
+ public static List<Path> configureTestNestedRecursive(Configuration conf,
+ FileSystem localFs) throws IOException {
+ Path base1 = new Path(TEST_ROOT_DIR, "input1");
+ conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+ localFs.makeQualified(base1).toString());
+ conf.setBoolean(
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE,
+ true);
+ localFs.mkdirs(base1);
+
+ Path inDir1 = new Path(base1, "dir1");
+ Path inDir2 = new Path(base1, "dir2");
+ Path inFile1 = new Path(base1, "file1");
+
+ Path dir1File1 = new Path(inDir1, "file1");
+ Path dir1File2 = new Path(inDir1, "file2");
+
+ Path dir2File1 = new Path(inDir2, "file1");
+ Path dir2File2 = new Path(inDir2, "file2");
+
+ localFs.mkdirs(inDir1);
+ localFs.mkdirs(inDir2);
+
+ localFs.createNewFile(inFile1);
+ localFs.createNewFile(dir1File1);
+ localFs.createNewFile(dir1File2);
+ localFs.createNewFile(dir2File1);
+ localFs.createNewFile(dir2File2);
+
+ List<Path> expectedPaths = Lists.newArrayList(inFile1, dir1File1,
+ dir1File2, dir2File1, dir2File2);
+ return expectedPaths;
+ }
+
+ public static List<Path> configureTestNestedNonRecursive(Configuration conf,
+ FileSystem localFs) throws IOException {
+ Path base1 = new Path(TEST_ROOT_DIR, "input1");
+ conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+ localFs.makeQualified(base1).toString());
+ conf.setBoolean(
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE,
+ false);
+ localFs.mkdirs(base1);
+
+ Path inDir1 = new Path(base1, "dir1");
+ Path inDir2 = new Path(base1, "dir2");
+ Path inFile1 = new Path(base1, "file1");
+
+ Path dir1File1 = new Path(inDir1, "file1");
+ Path dir1File2 = new Path(inDir1, "file2");
+
+ Path dir2File1 = new Path(inDir2, "file1");
+ Path dir2File2 = new Path(inDir2, "file2");
+
+ localFs.mkdirs(inDir1);
+ localFs.mkdirs(inDir2);
+
+ localFs.createNewFile(inFile1);
+ localFs.createNewFile(dir1File1);
+ localFs.createNewFile(dir1File2);
+ localFs.createNewFile(dir2File1);
+ localFs.createNewFile(dir2File2);
+
+ List<Path> expectedPaths = Lists.newArrayList(inFile1, inDir1, inDir2);
+ return expectedPaths;
}
+ public static List<Path> configureTestErrorOnNonExistantDir(Configuration conf,
+ FileSystem localFs) throws IOException {
+ Path base1 = new Path(TEST_ROOT_DIR, "input1");
+ Path base2 = new Path(TEST_ROOT_DIR, "input2");
+ conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+ localFs.makeQualified(base1) + "," + localFs.makeQualified(base2));
+ conf.setBoolean(
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE,
+ true);
+ localFs.mkdirs(base1);
+
+ Path inFile1 = new Path(base1, "file1");
+ Path inFile2 = new Path(base1, "file2");
+
+ localFs.createNewFile(inFile1);
+ localFs.createNewFile(inFile2);
+
+ List<Path> expectedPaths = Lists.newArrayList();
+ return expectedPaths;
+ }
+
+ public static void verifyFileStatuses(List<Path> expectedPaths,
+ List<FileStatus> fetchedStatuses, final FileSystem localFs) {
+ Assert.assertEquals(expectedPaths.size(), fetchedStatuses.size());
+
+ Iterable<Path> fqExpectedPaths = Iterables.transform(expectedPaths,
+ new Function<Path, Path>() {
+ @Override
+ public Path apply(Path input) {
+ return localFs.makeQualified(input);
+ }
+ });
+
+ Set<Path> expectedPathSet = Sets.newHashSet(fqExpectedPaths);
+ for (FileStatus fileStatus : fetchedStatuses) {
+ if (!expectedPathSet.remove(localFs.makeQualified(fileStatus.getPath()))) {
+ Assert.fail("Found extra fetched status: " + fileStatus.getPath());
+ }
+ }
+ Assert.assertEquals(
+ "Not all expectedPaths matched: " + expectedPathSet.toString(), 0,
+ expectedPathSet.size());
+ }
+
+
+ private void verifySplits(List<String> expected, List<InputSplit> splits) {
+ Iterable<String> pathsFromSplits = Iterables.transform(splits,
+ new Function<InputSplit, String>() {
+ @Override
+ public String apply(@Nullable InputSplit input) {
+ return ((FileSplit) input).getPath().toString();
+ }
+ });
+
+ Set<String> expectedSet = Sets.newHashSet(expected);
+ for (String splitPathString : pathsFromSplits) {
+ if (!expectedSet.remove(splitPathString)) {
+ Assert.fail("Found extra split: " + splitPathString);
+ }
+ }
+ Assert.assertEquals(
+ "Not all expectedPaths matched: " + expectedSet.toString(), 0,
+ expectedSet.size());
+ }
+
private Configuration getConfiguration() {
Configuration conf = new Configuration();
conf.set("fs.test.impl.disable.cache", "true");