You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/18 22:58:34 UTC

svn commit: r488438 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/fs/s3/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/tes...

Author: cutting
Date: Mon Dec 18 13:58:33 2006
New Revision: 488438

URL: http://svn.apache.org/viewvc?view=rev&rev=488438
Log:
HADOOP-451.  Add a Split interface.  Contributed by Owen.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputSplit.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InvalidInputException.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/build.xml
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/EmptyInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Dec 18 13:58:33 2006
@@ -122,6 +122,12 @@
 34. HADOOP-823.  Fix problem starting datanode when not all configured
     data directories exist.  (Bryan Pendleton via cutting)
 
+35. HADOOP-451.  Add an InputSplit interface.  CAUTION: This incompatibly
+    changes the InputFormat and RecordReader interfaces.  Not only is
+    FileSplit replaced with InputSplit, but a FileSystem parameter is no
+    longer passed in several methods, input validation has changed,
+    etc.  (omalley via cutting)
+
 
 Release 0.9.2 - 2006-12-15
 

Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Mon Dec 18 13:58:33 2006
@@ -366,7 +366,7 @@
     <delete dir="${test.log.dir}"/>
     <mkdir dir="${test.log.dir}"/>
     <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no" 
-           fork="yes" maxmemory="128m" dir="${basedir}"
+           fork="yes" maxmemory="256m" dir="${basedir}"
       errorProperty="tests.failed" failureProperty="tests.failed">
       <sysproperty key="test.build.data" value="${test.build.data}"/>
       <sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java Mon Dec 18 13:58:33 2006
@@ -84,20 +84,20 @@
    (and if there was, this index may need to be created for the first time
    full file at a time...    )
    */
-  public FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits) throws IOException {
-    checkReady(fs, job);
-    return ((StreamInputFormat) primary_).getFullFileSplits(fs, job);
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return ((StreamInputFormat) primary_).getFullFileSplits(job);
   }
 
   /**
    */
