You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/05 21:12:15 UTC

svn commit: r493169 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/mapred/

Author: cutting
Date: Fri Jan  5 12:12:14 2007
New Revision: 493169

URL: http://svn.apache.org/viewvc?view=rev&rev=493169
Log:
HADOOP-619.  Extend InputFormatBase to accept individual files and glob patterns as MapReduce inputs.  Contributed by Sanjay.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=493169&r1=493168&r2=493169
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jan  5 12:12:14 2007
@@ -200,6 +200,10 @@
     hadoop-${version}-core.jar so that it can be more easily
     identified.  (Nigel Daley via cutting)
 
+57. HADOOP-619.  Extend InputFormatBase to accept individual files and
+    glob patterns as MapReduce inputs, not just directories.  Also
+    change contrib/streaming to use this.  (Sanjay Dahiya via cutting)
+
 
 Release 0.9.2 - 2006-12-15
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?view=diff&rev=493169&r1=493168&r2=493169
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Fri Jan  5 12:12:14 2007
@@ -21,20 +21,13 @@
 import java.io.*;
 import java.lang.reflect.*;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.FSDataInputStream;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 import org.apache.hadoop.mapred.*;
 
 /** An input format that performs globbing on DFS paths and
@@ -48,10 +41,6 @@
 
   protected static final Log LOG = LogFactory.getLog(StreamInputFormat.class.getName());
 
-  /** This implementation always returns true. */
-  public void validateInput(JobConf job) throws IOException {
-  }
-
   static boolean isGzippedInput(JobConf job) {
     String val = job.get(StreamBaseRecordReader.CONF_NS + "compression");
     return "gzip".equals(val);
@@ -77,55 +66,6 @@
       splits.add(new FileSplit(file, 0, splitSize, job));
     }
     return (FileSplit[]) splits.toArray(new FileSplit[splits.size()]);
