You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/09/30 00:38:02 UTC

svn commit: r700293 - in /hadoop/core/trunk: CHANGES.txt src/core/org/apache/hadoop/util/LineReader.java src/test/org/apache/hadoop/mapred/TestTextInputFormat.java

Author: cdouglas
Date: Mon Sep 29 15:38:02 2008
New Revision: 700293

URL: http://svn.apache.org/viewvc?rev=700293&view=rev
Log:
HADOOP-4226. Refactor and document LineReader to make it more readily
understandable. Contributed by Yuri Pradkin.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=700293&r1=700292&r2=700293&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Sep 29 15:38:02 2008
@@ -26,6 +26,9 @@
     HADOOP-4262. Generate better error message when client exception has null
     message. (stevel via omalley)
 
+    HADOOP-4226. Refactor and document LineReader to make it more readily
+    understandable. (Yuri Pradkin via cdouglas)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java?rev=700293&r1=700292&r2=700293&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java Mon Sep 29 15:38:02 2008
@@ -37,6 +37,9 @@
   // the current position in the buffer
   private int bufferPosn = 0;
 
+  private static final byte CR = '\r';
+  private static final byte LF = '\n';
+
   /**
    * Create a line reader that reads from the given stream using the
    * default buffer-size (64k).
@@ -73,17 +76,6 @@
   }
 
   /**
-   * Fill the buffer with more data.
-   * @return was there more data?
-   * @throws IOException
-   */
-  boolean backfill() throws IOException {
-    bufferPosn = 0;
-    bufferLength = in.read(buffer);
-    return bufferLength > 0;
-  }
-  
-  /**
    * Close the underlying stream.
    * @throws IOException
    */
