You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/31 23:26:42 UTC

svn commit: r502021 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: cutting
Date: Wed Jan 31 14:26:41 2007
New Revision: 502021

URL: http://svn.apache.org/viewvc?view=rev&rev=502021
Log:
HADOOP-788.  Change contrib/streaming to subclass TextInputFormat.  Contributed by Sanjay.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jan 31 14:26:41 2007
@@ -114,6 +114,10 @@
 35. HADOOP-881.  Fix JobTracker web interface to display the correct
     number of task failures.  (Sanjay Dahiya via cutting)
 
+36. HADOOP-788.  Change contrib/streaming to subclass TextInputFormat,
+    permitting it to take advantage of native compression facilities.
+    (Sanjay Dahiya via cutting)
+
 
 Release 0.10.1 - 2007-01-10
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java Wed Jan 31 14:26:41 2007
@@ -85,7 +85,7 @@
    full file at a time...    )
    */
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    return ((StreamInputFormat) primary_).getFullFileSplits(job);
+    return ((StreamInputFormat) primary_).getSplits(job, numSplits);
   }
 
   /**

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java Wed Jan 31 14:26:41 2007
@@ -100,16 +100,6 @@
 
   /// StreamBaseRecordReader API
 
-  public void init() throws IOException {
-    LOG.info("StreamBaseRecordReader.init: " + " start_=" + start_ + " end_=" + end_ + " length_="
-        + length_ + " start_ > in_.getPos() =" + (start_ > in_.getPos()) + " " + start_ + " > "
-        + in_.getPos());
-    if (start_ > in_.getPos()) {
-      in_.seek(start_);
-    }
-    seekNextRecordBoundary();
-  }
-
   /** Implementation should seek forward in_ to the first byte of the next record.
    *  The initial byte offset in the stream is arbitrary.
    */

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Wed Jan 31 14:26:41 2007
@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.compress.GzipCodec;
 
 import org.apache.hadoop.mapred.*;
 
@@ -34,7 +35,7 @@
  * selects a RecordReader based on a JobConf property.
  * @author Michel Tourn
  */
-public class StreamInputFormat extends InputFormatBase {
+public class StreamInputFormat extends TextInputFormat {
 
   // an InputFormat should be public with the synthetic public default constructor
   // JobTracker's JobInProgress will instantiate with clazz.newInstance() (and a custom ClassLoader)
@@ -54,7 +55,6 @@
       return super.getSplits(job, numSplits);
     }
   }
-
   /** For the compressed-files case: override InputFormatBase to produce one split. */
   FileSplit[] getFullFileSplits(JobConf job) throws IOException {
     Path[] files = listPaths(job);
@@ -79,9 +79,8 @@
     final long start = split.getStart();
     final long end = start + split.getLength();
 
-    String splitName = split.getPath() + ":" + start + "-" + end;
-    final FSDataInputStream in = fs.open(split.getPath());
-
+    FSDataInputStream in = fs.open(split.getPath());
+    
     // will open the file and seek to the start of the split
     // Factory dispatch based on available params..
     Class readerClass;
@@ -103,15 +102,13 @@
       throw new RuntimeException(nsm);
     }
 
