You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/05/07 20:35:27 UTC
[02/35] hadoop git commit: MAPREDUCE-7086. Add config to allow
FileInputFormat to ignore directories when recursive=false. Contributed by
Sergey Shelukhin
MAPREDUCE-7086. Add config to allow FileInputFormat to ignore directories when recursive=false. Contributed by Sergey Shelukhin
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/68c6ec71
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/68c6ec71
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/68c6ec71
Branch: refs/heads/HDDS-4
Commit: 68c6ec719da8e79ada31c8f3a82124f90b9a71fd
Parents: 24eeea8
Author: Jason Lowe <jl...@apache.org>
Authored: Tue May 1 16:19:53 2018 -0500
Committer: Jason Lowe <jl...@apache.org>
Committed: Tue May 1 16:19:53 2018 -0500
----------------------------------------------------------------------
.../apache/hadoop/mapred/FileInputFormat.java | 25 ++++++++++++++------
.../mapreduce/lib/input/FileInputFormat.java | 8 +++++++
.../hadoop/mapred/TestFileInputFormat.java | 17 ++++++++++++-
.../lib/input/TestFileInputFormat.java | 12 ++++++++++
4 files changed, 54 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6ec71/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
index b0ec979..fe43991 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
@@ -78,10 +78,13 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
public static final String NUM_INPUT_FILES =
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;
-
+
public static final String INPUT_DIR_RECURSIVE =
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE;
+ public static final String INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS =
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS;
+
private static final double SPLIT_SLOP = 1.1; // 10% slop
@@ -319,16 +322,24 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
StopWatch sw = new StopWatch().start();
- FileStatus[] files = listStatus(job);
-
+ FileStatus[] stats = listStatus(job);
+
// Save the number of input files for metrics/loadgen
- job.setLong(NUM_INPUT_FILES, files.length);
+ job.setLong(NUM_INPUT_FILES, stats.length);
long totalSize = 0; // compute total size
- for (FileStatus file: files) { // check we have valid files
+ boolean ignoreDirs = !job.getBoolean(INPUT_DIR_RECURSIVE, false)
+ && job.getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
+
+ List<FileStatus> files = new ArrayList<>(stats.length);
+ for (FileStatus file: stats) { // check we have valid files
if (file.isDirectory()) {
- throw new IOException("Not a file: "+ file.getPath());
+ if (!ignoreDirs) {
+ throw new IOException("Not a file: "+ file.getPath());
+ }
+ } else {
+ files.add(file);
+ totalSize += file.getLen();
}
- totalSize += file.getLen();
}
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6ec71/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
index 9868e8e..e2d8e6f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
@@ -76,6 +76,8 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
"mapreduce.input.fileinputformat.numinputfiles";
public static final String INPUT_DIR_RECURSIVE =
"mapreduce.input.fileinputformat.input.dir.recursive";
+ public static final String INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS =
+ "mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs";
public static final String LIST_STATUS_NUM_THREADS =
"mapreduce.input.fileinputformat.list-status.num-threads";
public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
@@ -392,7 +394,13 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
+
+ boolean ignoreDirs = !getInputDirRecursive(job)
+ && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
for (FileStatus file: files) {
+ if (ignoreDirs && file.isDirectory()) {
+ continue;
+ }
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6ec71/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
index d322011..879cd3d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
@@ -102,7 +102,22 @@ public class TestFileInputFormat {
1, mockFs.numListLocatedStatusCalls);
FileSystem.closeAll();
}
-
+
+ @Test
+ public void testIgnoreDirs() throws Exception { // Old-API check: with ignore.subdirs=true and recursion off, getSplits skips directories instead of throwing "Not a file".
+ Configuration conf = getConfiguration();
+ conf.setBoolean(FileInputFormat.INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, true); // assumes getConfiguration() leaves INPUT_DIR_RECURSIVE unset (getSplits defaults it to false) — confirm
+ conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); // numThreads presumably comes from the test's parameterization — confirm
+ conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, "test:///a1"); // presumably a1 holds one file plus subdirectories in MockFileSystem — confirm
+ MockFileSystem mockFs = (MockFileSystem) new Path("test:///").getFileSystem(conf); // NOTE(review): mockFs is never used below; candidate for removal
+ JobConf job = new JobConf(conf);
+ TextInputFormat fileInputFormat = new TextInputFormat();
+ fileInputFormat.configure(job);
+ InputSplit[] splits = fileInputFormat.getSplits(job, 1); // numSplits hint of 1
+ Assert.assertEquals("Input splits are not correct", 1, splits.length); // only the count is checked, not which path survived or NUM_INPUT_FILES
+ FileSystem.closeAll(); // same cleanup as the preceding test
+ }
+
@Test
public void testSplitLocationInfo() throws Exception {
Configuration conf = getConfiguration();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6ec71/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
index 4c847fa..3897a9b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
@@ -124,6 +124,18 @@ public class TestFileInputFormat {
}
@Test
+ public void testNumInputFilesIgnoreDirs() throws Exception { // New-API check: with ignore.subdirs=true, directories are skipped and only a1/file1 yields a split.
+ Configuration conf = getConfiguration(); // assumes this sets the input dir and leaves recursion off — confirm
+ conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); // numThreads presumably comes from the test's parameterization — confirm
+ conf.setBoolean(FileInputFormat.INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, true); // enable the new option under test
+ Job job = Job.getInstance(conf);
+ FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
+ List<InputSplit> splits = fileInputFormat.getSplits(job);
+ Assert.assertEquals("Input splits are not correct", 1, splits.size()); // NOTE(review): despite the test name, NUM_INPUT_FILES is never asserted
+ verifySplits(Lists.newArrayList("test:/a1/file1"), splits); // the single split must be the regular file, not a directory
+ }
+
+ @Test
public void testListLocatedStatus() throws Exception {
Configuration conf = getConfiguration();
conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org