You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/05 21:12:15 UTC
svn commit: r493169 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/mapred/
Author: cutting
Date: Fri Jan 5 12:12:14 2007
New Revision: 493169
URL: http://svn.apache.org/viewvc?view=rev&rev=493169
Log:
HADOOP-619. Extend InputFormatBase to accept individual files and glob patterns as MapReduce inputs. Contributed by Sanjay.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=493169&r1=493168&r2=493169
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jan 5 12:12:14 2007
@@ -200,6 +200,10 @@
hadoop-${version}-core.jar so that it can be more easily
identified. (Nigel Daley via cutting)
+57. HADOOP-619. Extend InputFormatBase to accept individual files and
+ glob patterns as MapReduce inputs, not just directories. Also
+ change contrib/streaming to use this. (Sanjay Dahiya via cutting)
+
Release 0.9.2 - 2006-12-15
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?view=diff&rev=493169&r1=493168&r2=493169
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Fri Jan 5 12:12:14 2007
@@ -21,20 +21,13 @@
import java.io.*;
import java.lang.reflect.*;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.commons.logging.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
import org.apache.hadoop.mapred.*;
/** An input format that performs globbing on DFS paths and
@@ -48,10 +41,6 @@
protected static final Log LOG = LogFactory.getLog(StreamInputFormat.class.getName());
- /** This implementation always returns true. */
- public void validateInput(JobConf job) throws IOException {
- }
-
static boolean isGzippedInput(JobConf job) {
String val = job.get(StreamBaseRecordReader.CONF_NS + "compression");
return "gzip".equals(val);
@@ -77,55 +66,6 @@
splits.add(new FileSplit(file, 0, splitSize, job));
}
return (FileSplit[]) splits.toArray(new FileSplit[splits.size()]);
- }
-
- protected Path[] listPaths(JobConf job) throws IOException {
- Path[] globs = job.getInputPaths();
- ArrayList list = new ArrayList();
- int dsup = globs.length;
- for (int d = 0; d < dsup; d++) {
- String leafName = globs[d].getName();
- LOG.info("StreamInputFormat: globs[" + d + "] leafName = " + leafName);
- Path[] paths;
- Path dir;
- FileSystem fs = globs[d].getFileSystem(job);
- PathFilter filter = new GlobFilter(fs, leafName);
- dir = new Path(globs[d].getParent().toString());
- if (dir == null) dir = new Path(".");
- paths = fs.listPaths(dir, filter);
- list.addAll(Arrays.asList(paths));
- }
- return (Path[]) list.toArray(new Path[] {});
- }
-
- class GlobFilter implements PathFilter {
-
- public GlobFilter(FileSystem fs, String glob) {
- fs_ = fs;
- pat_ = Pattern.compile(globToRegexp(glob));
- }
-
- String globToRegexp(String glob) {
- String re = glob;
- re = re.replaceAll("\\.", "\\\\.");
- re = re.replaceAll("\\+", "\\\\+");
- re = re.replaceAll("\\*", ".*");
- re = re.replaceAll("\\?", ".");
- LOG.info("globToRegexp: |" + glob + "| -> |" + re + "|");
- return re;
- }
-
- public boolean accept(Path pathname) {
- boolean acc = !fs_.isChecksumFile(pathname);
- if (acc) {
- acc = pat_.matcher(pathname.getName()).matches();
- }
- LOG.info("matches " + pat_ + ", " + pathname + " = " + acc);
- return acc;
- }
-
- Pattern pat_;
- FileSystem fs_;
}
public RecordReader getRecordReader(final InputSplit genericSplit,
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?view=diff&rev=493169&r1=493168&r2=493169
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Fri Jan 5 12:12:14 2007
@@ -652,8 +652,7 @@
throws IOException {
Path [] parents = new Path[1];
int level = 0;
-
- String filename = filePattern.toString();
+ String filename = filePattern.toUri().getPath();
if("".equals(filename) || Path.SEPARATOR.equals(filename)) {
parents[0] = filePattern;
return parents;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java?view=diff&rev=493169&r1=493168&r2=493169
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java Fri Jan 5 12:12:14 2007
@@ -32,6 +32,7 @@
/** The directory separator, a slash. */
public static final String SEPARATOR = "/";
+ public static final char SEPARATOR_CHAR = '/';
static final boolean WINDOWS
= System.getProperty("os.name").startsWith("Windows");
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java?view=diff&rev=493169&r1=493168&r2=493169
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java Fri Jan 5 12:12:14 2007
@@ -22,12 +22,14 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
/** A base class for {@link InputFormat}. */
public abstract class InputFormatBase implements InputFormat {
@@ -38,7 +40,12 @@
private static final double SPLIT_SLOP = 1.1; // 10% slop
private long minSplitSize = 1;
-
+ private static final PathFilter hiddenFileFilter = new PathFilter(){
+ public boolean accept( Path p ){
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
protected void setMinSplitSize(long minSplitSize) {
this.minSplitSize = minSplitSize;
}
@@ -63,39 +70,23 @@
* Subclasses may override to, e.g., select only files matching a regular
* expression.
*
- * <p>Property <code>mapred.input.subdir</code>, if set, names a subdirectory
- * that is appended to all input dirs specified by job, and if the given fs
- * lists those too, each is added to the returned array of Path.
- *
* @param job the job to list input paths for
* @return array of Path objects
* @throws IOException if zero items.
*/
protected Path[] listPaths(JobConf job)
throws IOException {
- String subdir = job.get("mapred.input.subdir");
Path[] dirs = job.getInputPaths();
if (dirs.length == 0) {
- throw new IOException("No input directories specified in job");
+ throw new IOException("No input paths specified in job");
}
- ArrayList result = new ArrayList();
- for (int i = 0; i < dirs.length; i++) {
- FileSystem fs = dirs[i].getFileSystem(job);
- Path[] dir = fs.listPaths(dirs[i]);
- if (dir != null) {
- for (int j = 0; j < dir.length; j++) {
- Path file = dir[j];
- if (subdir != null) {
- Path[] subFiles = fs.listPaths(new Path(file, subdir));
- if (subFiles != null) {
- for (int k = 0; k < subFiles.length; k++) {
- result.add(fs.makeQualified(subFiles[k]));
- }
- }
- } else {
- result.add(fs.makeQualified(file));
- }
- }
+ List<Path> result = new ArrayList();
+ for (Path p: dirs) {
+ FileSystem fs = p.getFileSystem(job);
+ Path[] matches =
+ fs.listPaths(fs.globPaths(p, hiddenFileFilter),hiddenFileFilter);
+ for (Path match: matches) {
+ result.add(fs.makeQualified(match));
}
}
@@ -104,22 +95,50 @@
public void validateInput(JobConf job) throws IOException {
Path[] inputDirs = job.getInputPaths();
+ if (inputDirs.length == 0) {
+ throw new IOException("No input paths specified in input");
+ }
+
List<IOException> result = new ArrayList();
- for(int i=0; i < inputDirs.length; ++i) {
- FileSystem fs = inputDirs[i].getFileSystem(job);
- if (!fs.exists(inputDirs[i])) {
- result.add(new FileNotFoundException("Input directory " +
- inputDirs[i] +
- " doesn't exist."));
- } else if (!fs.isDirectory(inputDirs[i])) {
- result.add(new InvalidFileTypeException
- ("Invalid input path, expecting directory : " +
- inputDirs[i]));
+ int totalFiles = 0;
+ for (Path p: inputDirs) {
+ FileSystem fs = p.getFileSystem(job);
+ if (fs.exists(p)) {
+ // make sure all paths are files to avoid exception
+ // while generating splits
+ for (Path subPath : fs.listPaths(p, hiddenFileFilter)) {
+ FileSystem subFS = subPath.getFileSystem(job);
+ if (!subFS.isFile(subPath)) {
+ result.add(new IOException(
+ "Input path is not a file : " + subPath));
+ } else {
+ totalFiles++;
+ }
+ }
+ } else {
+ Path [] paths = fs.globPaths(p, hiddenFileFilter);
+ if (paths.length == 0) {
+ result.add(
+ new IOException("Input Pattern " + p + " matches 0 files"));
+ } else {
+ // validate globbed paths
+ for (Path gPath : paths) {
+ FileSystem gPathFS = gPath.getFileSystem(job);
+ if (!gPathFS.exists(gPath)) {
+ result.add(
+ new FileNotFoundException(
+ "Input path doesnt exist : " + gPath));
+ }
+ }
+ totalFiles += paths.length ;
+ }
}
}
if (!result.isEmpty()) {
throw new InvalidInputException(result);
}
+ // send output to client.
+ LOG.info("Total input paths to process : " + totalFiles);
}
/** Splits files returned by {@link #listPaths(JobConf)} when
@@ -146,7 +165,7 @@
Path file = files[i];
FileSystem fs = file.getFileSystem(job);
long length = fs.getLength(file);
- if (isSplitable(fs, file)) {
+ if (isSplitable(fs, file)) {
long blockSize = fs.getBlockSize(file);
long splitSize = computeSplitSize(goalSize, minSize, blockSize);