-    StreamBaseRecordReader reader;
+    RecordReader reader;
     try {
-      reader = (StreamBaseRecordReader) ctor.newInstance(new Object[] { in, split, reporter, job,
+      reader = (RecordReader) ctor.newInstance(new Object[] { in, split, reporter, job,
           fs });
     } catch (Exception nsm) {
       throw new RuntimeException(nsm);
     }
-
-    reader.init();
 
     if (reader instanceof StreamSequenceRecordReader) {
       // override k/v class types with types stored in SequenceFile

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Wed Jan 31 14:26:41 2007
@@ -19,59 +19,65 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
-import java.nio.charset.MalformedInputException;
-import java.util.Arrays;
-import java.util.zip.GZIPInputStream;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapred.LineRecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * Similar to org.apache.hadoop.mapred.TextRecordReader, 
  * but delimits key and value with a TAB.
  * @author Michel Tourn
  */
-public class StreamLineRecordReader extends StreamBaseRecordReader {
+public class StreamLineRecordReader extends LineRecordReader {
+  
+  private String splitName;
+  private Reporter reporter;
+  private FileSplit split;  
+  private int numRec = 0;
+  private int nextStatusRec = 1;
+  private int statusMaxRecordChars;
+  protected static final Log LOG = LogFactory.getLog(StreamLineRecordReader.class);
+  // base class uses LongWritable as key, use this. 
+  private WritableComparable dummyKey = super.createKey(); 
+  private Text innerValue = (Text)super.createValue(); 
 
-  public StreamLineRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
+  public StreamLineRecordReader(FSDataInputStream in, FileSplit split, 
+      Reporter reporter,
       JobConf job, FileSystem fs) throws IOException {
-    super(in, split, reporter, job, fs);
-    gzipped_ = StreamInputFormat.isGzippedInput(job);
-    if (gzipped_) {
-      din_ = new BufferedInputStream( (new GZIPInputStream(in_) ) );
-    } else {
-      din_ = in_;
-    }
+    super(createStream(in, job), split.getStart(), 
+        split.getStart() + split.getLength());
+    this.split = split ; 
+    this.reporter = reporter ; 
   }
-
-  public void seekNextRecordBoundary() throws IOException {
-    if (gzipped_) {
-      // no skipping: use din_ as-is 
-      // assumes splitter created only one split per file
-      return;
-    } else {
-      int bytesSkipped = 0;
-      if (start_ != 0) {
-        in_.seek(start_ - 1);
-        // scan to the next newline in the file
-        while (in_.getPos() < end_) {
-          char c = (char) in_.read();
-          bytesSkipped++;
-          if (c == '\r' || c == '\n') {
-            break;
-          }
-        }
-      }
-
-      //System.out.println("getRecordReader start="+start_ + " end=" + end_ + " bytesSkipped"+bytesSkipped);
-    }
+  
+  private static InputStream createStream(FSDataInputStream in, JobConf job) 
+    throws IOException{
+    InputStream finalStream = in ;
+    boolean gzipped = StreamInputFormat.isGzippedInput(job);
+    if ( gzipped ) {
+      GzipCodec codec = new GzipCodec();
+      codec.setConf(job);
+      finalStream = codec.createInputStream(in);
+    } 
+    return finalStream; 
+  }
+  
+  public WritableComparable createKey() {
+    return new Text();
+  }  
+  
+  public Writable createValue() {
+    return new Text();
   }
 
   public synchronized boolean next(Writable key, Writable value) throws IOException {
@@ -86,14 +92,12 @@
 
     Text tKey = (Text) key;
     Text tValue = (Text) value;
-    byte[] line;
-
-    if ( !gzipped_  ) {
-      long pos = in_.getPos();
-      if (pos >= end_) return false;
+    byte[] line = null ; 
+    if( super.next(dummyKey, innerValue) ){
+      line = innerValue.getBytes(); 
+    }else{
+      return false;
     }
-    
-    line = UTF8ByteArrayUtils.readLine((InputStream) din_);
     if (line == null) return false;
     int tab = UTF8ByteArrayUtils.findTab(line);
     if (tab == -1) {
@@ -105,7 +109,35 @@
     numRecStats(line, 0, line.length);
     return true;
   }
+  
+  private void numRecStats(byte[] record, int start, int len) throws IOException {
+    numRec++;
+    if (numRec == nextStatusRec) {
+      String recordStr = new String(record, start, Math.min(len, statusMaxRecordChars), "UTF-8");
+      nextStatusRec += 100;//*= 10;
+      String status = getStatus(recordStr);
+      LOG.info(status);
+      reporter.setStatus(status);
+    }
+  }
 
-  boolean gzipped_;
-  InputStream din_; // GZIP or plain  
+  private String getStatus(CharSequence record) {
+    long pos = -1;
+    try {
+      pos = getPos();
+    } catch (IOException io) {
+    }
+    String recStr;
+    if (record.length() > statusMaxRecordChars) {
+      recStr = record.subSequence(0, statusMaxRecordChars) + "...";
+    } else {
+      recStr = record.toString();
+    }
+    String unqualSplit = split.getFile().getName() + ":" + split.getStart() + "+"
+        + split.getLength();
+    String status = "HSTR " + StreamUtil.HOST + " " + numRec + ". pos=" + pos + " " + unqualSplit
+        + " Processing record=" + recStr;
+    status += " " + splitName;
+    return status;
+  }
 }

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java Wed Jan 31 14:26:41 2007
@@ -62,8 +62,19 @@
       beginPat_ = makePatternCDataOrMark(beginMark_);
       endPat_ = makePatternCDataOrMark(endMark_);
     }
+    init();
   }
 
+  public void init() throws IOException {
+    LOG.info("StreamBaseRecordReader.init: " + " start_=" + start_ + " end_=" + end_ + " length_="
+        + length_ + " start_ > in_.getPos() =" + (start_ > in_.getPos()) + " " + start_ + " > "
+        + in_.getPos());
+    if (start_ > in_.getPos()) {
+      in_.seek(start_);
+    }
+    seekNextRecordBoundary();
+  }
+  
   int numNext = 0;
 
   public synchronized boolean next(Writable key, Writable value) throws IOException {

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java?view=auto&rev=502021
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java Wed Jan 31 14:26:41 2007
@@ -0,0 +1,127 @@
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Treats keys as offset in file and value as line. 
+ * @author sanjaydahiya
+ *
+ */
+public class LineRecordReader implements RecordReader {
+  private long start; 
+  private long pos;
+  private long end;
+  private BufferedInputStream in;
+  private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
+  /**
+   * Provide a bridge to get the bytes from the ByteArrayOutputStream
+   * without creating a new byte array.
+   */
+  private static class TextStuffer extends OutputStream {
+    public Text target;
+    public void write(int b) {
+      throw new UnsupportedOperationException("write(byte) not supported");
+    }
+    public void write(byte[] data, int offset, int len) throws IOException {
+      target.set(data, offset, len);
+    }      
+  }
+  private TextStuffer bridge = new TextStuffer();
+
+  public LineRecordReader(InputStream in, long offset, long endOffset) 
+    throws IOException{
+    this.in = new BufferedInputStream(in);
+    this.start = offset;
+    this.pos = offset;
+    this.end = endOffset;    
+//    readLine(in, null); 
+  }
+  
+  public WritableComparable createKey() {
+    return new LongWritable();
+  }
+  
+  public Writable createValue() {
+    return new Text();
+  }
+  
+  /** Read a line. */
+  public synchronized boolean next(Writable key, Writable value)
+    throws IOException {
+    if (pos >= end)
+      return false;
+
+    ((LongWritable)key).set(pos);           // key is position
+    buffer.reset();
+    long bytesRead = readLine(in, buffer);
+    if (bytesRead == 0) {
+      return false;
+    }
+    pos += bytesRead;
+    bridge.target = (Text) value;
+    buffer.writeTo(bridge);
+    return true;
+  }
+
+  public static long readLine(InputStream in, 
+      OutputStream out) throws IOException {
+    long bytes = 0;
+    while (true) {
+      
+      int b = in.read();
+      if (b == -1) {
+        break;
+      }
+      bytes += 1;
+      
+      byte c = (byte)b;
+      if (c == '\n') {
+        break;
+      }
+      
+      if (c == '\r') {
+        in.mark(1);
+        byte nextC = (byte)in.read();
+        if (nextC != '\n') {
+          in.reset();
+        } else {
+          bytes += 1;
+        }
+        break;
+      }
+      
+      if (out != null) {
+        out.write(c);
+      }
+    }
+    return bytes;
+  }
+  
+  /**
+   * Get the progress within the split
+   */
+  public float getProgress() {
+    if (start == end) {
+      return 0.0f;
+    } else {
+      return (pos - start) / (end - start);
+    }
+  }
+  
+  public  synchronized long getPos() throws IOException {
+    return pos;
+  }
+
+  public synchronized void close() throws IOException { 
+    in.close(); 
+  }
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java Wed Jan 31 14:26:41 2007
@@ -21,7 +21,6 @@
 import java.io.*;
 
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.compress.*;
 
 /** An {@link InputFormat} for plain text files.  Files are broken into lines.
@@ -39,81 +38,6 @@
     return compressionCodecs.getCodec(file) == null;
   }
   
-  protected static class LineRecordReader implements RecordReader {
-    private long start;
-    private long pos;
-    private long end;
-    private BufferedInputStream in;
-    private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
-    /**
-     * Provide a bridge to get the bytes from the ByteArrayOutputStream
-     * without creating a new byte array.
-     */
-    private static class TextStuffer extends OutputStream {
-      public Text target;
-      public void write(int b) {
-        throw new UnsupportedOperationException("write(byte) not supported");
-      }
-      public void write(byte[] data, int offset, int len) throws IOException {
-        target.set(data, offset, len);
-      }      
-    }
-    private TextStuffer bridge = new TextStuffer();
-
-    public LineRecordReader(InputStream in, long offset, long endOffset) {
-      this.in = new BufferedInputStream(in);
-      this.start = offset;
-      this.pos = offset;
-      this.end = endOffset;
-    }
-    
-    public WritableComparable createKey() {
-      return new LongWritable();
-    }
-    
-    public Writable createValue() {
-      return new Text();
-    }
-    
-    /**
-     * Get the progress within the split
-     */
-    public float getProgress() {
-      if (start == end) {
-        return 0.0f;
-      } else {
-        return (pos - start) / (end - start);
-      }
-    }
-    
-    /** Read a line. */
-    public synchronized boolean next(Writable key, Writable value)
-      throws IOException {
-      if (pos >= end)
-        return false;
-
-      ((LongWritable)key).set(pos);           // key is position
-      buffer.reset();
-      long bytesRead = readLine(in, buffer);
-      if (bytesRead == 0) {
-        return false;
-      }
-      pos += bytesRead;
-      bridge.target = (Text) value;
-      buffer.writeTo(bridge);
-      return true;
-    }
-    
-    public  synchronized long getPos() throws IOException {
-      return pos;
-    }
-
-    public synchronized void close() throws IOException { 
-      in.close(); 
-    }  
-
-  }
-  
   public RecordReader getRecordReader(InputSplit genericSplit,
                                       JobConf job, Reporter reporter)
     throws IOException {
@@ -129,52 +53,16 @@
     FileSystem fs = FileSystem.get(job);
     FSDataInputStream fileIn = fs.open(split.getPath());
     InputStream in = fileIn;
-    
     if (codec != null) {
       in = codec.createInputStream(fileIn);
       end = Long.MAX_VALUE;
     } else if (start != 0) {
       fileIn.seek(start-1);
-      readLine(fileIn, null);
+      LineRecordReader.readLine(fileIn, null);
       start = fileIn.getPos();
     }
     
     return new LineRecordReader(in, start, end);
   }
-
-  public static long readLine(InputStream in, 
-                              OutputStream out) throws IOException {
-    long bytes = 0;
-    while (true) {
-
-      int b = in.read();
-      if (b == -1) {
-        break;
-      }
-      bytes += 1;
-      
-      byte c = (byte)b;
-      if (c == '\n') {
-        break;
-      }
-      
-      if (c == '\r') {
-        in.mark(1);
-        byte nextC = (byte)in.read();
-        if (nextC != '\n') {
-          in.reset();
-        } else {
-          bytes += 1;
-        }
-        break;
-      }
-
-      if (out != null) {
-        out.write(c);
-      }
-    }
-    return bytes;
-  }
-
 }
 

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Wed Jan 31 14:26:41 2007
@@ -130,14 +130,14 @@
   public void testUTF8() throws Exception {
     InputStream in = makeStream("abcd\u20acbdcd\u20ac");
     ByteArrayOutputStream out = new ByteArrayOutputStream();
-    TextInputFormat.readLine(in, out);
+    LineRecordReader.readLine(in, out);
     Text line = new Text();
     line.set(out.toByteArray());
     assertEquals("readLine changed utf8 characters", 
                  "abcd\u20acbdcd\u20ac", line.toString());
     in = makeStream("abc\u200axyz");
     out.reset();
-    TextInputFormat.readLine(in, out);
+    LineRecordReader.readLine(in, out);
     line.set(out.toByteArray());
     assertEquals("split on fake newline", "abc\u200axyz", line.toString());
   }
@@ -145,24 +145,24 @@
   public void testNewLines() throws Exception {
     InputStream in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
     ByteArrayOutputStream out = new ByteArrayOutputStream();
-    TextInputFormat.readLine(in, out);
+    LineRecordReader.readLine(in, out);
     assertEquals("line1 length", 1, out.size());
     out.reset();
-    TextInputFormat.readLine(in, out);
+    LineRecordReader.readLine(in, out);
     assertEquals("line2 length", 2, out.size());
     out.reset();
-    TextInputFormat.readLine(in, out);
+    LineRecordReader.readLine(in, out);
     assertEquals("line3 length", 0, out.size());
     out.reset();
-    TextInputFormat.readLine(in, out);
+    LineRecordReader.readLine(in, out);
     assertEquals("line4 length", 3, out.size());
     out.reset();
-    TextInputFormat.readLine(in, out);
+    LineRecordReader.readLine(in, out);
     assertEquals("line5 length", 4, out.size());
     out.reset();
-    TextInputFormat.readLine(in, out);
+    LineRecordReader.readLine(in, out);
     assertEquals("line5 length", 5, out.size());
-    assertEquals("end of file", 0, TextInputFormat.readLine(in, out));
+    assertEquals("end of file", 0, LineRecordReader.readLine(in, out));
   }
   
   private static void writeFile(FileSystem fs, Path name,