You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bo...@apache.org on 2012/08/23 18:58:36 UTC
svn commit: r1376592 - in
/hadoop/common/trunk/hadoop-common-project/hadoop-common: CHANGES.txt
src/main/java/org/apache/hadoop/util/LineReader.java
src/test/java/org/apache/hadoop/util/TestLineReader.java
Author: bobby
Date: Thu Aug 23 16:58:36 2012
New Revision: 1376592
URL: http://svn.apache.org/viewvc?rev=1376592&view=rev
Log:
HADOOP-8655. Fix TextInputFormat for large delimiters. (Gelesh via bobby)
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLineReader.java
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1376592&r1=1376591&r2=1376592&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Thu Aug 23 16:58:36 2012
@@ -832,6 +832,9 @@ Release 2.0.0-alpha - 05-23-2012
HADOOP-7868. Hadoop native fails to compile when default linker
option is -Wl,--as-needed. (Trevor Robinson via eli)
+ HADOOP-8655. Fix TextInputFormat for large delimiters. (Gelesh via
+ bobby)
+
Release 0.23.3 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java?rev=1376592&r1=1376591&r2=1376592&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java Thu Aug 23 16:58:36 2012
@@ -204,11 +204,13 @@ public class LineReader {
int startPosn = bufferPosn; //starting from where we left off the last time
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0;
- if (prevCharCR)
+ if (prevCharCR) {
++bytesConsumed; //account for CR from previous read
+ }
bufferLength = in.read(buffer);
- if (bufferLength <= 0)
+ if (bufferLength <= 0) {
break; // EOF
+ }
}
for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
if (buffer[bufferPosn] == LF) {
@@ -223,8 +225,9 @@ public class LineReader {
prevCharCR = (buffer[bufferPosn] == CR);
}
int readLength = bufferPosn - startPosn;
- if (prevCharCR && newlineLength == 0)
+ if (prevCharCR && newlineLength == 0) {
--readLength; //CR at the end of the buffer
+ }
bytesConsumed += readLength;
int appendLength = readLength - newlineLength;
if (appendLength > maxLineLength - txtLength) {
@@ -236,8 +239,9 @@ public class LineReader {
}
} while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
- if (bytesConsumed > (long)Integer.MAX_VALUE)
- throw new IOException("Too many bytes before newline: " + bytesConsumed);
+ if (bytesConsumed > (long)Integer.MAX_VALUE) {
+ throw new IOException("Too many bytes before newline: " + bytesConsumed);
+ }
return (int)bytesConsumed;
}
@@ -246,18 +250,56 @@ public class LineReader {
*/
private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
+ /* We're reading data from inputStream, but the head of the stream may be
+ * already captured in the previous buffer, so we have several cases:
+ *
+ * 1. The buffer tail does not contain any character sequence which
+ * matches with the head of the delimiter. We count it as an
+ * ambiguous byte count = 0
+ *
+ * 2. The buffer tail contains a X number of characters,
+ * that forms a sequence, which matches with the
+ * head of delimiter. We count ambiguous byte count = X
+ *
+ * // *** eg: A segment of input file is as follows
+ *
+ * " record 1792: I found this bug very interesting and
+ * I have completely read about it. record 1793: This bug
+ * can be solved easily record 1794: This ."
+ *
+ * delimiter = "record";
+ *
+ * Suppose the string at the end of the buffer is
+ * "I found this bug very interesting and I have completely re"
+ * Therefore, the next buffer = "ad about it. record 179 ...."
+ *
+ * The matching characters in the input
+ * buffer tail and delimiter head = "re"
+ * Therefore, ambiguous byte count = 2 **** //
+ *
+ * 2.1 If the following bytes are the remaining characters of
+ * the delimiter, then we have to capture only up to the starting
+ * position of delimiter. That means, we need not include the
+ * ambiguous characters in str.
+ *
+ * 2.2 If the following bytes are not the remaining characters of
+ * the delimiter ( as mentioned in the example ),
+ * then we have to include the ambiguous characters in str.
+ */
str.clear();
int txtLength = 0; // tracks str.getLength(), as an optimization
long bytesConsumed = 0;
int delPosn = 0;
+ int ambiguousByteCount=0; // To capture the ambiguous characters count
do {
- int startPosn = bufferPosn; // starting from where we left off the last
- // time
+ int startPosn = bufferPosn; // Start from previous end position
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0;
bufferLength = in.read(buffer);
- if (bufferLength <= 0)
+ if (bufferLength <= 0) {
+ str.append(recordDelimiterBytes, 0, ambiguousByteCount);
break; // EOF
+ }
}
for (; bufferPosn < bufferLength; ++bufferPosn) {
if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
@@ -267,7 +309,7 @@ public class LineReader {
break;
}
} else if (delPosn != 0) {
- bufferPosn--; // recheck if bufferPosn matches start of delimiter
+ bufferPosn--;
delPosn = 0;
}
}
@@ -278,14 +320,27 @@ public class LineReader {
appendLength = maxLineLength - txtLength;
}
if (appendLength > 0) {
+ if (ambiguousByteCount > 0) {
+ str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+ //appending the ambiguous characters (refer case 2.2)
+ bytesConsumed += ambiguousByteCount;
+ ambiguousByteCount=0;
+ }
str.append(buffer, startPosn, appendLength);
txtLength += appendLength;
}
- } while (delPosn < recordDelimiterBytes.length
+ if (bufferPosn >= bufferLength) {
+ if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
+ ambiguousByteCount = delPosn;
+ bytesConsumed -= ambiguousByteCount; //to be consumed in next
+ }
+ }
+ } while (delPosn < recordDelimiterBytes.length
&& bytesConsumed < maxBytesToConsume);
- if (bytesConsumed > (long) Integer.MAX_VALUE)
+ if (bytesConsumed > (long) Integer.MAX_VALUE) {
throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
- return (int) bytesConsumed;
+ }
+ return (int) bytesConsumed;
}
/**
@@ -297,7 +352,7 @@ public class LineReader {
*/
public int readLine(Text str, int maxLineLength) throws IOException {
return readLine(str, maxLineLength, Integer.MAX_VALUE);
-}
+ }
/**
* Read from the InputStream into the given Text.
@@ -308,5 +363,4 @@ public class LineReader {
public int readLine(Text str) throws IOException {
return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
}
-
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLineReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLineReader.java?rev=1376592&r1=1376591&r2=1376592&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLineReader.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLineReader.java Thu Aug 23 16:58:36 2012
@@ -21,29 +21,121 @@ package org.apache.hadoop.util;
import java.io.ByteArrayInputStream;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.LineReader;
import org.junit.Test;
import junit.framework.Assert;
public class TestLineReader {
+ private LineReader lineReader;
+ private String TestData;
+ private String Delimiter;
+ private Text line;
@Test
public void testCustomDelimiter() throws Exception {
- String data = "record Bangalorrecord recorrecordrecord Kerala";
- String delimiter = "record";
- LineReader reader = new LineReader(
- new ByteArrayInputStream(data.getBytes()),
- delimiter.getBytes());
- Text line = new Text();
- reader.readLine(line);
- Assert.assertEquals("", line.toString());
- reader.readLine(line);
- Assert.assertEquals(" Bangalor", line.toString());
- reader.readLine(line);
- Assert.assertEquals(" recor", line.toString());
- reader.readLine(line);
- Assert.assertEquals("", line.toString());
- reader.readLine(line);
- Assert.assertEquals(" Kerala", line.toString());
+ /* TEST_1
+ * The test scenario is the tail of the buffer
+ * equals the starting character/s of delimiter
+ *
+ * The Test Data is such that,
+ *
+ * 1) we will have "</entity>" as delimiter
+ *
+ * 2) The tail of the current buffer would be "</"
+ * which matches with the starting character sequence of delimiter.
+ *
+ * 3) The Head of the next buffer would be "id>"
+ * which does NOT match with the remaining characters of delimiter.
+ *
+ * 4) Input data would be prefixed by char 'a'
+ * about numberOfCharToFillTheBuffer times.
+ * So that, one iteration to buffer the input data,
+ * would end at '</' ie equals starting 2 char of delimiter
+ *
+ * 5) For this we would take BufferSize as 64 * 1024;
+ *
+ * Check Condition
+ * In the second key value pair, the value should contain
+ * "</" from currentToken and
+ * "id>" from next token
+ */
+
+ Delimiter="</entity>";
+
+ String CurrentBufferTailToken=
+ "</entity><entity><id>Gelesh</";
+ // Ending part of Input Data Buffer
+ // It contains '</' ie delimiter character
+
+ String NextBufferHeadToken=
+ "id><name>Omathil</name></entity>";
+ // Supposing the start of next buffer is this
+
+ String Expected =
+ (CurrentBufferTailToken+NextBufferHeadToken)
+ .replace(Delimiter, "");
+ // Expected ,must capture from both the buffer, excluding Delimiter
+
+ String TestPartOfInput = CurrentBufferTailToken+NextBufferHeadToken;
+
+ int BufferSize=64 * 1024;
+ int numberOfCharToFillTheBuffer=BufferSize-CurrentBufferTailToken.length();
+ StringBuilder fillerString=new StringBuilder();
+ for (int i=0;i<numberOfCharToFillTheBuffer;i++) {
+ fillerString.append('a'); // char 'a' as a filler for the test string
+ }
+
+ TestData = fillerString + TestPartOfInput;
+ lineReader = new LineReader(
+ new ByteArrayInputStream(TestData.getBytes()),Delimiter.getBytes());
+
+ line = new Text();
+
+ lineReader.readLine(line);
+ Assert.assertEquals(fillerString.toString(),line.toString());
+
+ lineReader.readLine(line);
+ Assert.assertEquals(Expected, line.toString());
+
+ /*TEST_2
+ * The test scenario is such that,
+ * the character/s preceding the delimiter,
+ * equals the starting character/s of delimiter
+ */
+
+ Delimiter = "record";
+ StringBuilder TestStringBuilder = new StringBuilder();
+
+ TestStringBuilder.append(Delimiter+"Kerala ");
+ TestStringBuilder.append(Delimiter+"Bangalore");
+ TestStringBuilder.append(Delimiter+" North Korea");
+ TestStringBuilder.append(Delimiter+Delimiter+
+ "Guantanamo");
+ TestStringBuilder.append(Delimiter+"ecord"+"recor"+"core"); //~EOF with 're'
+
+ TestData=TestStringBuilder.toString();
+
+ lineReader = new LineReader(
+ new ByteArrayInputStream(TestData.getBytes()),Delimiter.getBytes());
+
+ lineReader.readLine(line);
+ Assert.assertEquals("",line.toString());
+ lineReader.readLine(line);
+ Assert.assertEquals("Kerala ",line.toString());
+
+ lineReader.readLine(line);
+ Assert.assertEquals("Bangalore",line.toString());
+
+ lineReader.readLine(line);
+ Assert.assertEquals(" North Korea",line.toString());
+
+ lineReader.readLine(line);
+ Assert.assertEquals("",line.toString());
+ lineReader.readLine(line);
+ Assert.assertEquals("Guantanamo",line.toString());
+
+ lineReader.readLine(line);
+ Assert.assertEquals(("ecord"+"recor"+"core"),line.toString());
}
}