@@ -92,68 +84,86 @@
   }
   
   /**
-   * Read from the InputStream into the given Text.
-   * @param str the object to store the given line
-   * @param maxLineLength the maximum number of bytes to store into str.
-   * @param maxBytesToConsume the maximum number of bytes to consume in this call.
-   * @return the number of bytes read including the newline
+   * Read one line from the InputStream into the given Text.  A line
+   * can be terminated by one of the following: '\n' (LF), '\r' (CR),
+   * or '\r\n' (CR+LF).  EOF also terminates an otherwise unterminated
+   * line.
+   *
+   * @param str the object to store the given line (without newline)
+   * @param maxLineLength the maximum number of bytes to store into str;
+   *  the rest of the line is silently discarded.
+   * @param maxBytesToConsume the maximum number of bytes to consume
+   *  in this call.  This is only a hint: if the line crosses this
+   *  threshold, we allow it to happen, so the number of bytes consumed
+   *  can overshoot it by as much as one buffer length.
+   *
+   * @return the number of bytes read including the (longest) newline
+   * found.
+   *
    * @throws IOException if the underlying stream throws
    */
   public int readLine(Text str, int maxLineLength,
                       int maxBytesToConsume) throws IOException {
+    /* We're reading data from in, but the head of the stream may be
+     * already buffered in buffer, so we have several cases:
+     * 1. No newline characters are in the buffer, so we need to copy
+     *    everything and read another buffer from the stream.
+     * 2. An unambiguously terminated line is in buffer, so we just
+     *    copy to str.
+     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+     *    in CR.  In this case we copy everything up to CR to str, but
+     *    we also need to see what follows CR: if it's LF, then we
+     *    need to consume LF as well, so the next call to readLine reads
+     *    from after that.
+     * We use a flag prevCharCR to signal if previous character was CR
+     * and, if it happens to be at the end of the buffer, delay
+     * consuming it until we have a chance to look at the char that
+     * follows.
+     */
     str.clear();
-    boolean hadFinalNewline = false;
-    boolean hadFinalReturn = false;
-    boolean hitEndOfFile = false;
-    int startPosn = bufferPosn;
+    int txtLength = 0; //tracks str.getLength(), as an optimization
+    int newlineLength = 0; //length of terminating newline
+    boolean prevCharCR = false; //true of prev char was CR
     long bytesConsumed = 0;
-    outerLoop: while (true) {
+    do {
+      int startPosn = bufferPosn; //starting from where we left off the last time
       if (bufferPosn >= bufferLength) {
-        if (!backfill()) {
-          hitEndOfFile = true;
+        startPosn = bufferPosn = 0;
+        if (prevCharCR)
+          ++bytesConsumed; //account for CR from previous read
+        bufferLength = in.read(buffer);
+        if (bufferLength <= 0)
+          break; // EOF
+      }
+      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+        if (buffer[bufferPosn] == LF) {
+          newlineLength = (prevCharCR) ? 2 : 1;
+          ++bufferPosn; // at next invocation proceed from following byte
           break;
         }
-      }
-      startPosn = bufferPosn;
-      for(; bufferPosn < bufferLength; ++bufferPosn) {
-        switch (buffer[bufferPosn]) {
-        case '\n':
-          hadFinalNewline = true;
-          bufferPosn += 1;
-          break outerLoop;
-        case '\r':
-          if (hadFinalReturn) {
-            // leave this \r in the stream, so we'll get it next time
-            break outerLoop;
-          }
-          hadFinalReturn = true;
+        if (prevCharCR) { //CR + notLF, we are at notLF
+          newlineLength = 1;
           break;
-        default:
-          if (hadFinalReturn) {
-            break outerLoop;
-          }
-        }        
-      }
-      bytesConsumed += bufferPosn - startPosn;
-      int length = bufferPosn - startPosn - (hadFinalReturn ? 1 : 0);
-      length = (int)Math.min(length, maxLineLength - str.getLength());
-      if (length >= 0) {
-        str.append(buffer, startPosn, length);
+        }
+        prevCharCR = (buffer[bufferPosn] == CR);
       }
-      if (bytesConsumed >= maxBytesToConsume) {
-        return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
+      int readLength = bufferPosn - startPosn;
+      if (prevCharCR && newlineLength == 0)
+        --readLength; //CR at the end of the buffer
+      bytesConsumed += readLength;
+      int appendLength = readLength - newlineLength;
+      if (appendLength > maxLineLength - txtLength) {
+        appendLength = maxLineLength - txtLength;
       }
-    }
-    int newlineLength = (hadFinalNewline ? 1 : 0) + (hadFinalReturn ? 1 : 0);
-    if (!hitEndOfFile) {
-      bytesConsumed += bufferPosn - startPosn;
-      int length = bufferPosn - startPosn - newlineLength;
-      length = (int)Math.min(length, maxLineLength - str.getLength());
-      if (length > 0) {
-        str.append(buffer, startPosn, length);
+      if (appendLength > 0) {
+        str.append(buffer, startPosn, appendLength);
+        txtLength += appendLength;
       }
-    }
-    return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
+    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
+
+    if (bytesConsumed > (long)Integer.MAX_VALUE)
+      throw new IOException("Too many bytes before newline: " + bytesConsumed);    
+    return (int)bytesConsumed;
   }
 
   /**

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?rev=700293&r1=700292&r2=700293&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Mon Sep 29 15:38:02 2008
@@ -132,6 +132,11 @@
                                              (str.getBytes("UTF-8")), 
                                            defaultConf);
   }
+  private static LineReader makeStream(String str, int bufsz) throws IOException {
+    return new LineReader(new ByteArrayInputStream
+                                             (str.getBytes("UTF-8")), 
+                                           bufsz);
+  }
   
   public void testUTF8() throws Exception {
     LineReader in = makeStream("abcd\u20acbdcd\u20ac");
@@ -144,40 +149,74 @@
     assertEquals("split on fake newline", "abc\u200axyz", line.toString());
   }
 
+  /**
+   * Test readLine for various kinds of line termination sequences.
+   * Varies buffer size to stress test.  Also check that returned
+   * value matches the string length.
+   *
+   * @throws Exception
+   */
   public void testNewLines() throws Exception {
-    LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
+    final String STR = "a\nbb\n\nccc\rdddd\r\r\r\n\r\neeeee";
+    final int STRLENBYTES = STR.getBytes().length;
     Text out = new Text();
-    in.readLine(out);
-    assertEquals("line1 length", 1, out.getLength());
-    in.readLine(out);
-    assertEquals("line2 length", 2, out.getLength());
-    in.readLine(out);
-    assertEquals("line3 length", 0, out.getLength());
-    in.readLine(out);
-    assertEquals("line4 length", 3, out.getLength());
-    in.readLine(out);
-    assertEquals("line5 length", 4, out.getLength());
-    in.readLine(out);
-    assertEquals("line5 length", 5, out.getLength());
-    assertEquals("end of file", 0, in.readLine(out));
+    for (int bufsz = 1; bufsz < STRLENBYTES+1; ++bufsz) {
+      LineReader in = makeStream(STR, bufsz);
+      int c = 0;
+      c += in.readLine(out); //"a"\n
+      assertEquals("line1 length, bufsz:"+bufsz, 1, out.getLength());
+      c += in.readLine(out); //"bb"\n
+      assertEquals("line2 length, bufsz:"+bufsz, 2, out.getLength());
+      c += in.readLine(out); //""\n
+      assertEquals("line3 length, bufsz:"+bufsz, 0, out.getLength());
+      c += in.readLine(out); //"ccc"\r
+      assertEquals("line4 length, bufsz:"+bufsz, 3, out.getLength());
+      c += in.readLine(out); //dddd\r
+      assertEquals("line5 length, bufsz:"+bufsz, 4, out.getLength());
+      c += in.readLine(out); //""\r
+      assertEquals("line6 length, bufsz:"+bufsz, 0, out.getLength());
+      c += in.readLine(out); //""\r\n
+      assertEquals("line7 length, bufsz:"+bufsz, 0, out.getLength());
+      c += in.readLine(out); //""\r\n
+      assertEquals("line8 length, bufsz:"+bufsz, 0, out.getLength());
+      c += in.readLine(out); //"eeeee"EOF
+      assertEquals("line9 length, bufsz:"+bufsz, 5, out.getLength());
+      assertEquals("end of file, bufsz: "+bufsz, 0, in.readLine(out));
+      assertEquals("total bytes, bufsz: "+bufsz, c, STRLENBYTES);
+    }
   }
-  
+
+  /**
+   * Test readLine for correct interpretation of maxLineLength
+   * (returned string should be clipped at maxLineLength, and the
+   * remaining bytes on the same line should be thrown out).
+   * Also check that returned value matches the string length.
+   * Varies buffer size to stress test.
+   *
+   * @throws Exception
+   */
   public void testMaxLineLength() throws Exception {
-    LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
+    final String STR = "a\nbb\n\nccc\rdddd\r\neeeee";
+    final int STRLENBYTES = STR.getBytes().length;
     Text out = new Text();
-    in.readLine(out, 1);
-    assertEquals("line1 length", 1, out.getLength());
-    in.readLine(out, 1);
-    assertEquals("line2 length", 1, out.getLength());
-    in.readLine(out, 1);
-    assertEquals("line3 length", 0, out.getLength());
-    in.readLine(out, 3);
-    assertEquals("line4 length", 3, out.getLength());
-    in.readLine(out, 10);
-    assertEquals("line5 length", 4, out.getLength());
-    in.readLine(out, 8);
-    assertEquals("line5 length", 5, out.getLength());
-    assertEquals("end of file", 0, in.readLine(out));
+    for (int bufsz = 1; bufsz < STRLENBYTES+1; ++bufsz) {
+      LineReader in = makeStream(STR, bufsz);
+      int c = 0;
+      c += in.readLine(out, 1);
+      assertEquals("line1 length, bufsz: "+bufsz, 1, out.getLength());
+      c += in.readLine(out, 1);
+      assertEquals("line2 length, bufsz: "+bufsz, 1, out.getLength());
+      c += in.readLine(out, 1);
+      assertEquals("line3 length, bufsz: "+bufsz, 0, out.getLength());
+      c += in.readLine(out, 3);
+      assertEquals("line4 length, bufsz: "+bufsz, 3, out.getLength());
+      c += in.readLine(out, 10);
+      assertEquals("line5 length, bufsz: "+bufsz, 4, out.getLength());
+      c += in.readLine(out, 8);
+      assertEquals("line5 length, bufsz: "+bufsz, 5, out.getLength());
+      assertEquals("end of file, bufsz: " +bufsz, 0, in.readLine(out));
+      assertEquals("total bytes, bufsz: "+bufsz, c, STRLENBYTES);
+    }
   }
 
   private static void writeFile(FileSystem fs, Path name,