You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bo...@apache.org on 2012/08/23 18:58:36 UTC

svn commit: r1376592 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: CHANGES.txt src/main/java/org/apache/hadoop/util/LineReader.java src/test/java/org/apache/hadoop/util/TestLineReader.java

Author: bobby
Date: Thu Aug 23 16:58:36 2012
New Revision: 1376592

URL: http://svn.apache.org/viewvc?rev=1376592&view=rev
Log:
HADOOP-8655. Fix TextInputFormat for large deliminators. (Gelesh via  bobby)

Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLineReader.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1376592&r1=1376591&r2=1376592&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Thu Aug 23 16:58:36 2012
@@ -832,6 +832,9 @@ Release 2.0.0-alpha - 05-23-2012
     HADOOP-7868. Hadoop native fails to compile when default linker
     option is -Wl,--as-needed. (Trevor Robinson via eli)
 
+    HADOOP-8655. Fix TextInputFormat for large deliminators. (Gelesh via
+    bobby) 
+
 Release 0.23.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java?rev=1376592&r1=1376591&r2=1376592&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java Thu Aug 23 16:58:36 2012
@@ -204,11 +204,13 @@ public class LineReader {
       int startPosn = bufferPosn; //starting from where we left off the last time
       if (bufferPosn >= bufferLength) {
         startPosn = bufferPosn = 0;
-        if (prevCharCR)
+        if (prevCharCR) {
           ++bytesConsumed; //account for CR from previous read
+        }
         bufferLength = in.read(buffer);
-        if (bufferLength <= 0)
+        if (bufferLength <= 0) {
           break; // EOF
+        }
       }
       for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
         if (buffer[bufferPosn] == LF) {
@@ -223,8 +225,9 @@ public class LineReader {
         prevCharCR = (buffer[bufferPosn] == CR);
       }
       int readLength = bufferPosn - startPosn;
-      if (prevCharCR && newlineLength == 0)
+      if (prevCharCR && newlineLength == 0) {
         --readLength; //CR at the end of the buffer
+      }
       bytesConsumed += readLength;
       int appendLength = readLength - newlineLength;
       if (appendLength > maxLineLength - txtLength) {
@@ -236,8 +239,9 @@ public class LineReader {
       }
     } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
 
-    if (bytesConsumed > (long)Integer.MAX_VALUE)
-      throw new IOException("Too many bytes before newline: " + bytesConsumed);    
+    if (bytesConsumed > (long)Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes before newline: " + bytesConsumed);
+    }
     return (int)bytesConsumed;
   }
 
@@ -246,18 +250,56 @@ public class LineReader {
    */
   private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
       throws IOException {
+   /* We're reading data from inputStream, but the head of the stream may be
+    *  already captured in the previous buffer, so we have several cases:
+    * 
+    * 1. The buffer tail does not contain any character sequence which
+    *    matches with the head of delimiter. We count it as a 
+    *    ambiguous byte count = 0
+    *    
+    * 2. The buffer tail contains a X number of characters,
+    *    that forms a sequence, which matches with the
+    *    head of delimiter. We count ambiguous byte count = X
+    *    
+    *    // ***  eg: A segment of input file is as follows
+    *    
+    *    " record 1792: I found this bug very interesting and
+    *     I have completely read about it. record 1793: This bug
+    *     can be solved easily record 1794: This ." 
+    *    
+    *    delimiter = "record";
+    *        
+    *    supposing:- String at the end of buffer =
+    *    "I found this bug very interesting and I have completely re"
+    *    There for next buffer = "ad about it. record 179       ...."           
+    *     
+    *     The matching characters in the input
+    *     buffer tail and delimiter head = "re" 
+    *     Therefore, ambiguous byte count = 2 ****   //
+    *     
+    *     2.1 If the following bytes are the remaining characters of
+    *         the delimiter, then we have to capture only up to the starting 
+    *         position of delimiter. That means, we need not include the 
+    *         ambiguous characters in str.
+    *     
+    *     2.2 If the following bytes are not the remaining characters of
+    *         the delimiter ( as mentioned in the example ), 
+    *         then we have to include the ambiguous characters in str. 
+    */
     str.clear();
     int txtLength = 0; // tracks str.getLength(), as an optimization
     long bytesConsumed = 0;
     int delPosn = 0;
+    int ambiguousByteCount=0; // To capture the ambiguous characters count
     do {
-      int startPosn = bufferPosn; // starting from where we left off the last
-      // time
+      int startPosn = bufferPosn; // Start from previous end position
       if (bufferPosn >= bufferLength) {
         startPosn = bufferPosn = 0;
         bufferLength = in.read(buffer);
-        if (bufferLength <= 0)
+        if (bufferLength <= 0) {
+          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
           break; // EOF
+        }
       }
       for (; bufferPosn < bufferLength; ++bufferPosn) {
         if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
@@ -267,7 +309,7 @@ public class LineReader {
             break;
           }
         } else if (delPosn != 0) {
-          bufferPosn--; // recheck if bufferPosn matches start of delimiter
+          bufferPosn--;
           delPosn = 0;
         }
       }
@@ -278,14 +320,27 @@ public class LineReader {
         appendLength = maxLineLength - txtLength;
       }
       if (appendLength > 0) {
+        if (ambiguousByteCount > 0) {
+          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+          //appending the ambiguous characters (refer case 2.2)
+          bytesConsumed += ambiguousByteCount;
+          ambiguousByteCount=0;
+        }
         str.append(buffer, startPosn, appendLength);
         txtLength += appendLength;
       }
-    } while (delPosn < recordDelimiterBytes.length
+      if (bufferPosn >= bufferLength) {
+        if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
+          ambiguousByteCount = delPosn;
+          bytesConsumed -= ambiguousByteCount; //to be consumed in next
+        }
+      }
+    } while (delPosn < recordDelimiterBytes.length 
         && bytesConsumed < maxBytesToConsume);
-    if (bytesConsumed > (long) Integer.MAX_VALUE)
+    if (bytesConsumed > (long) Integer.MAX_VALUE) {
       throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
-    return (int) bytesConsumed;
+    }
+    return (int) bytesConsumed; 
   }
 
   /**
@@ -297,7 +352,7 @@ public class LineReader {
    */
   public int readLine(Text str, int maxLineLength) throws IOException {
     return readLine(str, maxLineLength, Integer.MAX_VALUE);
-}
+  }
 
   /**
    * Read from the InputStream into the given Text.
@@ -308,5 +363,4 @@ public class LineReader {
   public int readLine(Text str) throws IOException {
     return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
   }
-
 }

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLineReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLineReader.java?rev=1376592&r1=1376591&r2=1376592&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLineReader.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLineReader.java Thu Aug 23 16:58:36 2012
@@ -21,29 +21,121 @@ package org.apache.hadoop.util;
 import java.io.ByteArrayInputStream;
 
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.LineReader;
 import org.junit.Test;
 
 import junit.framework.Assert;
 
 public class TestLineReader {
+  private LineReader lineReader;
+  private String TestData;
+  private String Delimiter;
+  private Text line;
 
   @Test
   public void testCustomDelimiter() throws Exception {
-    String data = "record Bangalorrecord recorrecordrecord Kerala";
-    String delimiter = "record";
-    LineReader reader = new LineReader(
-        new ByteArrayInputStream(data.getBytes()),
-        delimiter.getBytes());
-    Text line = new Text();
-    reader.readLine(line);
-    Assert.assertEquals("", line.toString());
-    reader.readLine(line);
-    Assert.assertEquals(" Bangalor", line.toString());
-    reader.readLine(line);
-    Assert.assertEquals(" recor", line.toString());
-    reader.readLine(line);
-    Assert.assertEquals("", line.toString());
-    reader.readLine(line);
-    Assert.assertEquals(" Kerala", line.toString());
+    /* TEST_1
+     * The test scenario is the tail of the buffer
+     * equals the starting character/s of delimiter
+     * 
+     * The Test Data is such that,
+     *   
+     * 1) we will have "</entity>" as delimiter  
+     *  
+     * 2) The tail of the current buffer would be "</"
+     *    which matches with the starting character sequence of delimiter.
+     *    
+     * 3) The Head of the next buffer would be   "id>" 
+     *    which does NOT match with the remaining characters of delimiter.
+     *   
+     * 4) Input data would be prefixed by char 'a' 
+     *    about numberOfCharToFillTheBuffer times.
+     *    So that, one iteration to buffer the input data,
+     *    would end at '</' ie equals starting 2 char of delimiter  
+     *     
+     * 5) For this we would take BufferSize as 64 * 1024;
+     * 
+     * Check Condition
+     *  In the second key value pair, the value should contain 
+     *  "</"  from currentToken and
+     *  "id>" from next token 
+     */  
+    
+    Delimiter="</entity>"; 
+    
+    String CurrentBufferTailToken=
+        "</entity><entity><id>Gelesh</";
+    // Ending part of Input Data Buffer
+    // It contains '</' ie delimiter character 
+    
+    String NextBufferHeadToken=
+        "id><name>Omathil</name></entity>";
+    // Supposing the start of next buffer is this
+    
+    String Expected = 
+        (CurrentBufferTailToken+NextBufferHeadToken)
+        .replace(Delimiter, "");                       
+    // Expected ,must capture from both the buffer, excluding Delimiter
+  
+    String TestPartOfInput = CurrentBufferTailToken+NextBufferHeadToken;
+  
+    int BufferSize=64 * 1024;
+    int numberOfCharToFillTheBuffer=BufferSize-CurrentBufferTailToken.length();
+    StringBuilder fillerString=new StringBuilder();
+    for (int i=0;i<numberOfCharToFillTheBuffer;i++) {  
+      fillerString.append('a'); // char 'a' as a filler for the test string
+    }
+
+    TestData = fillerString + TestPartOfInput;
+    lineReader = new LineReader(
+        new ByteArrayInputStream(TestData.getBytes()),Delimiter.getBytes());
+    
+    line = new Text();
+    
+    lineReader.readLine(line); 
+    Assert.assertEquals(fillerString.toString(),line.toString());
+    
+    lineReader.readLine(line);
+    Assert.assertEquals(Expected, line.toString());
+    
+    /*TEST_2
+     * The test scenario is such that,
+     * the character/s preceding the delimiter,
+     * equals the starting character/s of delimiter
+     */
+    
+    Delimiter = "record";
+    StringBuilder TestStringBuilder = new StringBuilder();
+    
+    TestStringBuilder.append(Delimiter+"Kerala ");
+    TestStringBuilder.append(Delimiter+"Bangalore");
+    TestStringBuilder.append(Delimiter+" North Korea");
+    TestStringBuilder.append(Delimiter+Delimiter+
+                        "Guantanamo");
+    TestStringBuilder.append(Delimiter+"ecord"+"recor"+"core"); //~EOF with 're'
+    
+    TestData=TestStringBuilder.toString();
+    
+    lineReader = new LineReader(
+        new ByteArrayInputStream(TestData.getBytes()),Delimiter.getBytes());
+    
+    lineReader.readLine(line); 
+    Assert.assertEquals("",line.toString());
+    lineReader.readLine(line); 
+    Assert.assertEquals("Kerala ",line.toString());
+    
+    lineReader.readLine(line); 
+    Assert.assertEquals("Bangalore",line.toString());
+    
+    lineReader.readLine(line); 
+    Assert.assertEquals(" North Korea",line.toString());
+    
+    lineReader.readLine(line); 
+    Assert.assertEquals("",line.toString());
+    lineReader.readLine(line); 
+    Assert.assertEquals("Guantanamo",line.toString());
+    
+    lineReader.readLine(line); 
+    Assert.assertEquals(("ecord"+"recor"+"core"),line.toString());
   }
 }