-  }
-
-  protected Path[] listPaths(JobConf job) throws IOException {
-    Path[] globs = job.getInputPaths();
-    ArrayList list = new ArrayList();
-    int dsup = globs.length;
-    for (int d = 0; d < dsup; d++) {
-      String leafName = globs[d].getName();
-      LOG.info("StreamInputFormat: globs[" + d + "] leafName = " + leafName);
-      Path[] paths;
-      Path dir;
-      FileSystem fs = globs[d].getFileSystem(job);
-      PathFilter filter = new GlobFilter(fs, leafName);
-      dir = new Path(globs[d].getParent().toString());
-      if (dir == null) dir = new Path(".");
-      paths = fs.listPaths(dir, filter);
-      list.addAll(Arrays.asList(paths));
-    }
-    return (Path[]) list.toArray(new Path[] {});
-  }
-
-  class GlobFilter implements PathFilter {
-
-    public GlobFilter(FileSystem fs, String glob) {
-      fs_ = fs;
-      pat_ = Pattern.compile(globToRegexp(glob));
-    }
-
-    String globToRegexp(String glob) {
-      String re = glob;
-      re = re.replaceAll("\\.", "\\\\.");
-      re = re.replaceAll("\\+", "\\\\+");
-      re = re.replaceAll("\\*", ".*");
-      re = re.replaceAll("\\?", ".");
-      LOG.info("globToRegexp: |" + glob + "|  ->  |" + re + "|");
-      return re;
-    }
-
-    public boolean accept(Path pathname) {
-      boolean acc = !fs_.isChecksumFile(pathname);
-      if (acc) {
-        acc = pat_.matcher(pathname.getName()).matches();
-      }
-      LOG.info("matches " + pat_ + ", " + pathname + " = " + acc);
-      return acc;
-    }
-
-    Pattern pat_;
-    FileSystem fs_;
   }
 
   public RecordReader getRecordReader(final InputSplit genericSplit, 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?view=diff&rev=493169&r1=493168&r2=493169
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Fri Jan  5 12:12:14 2007
@@ -652,8 +652,7 @@
         throws IOException {
       Path [] parents = new Path[1];
       int level = 0;
-      
-      String filename = filePattern.toString();
+      String filename = filePattern.toUri().getPath();
       if("".equals(filename) || Path.SEPARATOR.equals(filename)) {
         parents[0] = filePattern;
         return parents;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java?view=diff&rev=493169&r1=493168&r2=493169
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java Fri Jan  5 12:12:14 2007
@@ -32,6 +32,7 @@
 
   /** The directory separator, a slash. */
   public static final String SEPARATOR = "/";
+  public static final char SEPARATOR_CHAR = '/';
   
   static final boolean WINDOWS
     = System.getProperty("os.name").startsWith("Windows");

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java?view=diff&rev=493169&r1=493168&r2=493169
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java Fri Jan  5 12:12:14 2007
@@ -22,12 +22,14 @@
 import java.io.IOException;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 
 /** A base class for {@link InputFormat}. */
 public abstract class InputFormatBase implements InputFormat {
@@ -38,7 +40,12 @@
   private static final double SPLIT_SLOP = 1.1;   // 10% slop
 
   private long minSplitSize = 1;
-
+  private static final PathFilter hiddenFileFilter = new PathFilter(){
+    public boolean accept( Path p ){
+      String name = p.getName(); 
+      return !name.startsWith("_") && !name.startsWith("."); 
+    }
+  }; 
   protected void setMinSplitSize(long minSplitSize) {
     this.minSplitSize = minSplitSize;
   }
@@ -63,39 +70,23 @@
    * Subclasses may override to, e.g., select only files matching a regular
    * expression. 
    * 
-   * <p>Property <code>mapred.input.subdir</code>, if set, names a subdirectory
-   * that is appended to all input dirs specified by job, and if the given fs
-   * lists those too, each is added to the returned array of Path.
-   *
    * @param job the job to list input paths for
    * @return array of Path objects
    * @throws IOException if zero items.
    */
   protected Path[] listPaths(JobConf job)
     throws IOException {
-    String subdir = job.get("mapred.input.subdir");
     Path[] dirs = job.getInputPaths();
     if (dirs.length == 0) {
-      throw new IOException("No input directories specified in job");
+      throw new IOException("No input paths specified in job");
     }
-    ArrayList result = new ArrayList();
-    for (int i = 0; i < dirs.length; i++) {
-      FileSystem fs = dirs[i].getFileSystem(job);
-      Path[] dir = fs.listPaths(dirs[i]);
-      if (dir != null) {
-        for (int j = 0; j < dir.length; j++) {
-          Path file = dir[j];
-          if (subdir != null) {
-            Path[] subFiles = fs.listPaths(new Path(file, subdir));
-            if (subFiles != null) {
-              for (int k = 0; k < subFiles.length; k++) {
-                result.add(fs.makeQualified(subFiles[k]));
-              }
-            }
-          } else {
-            result.add(fs.makeQualified(file));
-          }
-        }
+    List<Path> result = new ArrayList(); 
+    for (Path p: dirs) {
+      FileSystem fs = p.getFileSystem(job); 
+      Path[] matches =
+        fs.listPaths(fs.globPaths(p, hiddenFileFilter),hiddenFileFilter);
+      for (Path match: matches) {
+        result.add(fs.makeQualified(match));
       }
     }
 
@@ -104,22 +95,50 @@
 
   public void validateInput(JobConf job) throws IOException {
     Path[] inputDirs = job.getInputPaths();
+    if (inputDirs.length == 0) {
+      throw new IOException("No input paths specified in input"); 
+    }
+    
     List<IOException> result = new ArrayList();
-    for(int i=0; i < inputDirs.length; ++i) {
-      FileSystem fs = inputDirs[i].getFileSystem(job);
-      if (!fs.exists(inputDirs[i])) {
-        result.add(new FileNotFoundException("Input directory " + 
-                                             inputDirs[i] + 
-                                             " doesn't exist."));
-      } else if (!fs.isDirectory(inputDirs[i])) {
-        result.add(new InvalidFileTypeException
-                     ("Invalid input path, expecting directory : " + 
-                      inputDirs[i]));
+    int totalFiles = 0; 
+    for (Path p: inputDirs) {
+      FileSystem fs = p.getFileSystem(job);
+      if (fs.exists(p)) {
+        // make sure all paths are files to avoid exception
+        // while generating splits
+        for (Path subPath : fs.listPaths(p, hiddenFileFilter)) {
+          FileSystem subFS = subPath.getFileSystem(job); 
+          if (!subFS.isFile(subPath)) {
+            result.add(new IOException(
+                "Input path is not a file : " + subPath)); 
+          } else {
+            totalFiles++; 
+          }
+        }
+      } else {
+        Path [] paths = fs.globPaths(p, hiddenFileFilter); 
+        if (paths.length == 0) {
+          result.add(
+            new IOException("Input Pattern " + p + " matches 0 files")); 
+        } else {
+          // validate globbed paths 
+          for (Path gPath : paths) {
+            FileSystem gPathFS = gPath.getFileSystem(job); 
+            if (!gPathFS.exists(gPath)) {
+              result.add(
+                new FileNotFoundException(
+                    "Input path doesnt exist : " + gPath)); 
+            }
+          }
+          totalFiles += paths.length ; 
+        }
       }
     }
     if (!result.isEmpty()) {
       throw new InvalidInputException(result);
     }
+    // send output to client. 
+    LOG.info("Total input paths to process : " + totalFiles); 
   }
 
   /** Splits files returned by {@link #listPaths(JobConf)} when
@@ -146,7 +165,7 @@
       Path file = files[i];
       FileSystem fs = file.getFileSystem(job);
       long length = fs.getLength(file);
-      if (isSplitable(fs, file)) {
+      if (isSplitable(fs, file)) { 
         long blockSize = fs.getBlockSize(file);
         long splitSize = computeSplitSize(goalSize, minSize, blockSize);