You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/09/30 00:38:02 UTC
svn commit: r700293 - in /hadoop/core/trunk: CHANGES.txt
src/core/org/apache/hadoop/util/LineReader.java
src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
Author: cdouglas
Date: Mon Sep 29 15:38:02 2008
New Revision: 700293
URL: http://svn.apache.org/viewvc?rev=700293&view=rev
Log:
HADOOP-4226. Refactor and document LineReader to make it more readily
understandable. Contributed by Yuri Pradkin.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=700293&r1=700292&r2=700293&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Sep 29 15:38:02 2008
@@ -26,6 +26,9 @@
HADOOP-4262. Generate better error message when client exception has null
message. (stevel via omalley)
+ HADOOP-4226. Refactor and document LineReader to make it more readily
+ understandable. (Yuri Pradkin via cdouglas)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java?rev=700293&r1=700292&r2=700293&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java Mon Sep 29 15:38:02 2008
@@ -37,6 +37,9 @@
// the current position in the buffer
private int bufferPosn = 0;
+ private static final byte CR = '\r';
+ private static final byte LF = '\n';
+
/**
* Create a line reader that reads from the given stream using the
* default buffer-size (64k).
@@ -73,17 +76,6 @@
}
/**
- * Fill the buffer with more data.
- * @return was there more data?
- * @throws IOException
- */
- boolean backfill() throws IOException {
- bufferPosn = 0;
- bufferLength = in.read(buffer);
- return bufferLength > 0;
- }
-
- /**
* Close the underlying stream.
* @throws IOException
*/
@@ -92,68 +84,86 @@
}
/**
- * Read from the InputStream into the given Text.
- * @param str the object to store the given line
- * @param maxLineLength the maximum number of bytes to store into str.
- * @param maxBytesToConsume the maximum number of bytes to consume in this call.
- * @return the number of bytes read including the newline
+ * Read one line from the InputStream into the given Text. A line
+ * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
+ * or '\r\n' (CR+LF). EOF also terminates an otherwise unterminated
+ * line.
+ *
+ * @param str the object to store the given line (without newline)
+ * @param maxLineLength the maximum number of bytes to store into str;
+ * the rest of the line is silently discarded.
+ * @param maxBytesToConsume the maximum number of bytes to consume
+ * in this call. This is only a hint, because if the line crosses
+ * this threshold, we allow it to happen. It can overshoot
+ * potentially by as much as one buffer length.
+ *
+ * @return the number of bytes read including the (longest) newline
+ * found.
+ *
* @throws IOException if the underlying stream throws
*/
public int readLine(Text str, int maxLineLength,
int maxBytesToConsume) throws IOException {
+ /* We're reading data from in, but the head of the stream may be
+ * already buffered in buffer, so we have several cases:
+ * 1. No newline characters are in the buffer, so we need to copy
+ * everything and read another buffer from the stream.
+ * 2. An unambiguously terminated line is in buffer, so we just
+ * copy to str.
+ * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+ * in CR. In this case we copy everything up to CR to str, but
+ * we also need to see what follows CR: if it's LF, then we
+ * need to consume LF as well, so next call to readLine will read
+ * from after that.
+ * We use a flag prevCharCR to signal if previous character was CR
+ * and, if it happens to be at the end of the buffer, delay
+ * consuming it until we have a chance to look at the char that
+ * follows.
+ */
str.clear();
- boolean hadFinalNewline = false;
- boolean hadFinalReturn = false;
- boolean hitEndOfFile = false;
- int startPosn = bufferPosn;
+ int txtLength = 0; //tracks str.getLength(), as an optimization
+ int newlineLength = 0; //length of terminating newline
+ boolean prevCharCR = false; //true if prev char was CR
long bytesConsumed = 0;
- outerLoop: while (true) {
+ do {
+ int startPosn = bufferPosn; //starting from where we left off the last time
if (bufferPosn >= bufferLength) {
- if (!backfill()) {
- hitEndOfFile = true;
+ startPosn = bufferPosn = 0;
+ if (prevCharCR)
+ ++bytesConsumed; //account for CR from previous read
+ bufferLength = in.read(buffer);
+ if (bufferLength <= 0)
+ break; // EOF
+ }
+ for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+ if (buffer[bufferPosn] == LF) {
+ newlineLength = (prevCharCR) ? 2 : 1;
+ ++bufferPosn; // at next invocation proceed from following byte
break;
}
- }
- startPosn = bufferPosn;
- for(; bufferPosn < bufferLength; ++bufferPosn) {
- switch (buffer[bufferPosn]) {
- case '\n':
- hadFinalNewline = true;
- bufferPosn += 1;
- break outerLoop;
- case '\r':
- if (hadFinalReturn) {
- // leave this \r in the stream, so we'll get it next time
- break outerLoop;
- }
- hadFinalReturn = true;
+ if (prevCharCR) { //CR + notLF, we are at notLF
+ newlineLength = 1;
break;
- default:
- if (hadFinalReturn) {
- break outerLoop;
- }
- }
- }
- bytesConsumed += bufferPosn - startPosn;
- int length = bufferPosn - startPosn - (hadFinalReturn ? 1 : 0);
- length = (int)Math.min(length, maxLineLength - str.getLength());
- if (length >= 0) {
- str.append(buffer, startPosn, length);
+ }
+ prevCharCR = (buffer[bufferPosn] == CR);
}
- if (bytesConsumed >= maxBytesToConsume) {
- return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
+ int readLength = bufferPosn - startPosn;
+ if (prevCharCR && newlineLength == 0)
+ --readLength; //CR at the end of the buffer
+ bytesConsumed += readLength;
+ int appendLength = readLength - newlineLength;
+ if (appendLength > maxLineLength - txtLength) {
+ appendLength = maxLineLength - txtLength;
}
- }
- int newlineLength = (hadFinalNewline ? 1 : 0) + (hadFinalReturn ? 1 : 0);
- if (!hitEndOfFile) {
- bytesConsumed += bufferPosn - startPosn;
- int length = bufferPosn - startPosn - newlineLength;
- length = (int)Math.min(length, maxLineLength - str.getLength());
- if (length > 0) {
- str.append(buffer, startPosn, length);
+ if (appendLength > 0) {
+ str.append(buffer, startPosn, appendLength);
+ txtLength += appendLength;
}
- }
- return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
+ } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
+
+ if (bytesConsumed > (long)Integer.MAX_VALUE)
+ throw new IOException("Too many bytes before newline: " + bytesConsumed);
+ return (int)bytesConsumed;
}
/**
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?rev=700293&r1=700292&r2=700293&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Mon Sep 29 15:38:02 2008
@@ -132,6 +132,11 @@
(str.getBytes("UTF-8")),
defaultConf);
}
+ private static LineReader makeStream(String str, int bufsz) throws IOException {
+ return new LineReader(new ByteArrayInputStream
+ (str.getBytes("UTF-8")),
+ bufsz);
+ }
public void testUTF8() throws Exception {
LineReader in = makeStream("abcd\u20acbdcd\u20ac");
@@ -144,40 +149,74 @@
assertEquals("split on fake newline", "abc\u200axyz", line.toString());
}
+ /**
+ * Test readLine for various kinds of line termination sequences.
+ * Varies buffer size to stress test. Also check that returned
+ * value matches the string length.
+ *
+ * @throws Exception
+ */
public void testNewLines() throws Exception {
- LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
+ final String STR = "a\nbb\n\nccc\rdddd\r\r\r\n\r\neeeee";
+ final int STRLENBYTES = STR.getBytes().length;
Text out = new Text();
- in.readLine(out);
- assertEquals("line1 length", 1, out.getLength());
- in.readLine(out);
- assertEquals("line2 length", 2, out.getLength());
- in.readLine(out);
- assertEquals("line3 length", 0, out.getLength());
- in.readLine(out);
- assertEquals("line4 length", 3, out.getLength());
- in.readLine(out);
- assertEquals("line5 length", 4, out.getLength());
- in.readLine(out);
- assertEquals("line5 length", 5, out.getLength());
- assertEquals("end of file", 0, in.readLine(out));
+ for (int bufsz = 1; bufsz < STRLENBYTES+1; ++bufsz) {
+ LineReader in = makeStream(STR, bufsz);
+ int c = 0;
+ c += in.readLine(out); //"a"\n
+ assertEquals("line1 length, bufsz:"+bufsz, 1, out.getLength());
+ c += in.readLine(out); //"bb"\n
+ assertEquals("line2 length, bufsz:"+bufsz, 2, out.getLength());
+ c += in.readLine(out); //""\n
+ assertEquals("line3 length, bufsz:"+bufsz, 0, out.getLength());
+ c += in.readLine(out); //"ccc"\r
+ assertEquals("line4 length, bufsz:"+bufsz, 3, out.getLength());
+ c += in.readLine(out); //"dddd"\r
+ assertEquals("line5 length, bufsz:"+bufsz, 4, out.getLength());
+ c += in.readLine(out); //""\r
+ assertEquals("line6 length, bufsz:"+bufsz, 0, out.getLength());
+ c += in.readLine(out); //""\r\n
+ assertEquals("line7 length, bufsz:"+bufsz, 0, out.getLength());
+ c += in.readLine(out); //""\r\n
+ assertEquals("line8 length, bufsz:"+bufsz, 0, out.getLength());
+ c += in.readLine(out); //"eeeee"EOF
+ assertEquals("line9 length, bufsz:"+bufsz, 5, out.getLength());
+ assertEquals("end of file, bufsz: "+bufsz, 0, in.readLine(out));
+ assertEquals("total bytes, bufsz: "+bufsz, c, STRLENBYTES);
+ }
}
-
+
+ /**
+ * Test readLine for correct interpretation of maxLineLength
+ * (returned string should be clipped at maxLineLength, and the
+ * remaining bytes on the same line should be thrown out).
+ * Also check that returned value matches the string length.
+ * Varies buffer size to stress test.
+ *
+ * @throws Exception
+ */
public void testMaxLineLength() throws Exception {
- LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
+ final String STR = "a\nbb\n\nccc\rdddd\r\neeeee";
+ final int STRLENBYTES = STR.getBytes().length;
Text out = new Text();
- in.readLine(out, 1);
- assertEquals("line1 length", 1, out.getLength());
- in.readLine(out, 1);
- assertEquals("line2 length", 1, out.getLength());
- in.readLine(out, 1);
- assertEquals("line3 length", 0, out.getLength());
- in.readLine(out, 3);
- assertEquals("line4 length", 3, out.getLength());
- in.readLine(out, 10);
- assertEquals("line5 length", 4, out.getLength());
- in.readLine(out, 8);
- assertEquals("line5 length", 5, out.getLength());
- assertEquals("end of file", 0, in.readLine(out));
+ for (int bufsz = 1; bufsz < STRLENBYTES+1; ++bufsz) {
+ LineReader in = makeStream(STR, bufsz);
+ int c = 0;
+ c += in.readLine(out, 1);
+ assertEquals("line1 length, bufsz: "+bufsz, 1, out.getLength());
+ c += in.readLine(out, 1);
+ assertEquals("line2 length, bufsz: "+bufsz, 1, out.getLength());
+ c += in.readLine(out, 1);
+ assertEquals("line3 length, bufsz: "+bufsz, 0, out.getLength());
+ c += in.readLine(out, 3);
+ assertEquals("line4 length, bufsz: "+bufsz, 3, out.getLength());
+ c += in.readLine(out, 10);
+ assertEquals("line5 length, bufsz: "+bufsz, 4, out.getLength());
+ c += in.readLine(out, 8);
+ assertEquals("line5 length, bufsz: "+bufsz, 5, out.getLength());
+ assertEquals("end of file, bufsz: " +bufsz, 0, in.readLine(out));
+ assertEquals("total bytes, bufsz: "+bufsz, c, STRLENBYTES);
+ }
}
private static void writeFile(FileSystem fs, Path name,