-  public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter) throws IOException {
+  public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    FileSystem fs = ((FileSplit) split).getPath().getFileSystem(job);
     checkReady(fs, job);
 
     reporter.setStatus(split.toString());
 
     ArrayList readers = new ArrayList();
-    String primary = split.getPath().toString();
+    String primary = ((FileSplit) split).getPath().toString();
     CompoundDirSpec spec = CompoundDirSpec.findInputSpecForPrimary(primary, job);
     if (spec == null) {
       throw new IOException("Did not find -input spec in JobConf for primary:" + primary);
@@ -106,7 +106,7 @@
       InputFormat f = (InputFormat) fmts_.get(i);
       Path path = new Path(spec.getPaths()[i][0]);
       FileSplit fsplit = makeFullFileSplit(path);
-      RecordReader r = f.getRecordReader(fs, fsplit, job, reporter);
+      RecordReader r = f.getRecordReader(fsplit, job, reporter);
       readers.add(r);
     }
 
@@ -115,7 +115,7 @@
 
   private FileSplit makeFullFileSplit(Path path) throws IOException {
     long len = fs_.getLength(path);
-    return new FileSplit(path, 0, len);
+    return new FileSplit(path, 0, len, job_);
   }
 
   /*
@@ -188,6 +188,14 @@
       }
     }
 
+    public float getProgress() throws IOException {
+      if (primaryClosed_) {
+        return 1.0f; // primary reader already closed: report the input as fully consumed
+      } else {
+        return primaryReader_.getProgress(); // otherwise delegate to the primary reader
+      }
+    }
+    
     public void close() throws IOException {
       IOException firstErr = null;
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Mon Dec 18 13:58:33 2006
@@ -189,7 +189,8 @@
     if (split.getStart() == 0) {
       return leaf;
     } else {
-      return new FileSplit(new Path(leaf), split.getStart(), split.getLength()).toString();
+      return new FileSplit(new Path(leaf), split.getStart(), 
+                           split.getLength(), job_).toString();
     }
   }
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java Mon Dec 18 13:58:33 2006
@@ -69,13 +69,7 @@
   public abstract boolean next(Writable key, Writable value) throws IOException;
 
   /** This implementation always returns true. */
-  public boolean[] areValidInputDirectories(FileSystem fileSys, Path[] inputDirs) throws IOException {
-    int n = inputDirs.length;
-    boolean[] b = new boolean[n];
-    for (int i = 0; i < n; i++) {
-      b[i] = true;
-    }
-    return b;
+  public void validateInput(JobConf job) throws IOException {
   }
 
   /** Returns the current position in the input. */
@@ -88,6 +82,14 @@
     in_.close();
   }
 
+  /** @return fraction of the start_..end_ range consumed (1.0 if empty); cast first so integral offsets are not truncated. */
+  public float getProgress() throws IOException {
+    if (end_ == start_) {
+      return 1.0f;
+    }
+    return ((float) (in_.getPos() - start_)) / ((float) (end_ - start_));
+  }
+  
   public WritableComparable createKey() {
     return new Text();
   }

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Mon Dec 18 13:58:33 2006
@@ -49,12 +49,7 @@
   protected static final Log LOG = LogFactory.getLog(StreamInputFormat.class.getName());
 
   /** This implementation always returns true. */
-  public boolean[] areValidInputDirectories(FileSystem fileSys, Path[] inputDirs) throws IOException {
-    boolean[] b = new boolean[inputDirs.length];
-    for (int i = 0; i < inputDirs.length; ++i) {
-      b[i] = true;
-    }
-    return b;
+  public void validateInput(JobConf job) throws IOException {
   }
 
   static boolean isGzippedInput(JobConf job) {
@@ -62,29 +57,29 @@
     return "gzip".equals(val);
   }
 
-  public FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits) throws IOException {
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 
     if (isGzippedInput(job)) {
-      return getFullFileSplits(fs, job);
+      return getFullFileSplits(job);
     } else {
-      return super.getSplits(fs, job, numSplits);
+      return super.getSplits(job, numSplits);
     }
   }
 
   /** For the compressed-files case: override InputFormatBase to produce one split. */
-  FileSplit[] getFullFileSplits(FileSystem fs, JobConf job) throws IOException {
-    Path[] files = listPaths(fs, job);
+  FileSplit[] getFullFileSplits(JobConf job) throws IOException {
+    Path[] files = listPaths(job);
     int numSplits = files.length;
     ArrayList splits = new ArrayList(numSplits);
     for (int i = 0; i < files.length; i++) {
       Path file = files[i];
-      long splitSize = fs.getLength(file);
-      splits.add(new FileSplit(file, 0, splitSize));
+      long splitSize = file.getFileSystem(job).getLength(file);
+      splits.add(new FileSplit(file, 0, splitSize, job));
     }
     return (FileSplit[]) splits.toArray(new FileSplit[splits.size()]);
   }
 
-  protected Path[] listPaths(FileSystem fs, JobConf job) throws IOException {
+  protected Path[] listPaths(JobConf job) throws IOException {
     Path[] globs = job.getInputPaths();
     ArrayList list = new ArrayList();
     int dsup = globs.length;
@@ -93,6 +88,7 @@
       LOG.info("StreamInputFormat: globs[" + d + "] leafName = " + leafName);
       Path[] paths;
       Path dir;
+      FileSystem fs = globs[d].getFileSystem(job);
       PathFilter filter = new GlobFilter(fs, leafName);
       dir = new Path(globs[d].getParent().toString());
       if (dir == null) dir = new Path(".");
@@ -132,8 +128,11 @@
     FileSystem fs_;
   }
 
-  public RecordReader getRecordReader(FileSystem fs, final FileSplit split, JobConf job,
-      Reporter reporter) throws IOException {
+  public RecordReader getRecordReader(final InputSplit genericSplit, 
+                                      JobConf job,
+                                      Reporter reporter) throws IOException {
+    FileSplit split = (FileSplit) genericSplit;
+    FileSystem fs = split.getPath().getFileSystem(job);
     LOG.info("getRecordReader start.....split=" + split);
     reporter.setStatus(split.toString());
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java Mon Dec 18 13:58:33 2006
@@ -418,7 +418,7 @@
     Path p = new Path(path);
     long start = Long.parseLong(job.get("map.input.start"));
     long length = Long.parseLong(job.get("map.input.length"));
-    return new FileSplit(p, start, length);
+    return new FileSplit(p, start, length, job);
   }
 
   static class TaskId {

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java Mon Dec 18 13:58:33 2006
@@ -51,7 +51,8 @@
 
   /**
    */
-  public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter) throws IOException {
+  public RecordReader getRecordReader(InputSplit split, JobConf job, 
+                                      Reporter reporter) throws IOException {
 
     reporter.setStatus(split.toString());
 
@@ -82,6 +83,9 @@
       return new UTF8();
     }
 
+    public float getProgress() {
+      return 1.0f; // constant: this reader always reports itself as complete
+    }
   }
 
   ArrayList/*<InputFormat>*/fmts_;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Mon Dec 18 13:58:33 2006
@@ -44,7 +44,7 @@
     
     public LocalFileSystem() {}
 
-    /** @deprecated. */
+    /** @deprecated */
     public LocalFileSystem(Configuration conf) throws IOException {
       initialize(NAME, conf);
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html Mon Dec 18 13:58:33 2006
@@ -5,8 +5,10 @@
 org.apache.hadoop.fs.FileSystem} that uses <a href="http://aws.amazon.com/s3">Amazon S3</a>.</p>
 
 <p>
-Files are stored in S3 as blocks (represented by {@link Block}), which have an ID and a length.
-Block metadata is stored in S3 as a small record (represented by {@link INode}) using the URL-encoded
+Files are stored in S3 as blocks (represented by 
+{@link org.apache.hadoop.fs.s3.Block}), which have an ID and a length.
+Block metadata is stored in S3 as a small record (represented by 
+{@link org.apache.hadoop.fs.s3.INode}) using the URL-encoded
 path string as a key. Inodes record the file type (regular file or directory) and the list of blocks.
 This design makes it easy to seek to any given position in a file by reading the inode data to compute
 which block to access, then using S3's support for 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java Mon Dec 18 13:58:33 2006
@@ -23,18 +23,18 @@
 import java.io.DataOutput;
 import java.io.File;                              // deprecated
 
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 /** A section of an input file.  Returned by {@link
- * InputFormat#getSplits(FileSystem, JobConf, int)} and passed to
- * {@link InputFormat#getRecordReader(FileSystem,FileSplit,JobConf,Reporter)}. */
-public class FileSplit implements Writable {
+ * InputFormat#getSplits(JobConf, int)} and passed to
+ * {@link InputFormat#getRecordReader(InputSplit,JobConf,Reporter)}. */
+public class FileSplit implements InputSplit {
   private Path file;
   private long start;
   private long length;
+  private JobConf conf;
   
   FileSplit() {}
 
@@ -44,10 +44,11 @@
    * @param start the position of the first byte in the file to process
    * @param length the number of bytes in the file to process
    */
-  public FileSplit(Path file, long start, long length) {
+  public FileSplit(Path file, long start, long length, JobConf conf) {
     this.file = file;
     this.start = start;
     this.length = length;
+    this.conf = conf;
   }
   
   /** @deprecated Call {@link #getPath()} instead. */
@@ -79,5 +80,13 @@
     length = in.readLong();
   }
 
-
+  public String[] getLocations() throws IOException { // hosts for this split, taken from the file system's cache hints
+    String[][] hints = file.getFileSystem(conf). // NOTE(review): conf is only assigned by the 4-arg constructor here; confirm it is set on deserialized splits
+                            getFileCacheHints(file, start, length);
+    if (hints != null && hints.length > 0) {
+      return hints[0]; // only the first hint entry is returned
+    }
+    return new String[]{}; // no hints available: empty array rather than null
+  }
+  
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java Mon Dec 18 13:58:33 2006
@@ -33,13 +33,10 @@
    * Are the input directories valid? This method is used to test the input
    * directories when a job is submitted so that the framework can fail early
    * with a useful error message when the input directory does not exist.
-   * @param fileSys the file system to check for the directories
-   * @param inputDirs the list of input directories
-   * @return is each inputDir valid?
-   * @throws IOException
+   * @param job the job to check
+   * @throws InvalidInputException if the job does not have valid input
    */
-  boolean[] areValidInputDirectories(FileSystem fileSys,
-                                     Path[] inputDirs) throws IOException;
+  void validateInput(JobConf job) throws IOException;
   
   /** Splits a set of input files.  One split is created per map task.
    *
@@ -47,17 +44,16 @@
    * @param numSplits the desired number of splits
    * @return the splits
    */
-  FileSplit[] getSplits(FileSystem ignored, JobConf job, int numSplits)
-    throws IOException;
+  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
 
   /** Construct a {@link RecordReader} for a {@link FileSplit}.
    *
-   * @param split the {@link FileSplit}
+   * @param split the {@link InputSplit}
    * @param job the job that this split belongs to
    * @return a {@link RecordReader}
    */
-  RecordReader getRecordReader(FileSystem ignored, FileSplit split,
-                               JobConf job, Reporter reporter)
-    throws IOException;
+  RecordReader getRecordReader(InputSplit split,
+                               JobConf job, 
+                               Reporter reporter) throws IOException;
 }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java Mon Dec 18 13:58:33 2006
@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.*;
 
@@ -52,8 +54,7 @@
     return true;
   }
   
-  public abstract RecordReader getRecordReader(FileSystem fs,
-                                               FileSplit split,
+  public abstract RecordReader getRecordReader(InputSplit split,
                                                JobConf job,
                                                Reporter reporter)
     throws IOException;
@@ -66,15 +67,17 @@
    * that is appended to all input dirs specified by job, and if the given fs
    * lists those too, each is added to the returned array of Path.
    *
-   * @param fs
-   * @param job
-   * @return array of Path objects, never zero length.
+   * @param job the job to list input paths for
+   * @return array of Path objects
    * @throws IOException if zero items.
    */
-  protected Path[] listPaths(FileSystem ignored, JobConf job)
+  protected Path[] listPaths(JobConf job)
     throws IOException {
-    Path[] dirs = job.getInputPaths();
     String subdir = job.get("mapred.input.subdir");
+    Path[] dirs = job.getInputPaths();
+    if (dirs.length == 0) {
+      throw new IOException("No input directories specified in job");
+    }
     ArrayList result = new ArrayList();
     for (int i = 0; i < dirs.length; i++) {
       FileSystem fs = dirs[i].getFileSystem(job);
@@ -96,30 +99,34 @@
       }
     }
 
-    if (result.size() == 0) {
-      throw new IOException("No input directories specified in: "+job);
-    }
     return (Path[])result.toArray(new Path[result.size()]);
   }
 
-  // NOTE: should really pass a Configuration here, not a FileSystem
-  public boolean[] areValidInputDirectories(FileSystem fs, Path[] inputDirs)
-    throws IOException {
-    boolean[] result = new boolean[inputDirs.length];
+  public void validateInput(JobConf job) throws IOException {
+    Path[] inputDirs = job.getInputPaths();
+    List<IOException> result = new ArrayList();
     for(int i=0; i < inputDirs.length; ++i) {
-      result[i] =
-        inputDirs[i].getFileSystem(fs.getConf()).isDirectory(inputDirs[i]);
+      FileSystem fs = inputDirs[i].getFileSystem(job);
+      if (!fs.exists(inputDirs[i])) {
+        result.add(new FileNotFoundException("Input directory " + 
+                                             inputDirs[i] + 
+                                             " doesn't exist."));
+      } else if (!fs.isDirectory(inputDirs[i])) {
+        result.add(new InvalidFileTypeException
+                     ("Invalid input path, expecting directory : " + 
+                      inputDirs[i]));
+      }
+    }
+    if (!result.isEmpty()) {
+      throw new InvalidInputException(result);
     }
-    return result;
   }
 
-  /** Splits files returned by {@link #listPaths(FileSystem,JobConf)} when
+  /** Splits files returned by {@link #listPaths(JobConf)} when
    * they're too big.*/ 
-  public FileSplit[] getSplits(FileSystem ignored, JobConf job, int numSplits)
+  public InputSplit[] getSplits(JobConf job, int numSplits)
     throws IOException {
-
-    Path[] files = listPaths(ignored, job);
-
+    Path[] files = listPaths(job);
     long totalSize = 0;                           // compute total size
     for (int i = 0; i < files.length; i++) {      // check we have valid files
       Path file = files[i];
@@ -145,16 +152,18 @@
 
         long bytesRemaining = length;
         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
-          splits.add(new FileSplit(file, length-bytesRemaining, splitSize));
+          splits.add(new FileSplit(file, length-bytesRemaining, splitSize,
+                                   job));
           bytesRemaining -= splitSize;
         }
         
         if (bytesRemaining != 0) {
-          splits.add(new FileSplit(file, length-bytesRemaining, bytesRemaining));
+          splits.add(new FileSplit(file, length-bytesRemaining, 
+                                   bytesRemaining, job));
         }
       } else {
         if (length != 0) {
-          splits.add(new FileSplit(file, 0, length));
+          splits.add(new FileSplit(file, 0, length, job));
         }
       }
     }

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputSplit.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputSplit.java?view=auto&rev=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputSplit.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputSplit.java Mon Dec 18 13:58:33 2006
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The description of the data for a single map task; every split is also a Writable.
+ * @author Owen O'Malley
+ */
+public interface InputSplit extends Writable {
+
+  /**
+   * Get the list of hostnames where the input split is located.
+   * @return a list of preferred hostnames
+   * @throws IOException if the locations cannot be looked up
+   */
+  String[] getLocations() throws IOException;
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InvalidInputException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InvalidInputException.java?view=auto&rev=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InvalidInputException.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InvalidInputException.java Mon Dec 18 13:58:33 2006
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Iterator;
+
+/**
+ * This class wraps a list of problems with the input, so that the user
+ * can see all of the problems together instead of finding and fixing them
+ * one by one.
+ * @author Owen O'Malley
+ */
+public class InvalidInputException extends IOException {
+  private List<IOException> problems;
+  
+  /**
+   * Create the exception with the given list.
+   * @param probs the problems to report; the list is stored by reference, not copied
+   */
+  public InvalidInputException(List<IOException> probs) {
+    problems = probs;
+  }
+  
+  /**
+   * Get the complete list of the problems reported.
+   * @return the same list instance given to the constructor, which must not be modified
+   */
+  public List<IOException> getProblems() {
+    return problems;
+  }
+  
+  /**
+   * Get a summary message of the problems found.
+   * @return each problem's message in list order, separated by newlines (no trailing newline)
+   */
+  public String getMessage() {
+    StringBuffer result = new StringBuffer();
+    Iterator<IOException> itr = problems.iterator();
+    while(itr.hasNext()) {
+      result.append(itr.next().getMessage());
+      if (itr.hasNext()) { // separator goes only between messages
+        result.append("\n");
+      }
+    }
+    return result.toString();
+  }
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Mon Dec 18 13:58:33 2006
@@ -153,7 +153,8 @@
     if (isMap) {
       FileSplit split = new FileSplit(new Path(conf.get("map.input.file")),
                                       conf.getLong("map.input.start", 0),
-                                      conf.getLong("map.input.length", 0));
+                                      conf.getLong("map.input.length", 0),
+                                      conf);
       task = new MapTask(jobId, jobFilename.toString(), conf.get("mapred.tip.id"), 
           taskId, partition, split);
     } else {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Mon Dec 18 13:58:33 2006
@@ -313,30 +313,7 @@
         }
 
         // input paths should exist. 
-        boolean[] validDirs = 
-          job.getInputFormat().areValidInputDirectories(userFileSys, inputDirs);
-        for(int i=0; i < validDirs.length; ++i) {
-          if (!validDirs[i]) {
-            String msg = null ; 
-            if( !userFileSys.exists(inputDirs[i]) ){
-              msg = "Input directory " + inputDirs[i] + 
-                         " doesn't exist in " + userFileSys.getName();
-              LOG.error(msg);
-              throw new FileNotFoundException(msg);
-            }else if( !userFileSys.isDirectory(inputDirs[i])){
-              msg = "Invalid input path, expecting directory : " + inputDirs[i] ;
-              LOG.error(msg); 
-              throw new InvalidFileTypeException(msg);  
-            }else{
-              // some other error
-              msg = "Input directory " + inputDirs[i] + 
-                           " in " + userFileSys.getName() + " is invalid.";
-              LOG.error(msg);
-              throw new IOException(msg);
-            }
-          }
-        }
-
+        job.getInputFormat().validateInput(job);
 
         // Check the output specification
         job.getOutputFormat().checkOutputSpecs(fs, job);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Dec 18 13:58:33 2006
@@ -122,7 +122,7 @@
         }
         InputFormat inputFormat = conf.getInputFormat();
 
-        FileSplit[] splits = inputFormat.getSplits(fs, conf, numMapTasks);
+        InputSplit[] splits = inputFormat.getSplits(conf, numMapTasks);
 
         //
         // sort splits by decreasing length, to reduce job's tail
@@ -168,22 +168,16 @@
         // Obtain some tasktracker-cache information for the map task splits.
         //
         for (int i = 0; i < maps.length; i++) {
-            String hints[][] =
-              fs.getFileCacheHints(splits[i].getPath(), splits[i].getStart(),
-                                   splits[i].getLength());
-
-            if (hints != null) {
-              for (int k = 0; k < hints.length; k++) {
-                for (int j = 0; j < hints[k].length; j++) {
-                  ArrayList hostMaps = (ArrayList)hostToMaps.get(hints[k][j]);
-                  if (hostMaps == null) {
-                    hostMaps = new ArrayList();
-                    hostToMaps.put(hints[k][j], hostMaps);
-                  }
-                  hostMaps.add(maps[i]);
-                }
-              }
+          String hints[] = splits[i].getLocations();
+          for (int k = 0; k < hints.length; k++) {
+            ArrayList hostMaps = (ArrayList)hostToMaps.get(hints[k]);
+            if (hostMaps == null) {
+              hostMaps = new ArrayList();
+              hostToMaps.put(hints[k], hostMaps);
             }
+            hostMaps.add(maps[i]);
+            
+          }
         }
 
         this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon Dec 18 13:58:33 2006
@@ -89,8 +89,8 @@
     public void run() {
       try {
         // split input into minimum number of splits
-        FileSplit[] splits;
-        splits = job.getInputFormat().getSplits(fs, job, 1);
+        InputSplit[] splits;
+        splits = job.getInputFormat().getSplits(job, 1);
         String jobId = profile.getJobId();
         
         // run a map task for each split

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Dec 18 13:58:33 2006
@@ -86,14 +86,14 @@
   
   private MapTaskMetrics myMetrics = null;
 
-  private FileSplit split;
+  private InputSplit split;
   private MapOutputFile mapOutputFile = new MapOutputFile();
   private JobConf conf;
 
   public MapTask() {}
 
   public MapTask(String jobId, String jobFile, String tipId, String taskId, 
-                 int partition, FileSplit split) {
+                 int partition, InputSplit split) {
     super(jobId, jobFile, tipId, taskId, partition);
     this.split = split;
     myMetrics = new MapTaskMetrics(taskId);
@@ -103,18 +103,25 @@
       return true;
   }
 
-  public void localizeConfiguration(JobConf conf) {
+  public void localizeConfiguration(JobConf conf) throws IOException {
     super.localizeConfiguration(conf);
-    conf.set("map.input.file", split.getPath().toString());
-    conf.setLong("map.input.start", split.getStart());
-    conf.setLong("map.input.length", split.getLength());
+    Path localSplit = new Path(new Path(getJobFile()).getParent(), 
+                               "split.dta");
+    DataOutputStream out = LocalFileSystem.get(conf).create(localSplit);
+    split.write(out);
+    out.close();
+    if (split instanceof FileSplit) {
+      conf.set("map.input.file", ((FileSplit) split).getPath().toString());
+      conf.setLong("map.input.start", ((FileSplit) split).getStart());
+      conf.setLong("map.input.length", ((FileSplit) split).getLength());
+    }
   }
   
   public TaskRunner createRunner(TaskTracker tracker) {
     return new MapTaskRunner(this, tracker, this.conf);
   }
 
-  public FileSplit getSplit() { return split; }
+  public InputSplit getSplit() { return split; }
 
   public void write(DataOutput out) throws IOException {
     super.write(out);
@@ -139,11 +146,9 @@
     MapOutputBuffer collector = new MapOutputBuffer(umbilical, job, reporter);
       
     final RecordReader rawIn =                  // open input
-      job.getInputFormat().getRecordReader
-      (FileSystem.get(job), split, job, reporter);
+      job.getInputFormat().getRecordReader(split, job, reporter);
 
     RecordReader in = new RecordReader() {      // wrap in progress reporter
-        private float perByte = 1.0f /(float)split.getLength();
 
         public WritableComparable createKey() {
           return rawIn.createKey();
@@ -156,9 +161,7 @@
         public synchronized boolean next(Writable key, Writable value)
           throws IOException {
 
-          float progress =                        // compute progress
-            (float)Math.min((rawIn.getPos()-split.getStart())*perByte, 1.0f);
-          reportProgress(umbilical, progress);
+          reportProgress(umbilical, getProgress());
           long beforePos = getPos();
           boolean ret = rawIn.next(key, value);
           myMetrics.mapInput(getPos() - beforePos);
@@ -166,6 +169,9 @@
         }
         public long getPos() throws IOException { return rawIn.getPos(); }
         public void close() throws IOException { rawIn.close(); }
+        public float getProgress() throws IOException {
+          return rawIn.getProgress();
+        }
       };
 
     MapRunnable runner =

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java Mon Dec 18 13:58:33 2006
@@ -55,4 +55,10 @@
   /** Close this to future operations.*/ 
   public void close() throws IOException;
 
+  /**
+   * Report how far the reader has advanced through its input.
+   * @return the progress so far, as a fraction from 0.0 to 1.0
+   * @throws IOException
+   */
+  float getProgress() throws IOException;
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Mon Dec 18 13:58:33 2006
@@ -100,7 +100,7 @@
   /**
    * Localize the given JobConf to be specific for this task.
    */
-  public void localizeConfiguration(JobConf conf) {
+  public void localizeConfiguration(JobConf conf) throws IOException {
     super.localizeConfiguration(conf);
     conf.setNumMapTasks(numMaps);
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java Mon Dec 18 13:58:33 2006
@@ -53,19 +53,18 @@
     }
     
     /** Create a record reader for the given split
-     * @param fs file system where the file split is stored
      * @param split file split
      * @param job job configuration
      * @param reporter reporter who sends report to task tracker
      * @return RecordReader
      */
-    public RecordReader getRecordReader(FileSystem fs, FileSplit split,
+    public RecordReader getRecordReader(InputSplit split,
             JobConf job, Reporter reporter)
     throws IOException {
         
         reporter.setStatus(split.toString());
         
-        return new FilterRecordReader(job, split);
+        return new FilterRecordReader(job, (FileSplit) split);
     }
 
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java Mon Dec 18 13:58:33 2006
@@ -33,26 +33,26 @@
     setMinSplitSize(SequenceFile.SYNC_INTERVAL);
   }
 
-  protected Path[] listPaths(FileSystem fs, JobConf job)
+  protected Path[] listPaths(JobConf job)
     throws IOException {
 
-    Path[] files = super.listPaths(fs, job);
+    Path[] files = super.listPaths(job);
     for (int i = 0; i < files.length; i++) {
       Path file = files[i];
-      if (fs.isDirectory(file)) {                 // it's a MapFile
+      if (file.getFileSystem(job).isDirectory(file)) {     // it's a MapFile
         files[i] = new Path(file, MapFile.DATA_FILE_NAME); // use the data file
       }
     }
     return files;
   }
 
-  public RecordReader getRecordReader(FileSystem fs, FileSplit split,
+  public RecordReader getRecordReader(InputSplit split,
                                       JobConf job, Reporter reporter)
     throws IOException {
 
     reporter.setStatus(split.toString());
 
-    return new SequenceFileRecordReader(job, split);
+    return new SequenceFileRecordReader(job, (FileSplit) split);
   }
 
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java Mon Dec 18 13:58:33 2006
@@ -29,6 +29,7 @@
 /** An {@link RecordReader} for {@link SequenceFile}s. */
 public class SequenceFileRecordReader implements RecordReader {
   private SequenceFile.Reader in;
+  private long start;
   private long end;
   private boolean more = true;
   private Configuration conf;
@@ -93,6 +94,18 @@
   protected synchronized void getCurrentValue(Writable value)
       throws IOException {
       in.getCurrentValue(value);
+  }
+  
+  /**
+   * Return the progress within the input split
+   * @return 0.0 to 1.0: the fraction of the input byte range already read
+   */
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    } else {                          // float division: long/long truncates to 0
+      return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start));
+    }
   }
   
   public synchronized long getPos() throws IOException {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon Dec 18 13:58:33 2006
@@ -119,7 +119,7 @@
   /**
    * Localize the given JobConf to be specific for this task.
    */
-  public void localizeConfiguration(JobConf conf) {
+  public void localizeConfiguration(JobConf conf) throws IOException {
     conf.set("mapred.tip.id", tipId); 
     conf.set("mapred.task.id", taskId);
     conf.setBoolean("mapred.task.is.map",isMapTask());

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Mon Dec 18 13:58:33 2006
@@ -52,7 +52,7 @@
 
     // Defines the TIP
     private String jobFile = null;
-    private FileSplit split = null;
+    private InputSplit split = null;
     private int numMaps;
     private int partition;
     private JobTracker jobtracker;
@@ -88,7 +88,7 @@
     /**
      * Constructor for MapTask
      */
-    public TaskInProgress(String uniqueString, String jobFile, FileSplit split, 
+    public TaskInProgress(String uniqueString, String jobFile, InputSplit split, 
                           JobTracker jobtracker, JobConf conf, 
                           JobInProgress job, int partition) {
         this.jobFile = jobFile;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java Mon Dec 18 13:58:33 2006
@@ -40,6 +40,7 @@
   }
   
   protected static class LineRecordReader implements RecordReader {
+    private long start;
     private long pos;
     private long end;
     private BufferedInputStream in;
@@ -61,6 +62,7 @@
 
     public LineRecordReader(InputStream in, long offset, long endOffset) {
       this.in = new BufferedInputStream(in);
+      this.start = offset;
       this.pos = offset;
       this.end = endOffset;
     }
@@ -73,6 +75,17 @@
       return new Text();
     }
     
+    /**
+     * Get the progress within the split, as a fraction from 0.0 to 1.0.
+     */
+    public float getProgress() {
+      if (start == end) {
+        return 0.0f;
+      } else {                        // float division: long/long truncates to 0
+        return Math.min(1.0f, (pos - start) / (float) (end - start));
+      }
+    }
+    
     /** Read a line. */
     public synchronized boolean next(Writable key, Writable value)
       throws IOException {
@@ -101,18 +114,19 @@
 
   }
   
-  public RecordReader getRecordReader(FileSystem fs, FileSplit split,
+  public RecordReader getRecordReader(InputSplit genericSplit,
                                       JobConf job, Reporter reporter)
     throws IOException {
 
-    reporter.setStatus(split.toString());
-
+    reporter.setStatus(genericSplit.toString());
+    FileSplit split = (FileSplit) genericSplit;
     long start = split.getStart();
     long end = start + split.getLength();
     final Path file = split.getPath();
     final CompressionCodec codec = compressionCodecs.getCodec(file);
 
     // open the file and seek to the start of the split
+    FileSystem fs = FileSystem.get(job);
     FSDataInputStream fileIn = fs.open(split.getPath());
     InputStream in = fileIn;
     

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/EmptyInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/EmptyInputFormat.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/EmptyInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/EmptyInputFormat.java Mon Dec 18 13:58:33 2006
@@ -31,7 +31,7 @@
     return new FileSplit[0];
   }
 
-  public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter) throws IOException {
-    return new SequenceFileRecordReader(job, split);
+  public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    return new SequenceFileRecordReader(job, (FileSplit) split);
   }
 }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java Mon Dec 18 13:58:33 2006
@@ -22,6 +22,8 @@
 
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,6 +37,8 @@
  * A JUnit test to test Map-Reduce empty jobs Mini-DFS.
  */
 public class TestEmptyJobWithDFS extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestEmptyJobWithDFS.class.getName());
   
   /**
    * Simple method running a MapReduce job with no input data. Used
@@ -58,6 +62,7 @@
       FileSystem fs = FileSystem.getNamed(fileSys, conf);
       fs.delete(outDir);
       if (!fs.mkdirs(inDir)) {
+          LOG.warn("Can't create " + inDir);
           return false;
       }
 
@@ -88,6 +93,7 @@
           }
       }
       // return job result
+      LOG.info("job is complete: " + runningJob.isSuccessful());
       return (runningJob.isSuccessful());
   }
   

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java Mon Dec 18 13:58:33 2006
@@ -80,13 +80,14 @@
         numSplits =
             random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
       }
-      FileSplit[] splits = format.getSplits(fs, job, numSplits);
+      InputSplit[] splits = format.getSplits(job, numSplits);
       
       // check each split
       int count = 0;
+      LOG.info("Generated " + splits.length + " splits.");
       for (int j = 0; j < splits.length; j++) {
           RecordReader reader =
-              format.getRecordReader(fs, splits[j], job, reporter);
+              format.getRecordReader(splits[j], job, reporter);
           try {
               while (reader.next(key, value)) {
                   LOG.info("Accept record "+key.toString());

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java Mon Dec 18 13:58:33 2006
@@ -83,14 +83,14 @@
         int numSplits =
           random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
         //LOG.info("splitting: requesting = " + numSplits);
-        FileSplit[] splits = format.getSplits(fs, job, numSplits);
+        InputSplit[] splits = format.getSplits(job, numSplits);
         //LOG.info("splitting: got =        " + splits.length);
 
         // check each split
         BitSet bits = new BitSet(length);
         for (int j = 0; j < splits.length; j++) {
           RecordReader reader =
-            format.getRecordReader(fs, splits[j], job, reporter);
+            format.getRecordReader(splits[j], job, reporter);
           try {
             int count = 0;
             while (reader.next(key, value)) {

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Mon Dec 18 13:58:33 2006
@@ -88,16 +88,15 @@
       for (int i = 0; i < 3; i++) {
         int numSplits = random.nextInt(MAX_LENGTH/20)+1;
         LOG.debug("splitting: requesting = " + numSplits);
-        FileSplit[] splits = format.getSplits(localFs, job, numSplits);
+        InputSplit[] splits = format.getSplits(job, numSplits);
         LOG.debug("splitting: got =        " + splits.length);
 
         // check each split
         BitSet bits = new BitSet(length);
         for (int j = 0; j < splits.length; j++) {
-          LOG.debug("split["+j+"]= " + splits[j].getStart() + "+" +
-                   splits[j].getLength());
+          LOG.debug("split["+j+"]= " + splits[j]);
           RecordReader reader =
-            format.getRecordReader(localFs, splits[j], job, reporter);
+            format.getRecordReader(splits[j], job, reporter);
           try {
             int count = 0;
             while (reader.next(key, value)) {
@@ -186,10 +185,10 @@
   private static final Reporter voidReporter = new VoidReporter();
   
   private static List<Text> readSplit(InputFormat format, 
-                                      FileSplit split, 
+                                      InputSplit split, 
                                       JobConf job) throws IOException {
     List<Text> result = new ArrayList<Text>();
-    RecordReader reader = format.getRecordReader(localFs, split, job,
+    RecordReader reader = format.getRecordReader(split, job,
                                                  voidReporter);
     LongWritable key = (LongWritable) reader.createKey();
     Text value = (Text) reader.createValue();
@@ -215,10 +214,10 @@
     job.setInputPath(workDir);
     TextInputFormat format = new TextInputFormat();
     format.configure(job);
-    FileSplit[] splits = format.getSplits(localFs, job, 100);
+    InputSplit[] splits = format.getSplits(job, 100);
     assertEquals("compressed splits == 2", 2, splits.length);
-    if (splits[0].getPath().getName().equals("part2.txt.gz")) {
-      FileSplit tmp = splits[0];
+    FileSplit tmp = (FileSplit) splits[0];
+    if (tmp.getPath().getName().equals("part2.txt.gz")) {
       splits[0] = splits[1];
       splits[1] = tmp;
     }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java Mon Dec 18 13:58:33 2006
@@ -26,7 +26,7 @@
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.conf.*;
 import org.apache.commons.logging.*;
-import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputFormatBase;
 import org.apache.hadoop.mapred.JobConf;
@@ -93,14 +93,14 @@
         int numSplits =
           random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
         //LOG.info("splitting: requesting = " + numSplits);
-        FileSplit[] splits = format.getSplits(fs, job, numSplits);
+        InputSplit[] splits = format.getSplits(job, numSplits);
         //LOG.info("splitting: got =        " + splits.length);
 
         // check each split
         BitSet bits = new BitSet(length);
         for (int j = 0; j < splits.length; j++) {
           RecordReader reader =
-            format.getRecordReader(fs, splits[j], job, reporter);
+            format.getRecordReader(splits[j], job, reporter);
           try {
             int count = 0;
             while (reader.next(key, value)) {