You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2012/12/12 07:34:58 UTC

svn commit: r1420535 - in /hadoop/common/branches/branch-1: CHANGES.txt src/core/org/apache/hadoop/util/LineReader.java

Author: suresh
Date: Wed Dec 12 06:34:56 2012
New Revision: 1420535

URL: http://svn.apache.org/viewvc?rev=1420535&view=rev
Log:
HADOOP-7096. Allow setting of end-of-record delimiter for TextInputFormat. Backported by Suresh Srinivas.

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/util/LineReader.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1420535&r1=1420534&r2=1420535&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Wed Dec 12 06:34:56 2012
@@ -45,6 +45,9 @@ Release 1.2.0 - unreleased
     HADOOP-9090. Support on-demand publish of metrics. (Mostafa Elhemali via
     suresh)
 
+    HADOOP-7096. Allow setting of end-of-record delimiter for TextInputFormat.
+    (Ahmed Radwan, backported by suresh)
+
   IMPROVEMENTS
 
     HDFS-3515. Port HDFS-1457 to branch-1. (eli)

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/util/LineReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/util/LineReader.java?rev=1420535&r1=1420534&r2=1420535&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/util/LineReader.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/util/LineReader.java Wed Dec 12 06:34:56 2012
@@ -26,6 +26,14 @@ import org.apache.hadoop.io.Text;
 
 /**
  * A class that provides a line reader from an input stream.
+ * Depending on the constructor used, lines will either be terminated by:
+ * <ul>
+ * <li>one of the following: '\n' (LF), '\r' (CR),
+ * or '\r\n' (CR+LF).</li>
+ * <li><em>or</em>, a custom byte sequence delimiter</li>
+ * </ul>
+ * In both cases, EOF also terminates an otherwise unterminated
+ * line.
  */
 public class LineReader {
   private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
@@ -40,6 +48,9 @@ public class LineReader {
   private static final byte CR = '\r';
   private static final byte LF = '\n';
 
+  // Custom record delimiter; null means lines end at LF, CR or CR+LF.
+  private final byte[] recordDelimiterBytes;
+
   /**
    * Create a line reader that reads from the given stream using the
    * default buffer-size (64k).
@@ -61,6 +72,7 @@ public class LineReader {
     this.in = in;
     this.bufferSize = bufferSize;
     this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = null;
   }
 
   /**
@@ -76,6 +88,56 @@ public class LineReader {
   }
 
   /**
+   * Create a line reader that reads from the given stream using the
+   * default buffer-size (64k), and a custom record delimiter given as an
+   * array of bytes.
+   * @param in The input stream
+   * @param recordDelimiterBytes The delimiter
+   */
+  public LineReader(InputStream in, byte[] recordDelimiterBytes) {
+    // Identical to the sized variant, using the default 64k buffer.
+    this(in, DEFAULT_BUFFER_SIZE, recordDelimiterBytes);
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * given buffer-size, and a custom record delimiter given as an
+   * array of bytes.
+   * @param in The input stream
+   * @param bufferSize Size of the read buffer
+   * @param recordDelimiterBytes The delimiter; if <code>null</code>, lines
+   *   are terminated by LF, CR or CR+LF instead
+   */
+  public LineReader(InputStream in, int bufferSize,
+      byte[] recordDelimiterBytes) {
+    this.in = in;
+    this.bufferSize = bufferSize;
+    this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = recordDelimiterBytes;
+  }
+
+  /**
+   * Create a line reader over the given stream whose buffer size is read
+   * from <code>io.file.buffer.size</code> in the given
+   * <code>Configuration</code> (64k when unset), and whose records end
+   * with the given custom delimiter bytes.
+   * @param in input stream
+   * @param conf configuration
+   * @param recordDelimiterBytes The delimiter
+   * @throws IOException declared in the signature; nothing in this
+   *   constructor currently throws it
+   */
+  public LineReader(InputStream in, Configuration conf,
+      byte[] recordDelimiterBytes) throws IOException {
+    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE),
+        recordDelimiterBytes);
+  }
+
+  /**
    * Close the underlying stream.
    * @throws IOException
    */
@@ -84,10 +146,7 @@ public class LineReader {
   }
   
   /**
-   * Read one line from the InputStream into the given Text.  A line
-   * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
-   * or '\r\n' (CR+LF).  EOF also terminates an otherwise unterminated
-   * line.
+   * Read one line from the InputStream into the given Text.
    *
    * @param str the object to store the given line (without newline)
    * @param maxLineLength the maximum number of bytes to store into str;
@@ -104,6 +163,18 @@ public class LineReader {
    */
   public int readLine(Text str, int maxLineLength,
                       int maxBytesToConsume) throws IOException {
+    // A null delimiter selects the built-in CR / LF / CR+LF handling.
+    return recordDelimiterBytes == null
+        ? readDefaultLine(str, maxLineLength, maxBytesToConsume)
+        : readCustomLine(str, maxLineLength, maxBytesToConsume);
+  }
+
+  /**
+   * Read a line terminated by one of CR, LF, or CRLF.
+   */
+  private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
+  throws IOException {
     /* We're reading data from in, but the head of the stream may be
      * already buffered in buffer, so we have several cases:
      * 1. No newline characters are in the buffer, so we need to copy
@@ -167,6 +238,52 @@ public class LineReader {
   }
 
   /**
+   * Read a line terminated by a custom delimiter.
+   */
+  private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    /* Matching uses a Knuth-Morris-Pratt automaton. This fixes two ways a
+     * naive "reset delPosn to 0 on mismatch" scan loses data:
+     *  - a failed partial match must not skip a byte that may itself start
+     *    the delimiter (e.g. delimiter "ab" inside "aab");
+     *  - a partial match that straddles a buffer refill and then fails must
+     *    not drop the held-back prefix bytes from the record.
+     * delPosn is the number of most recently consumed bytes that match a
+     * prefix of the delimiter. Those bytes are held back from str until it is
+     * known whether they are record content or the delimiter. They always
+     * equal recordDelimiterBytes[0..delPosn), so they can be re-emitted from
+     * the delimiter array even after the buffer has been refilled.
+     */
+    final byte[] delim = recordDelimiterBytes;
+    if (delim.length == 0) {
+      throw new IllegalStateException("Record delimiter must not be empty");
+    }
+    // O(delimiter length) per call; assumes delimiters are short.
+    final int[] fallback = delimiterFallbackTable(delim);
+    str.clear();
+    int txtLength = 0; // tracks str.getLength(), as an optimization
+    long bytesConsumed = 0;
+    int delPosn = 0;
+    do {
+      int startPosn = bufferPosn; // starting from where we left off last time
+      if (bufferPosn >= bufferLength) {
+        startPosn = bufferPosn = 0;
+        bufferLength = in.read(buffer);
+        if (bufferLength <= 0) {
+          break; // EOF
+        }
+      }
+      final int heldBefore = delPosn; // prefix bytes carried from last pass
+      for (; bufferPosn < bufferLength; ++bufferPosn) {
+        final byte b = buffer[bufferPosn];
+        while (delPosn > 0 && b != delim[delPosn]) {
+          delPosn = fallback[delPosn - 1]; // fall back to a shorter prefix
+        }
+        if (b == delim[delPosn]) {
+          delPosn++;
+          if (delPosn == delim.length) {
+            bufferPosn++; // consume the last delimiter byte
+            break;
+          }
+        }
+      }
+      int readLength = bufferPosn - startPosn;
+      bytesConsumed += readLength;
+      // This pass covered heldBefore held bytes followed by readLength buffer
+      // bytes. All but the trailing delPosn of them (still held, or the
+      // completed delimiter) are now known to be record content.
+      int released = heldBefore + readLength - delPosn;
+      int fromHeld = Math.min(released, heldBefore);
+      txtLength = appendCapped(str, delim, 0, fromHeld,
+          txtLength, maxLineLength);
+      txtLength = appendCapped(str, buffer, startPosn, released - fromHeld,
+          txtLength, maxLineLength);
+    } while (delPosn < delim.length && bytesConsumed < maxBytesToConsume);
+    if (delPosn < delim.length) {
+      // Stopped at EOF or at the byte budget with an unfinished match. The
+      // held bytes were consumed from the stream and belong to this record.
+      appendCapped(str, delim, 0, delPosn, txtLength, maxLineLength);
+    }
+    if (bytesConsumed > (long) Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
+    }
+    return (int) bytesConsumed;
+  }
+
+  /**
+   * Append at most {@code len} bytes of {@code src}, starting at
+   * {@code off}, to {@code str} without letting the text length exceed
+   * {@code maxLineLength}.
+   * @return {@code txtLength} plus the number of bytes actually appended
+   */
+  private static int appendCapped(Text str, byte[] src, int off, int len,
+      int txtLength, int maxLineLength) {
+    int n = Math.min(len, maxLineLength - txtLength);
+    if (n > 0) {
+      str.append(src, off, n);
+      txtLength += n;
+    }
+    return txtLength;
+  }
+
+  /**
+   * Build the KMP failure table for {@code delim}. Entry {@code i} is the
+   * length of the longest proper prefix of {@code delim[0..i]} that is also
+   * a suffix of it.
+   */
+  private static int[] delimiterFallbackTable(byte[] delim) {
+    int[] table = new int[delim.length];
+    for (int i = 1, k = 0; i < delim.length; i++) {
+      while (k > 0 && delim[i] != delim[k]) {
+        k = table[k - 1];
+      }
+      if (delim[i] == delim[k]) {
+        k++;
+      }
+      table[i] = k;
+    }
+    return table;
+  }
+
+  /**
    * Read from the InputStream into the given Text.
    * @param str the object to store the given line
    * @param maxLineLength the maximum number of bytes to store into str.