You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2012/12/12 07:34:58 UTC
svn commit: r1420535 - in /hadoop/common/branches/branch-1: CHANGES.txt
src/core/org/apache/hadoop/util/LineReader.java
Author: suresh
Date: Wed Dec 12 06:34:56 2012
New Revision: 1420535
URL: http://svn.apache.org/viewvc?rev=1420535&view=rev
Log:
HADOOP-7096. Allow setting of end-of-record delimiter for TextInputFormat. Backported by Suresh Srinivas.
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/core/org/apache/hadoop/util/LineReader.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1420535&r1=1420534&r2=1420535&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Wed Dec 12 06:34:56 2012
@@ -45,6 +45,9 @@ Release 1.2.0 - unreleased
HADOOP-9090. Support on-demand publish of metrics. (Mostafa Elhemali via
suresh)
+ HADOOP-7096. Allow setting of end-of-record delimiter for TextInputFormat
+ (Ahmed Radwan, backported by suresh)
+
IMPROVEMENTS
HDFS-3515. Port HDFS-1457 to branch-1. (eli)
Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/util/LineReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/util/LineReader.java?rev=1420535&r1=1420534&r2=1420535&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/util/LineReader.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/util/LineReader.java Wed Dec 12 06:34:56 2012
@@ -26,6 +26,14 @@ import org.apache.hadoop.io.Text;
/**
* A class that provides a line reader from an input stream.
+ * Depending on the constructor used, lines will either be terminated by:
+ * <ul>
+ * <li>one of the following: '\n' (LF), '\r' (CR),
+ * or '\r\n' (CR+LF).</li>
+ * <li><em>or</em>, a custom byte sequence delimiter</li>
+ * </ul>
+ * In both cases, EOF also terminates an otherwise unterminated
+ * line.
*/
public class LineReader {
private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
@@ -40,6 +48,9 @@ public class LineReader {
private static final byte CR = '\r';
private static final byte LF = '\n';
+ // The custom record delimiter, or null when no custom delimiter was
+ // given; readLine then ends lines at CR, LF or CR+LF instead.
+ private final byte[] recordDelimiterBytes;
+
/**
* Create a line reader that reads from the given stream using the
* default buffer-size (64k).
@@ -61,6 +72,7 @@ public class LineReader {
this.in = in;
this.bufferSize = bufferSize;
this.buffer = new byte[this.bufferSize];
+ this.recordDelimiterBytes = null;
}
/**
@@ -76,6 +88,56 @@ public class LineReader {
}
/**
+ * Create a line reader that reads from the given stream using the
+ * default buffer-size, and using a custom delimiter given as an array
+ * of bytes.
+ * @param in The input stream
+ * @param recordDelimiterBytes The delimiter, or null for CR/LF/CR+LF
+ */
+  public LineReader(InputStream in, byte[] recordDelimiterBytes) {
+    // Same set-up as the explicit-size constructor, with the default
+    // buffer size filled in. A null delimiter is passed through and
+    // makes readLine fall back to CR / LF / CR+LF line endings.
+    this(in, DEFAULT_BUFFER_SIZE, recordDelimiterBytes);
+  }
+
+ /**
+ * Create a line reader that reads from the given stream using the
+ * given buffer-size, and using a custom delimiter given as an array
+ * of bytes.
+ * @param in The input stream
+ * @param bufferSize Size of the read buffer
+ * @param recordDelimiterBytes The delimiter, or null to end lines at
+ * CR, LF or CR+LF
+ */
+  public LineReader(InputStream in, int bufferSize,
+      byte[] recordDelimiterBytes) {
+    // The delimiter array is stored as given (no copy); when it is null,
+    // readLine uses the CR / LF / CR+LF rules instead.
+    this.recordDelimiterBytes = recordDelimiterBytes;
+    this.in = in;
+    this.buffer = new byte[bufferSize];
+    this.bufferSize = bufferSize;
+  }
+
+ /**
+ * Create a line reader that reads from the given stream using the
+ * <code>io.file.buffer.size</code> specified in the given
+ * <code>Configuration</code>, and using a custom delimiter given as an
+ * array of bytes.
+ * @param in input stream
+ * @param conf configuration
+ * @param recordDelimiterBytes The delimiter, or null for CR/LF/CR+LF
+ * @throws IOException
+ */
+  public LineReader(InputStream in, Configuration conf,
+      byte[] recordDelimiterBytes) throws IOException {
+    // Only the buffer size differs from the explicit-size constructor:
+    // it is looked up under io.file.buffer.size with the class default.
+    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE),
+        recordDelimiterBytes);
+  }
+
+
+ /**
* Close the underlying stream.
* @throws IOException
*/
@@ -84,10 +146,7 @@ public class LineReader {
}
/**
- * Read one line from the InputStream into the given Text. A line
- * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
- * or '\r\n' (CR+LF). EOF also terminates an otherwise unterminated
- * line.
+ * Read one line from the InputStream into the given Text.
*
* @param str the object to store the given line (without newline)
* @param maxLineLength the maximum number of bytes to store into str;
@@ -104,6 +163,18 @@ public class LineReader {
*/
public int readLine(Text str, int maxLineLength,
int maxBytesToConsume) throws IOException {
+    // No custom delimiter configured: use the CR / LF / CR+LF reader.
+    if (this.recordDelimiterBytes == null) {
+      return readDefaultLine(str, maxLineLength, maxBytesToConsume);
+    }
+    return readCustomLine(str, maxLineLength, maxBytesToConsume);
+  }
+
+ /**
+ * Read a line terminated by one of CR, LF, or CRLF.
+ */
+ private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
+ throws IOException {
/* We're reading data from in, but the head of the stream may be
* already buffered in buffer, so we have several cases:
* 1. No newline characters are in the buffer, so we need to copy
@@ -167,6 +238,52 @@ public class LineReader {
}
/**
+ * Read a line terminated by a custom delimiter.
+ */
+  private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    /* Scan for recordDelimiterBytes while copying everything in front of it
+     * into str. delPosn is the number of leading delimiter bytes matched by
+     * the most recently scanned input; those bytes are "pending": they are
+     * text if the match later fails and delimiter if it completes. Two
+     * cases need care:
+     * 1. The match fails part-way. A delimiter may still begin inside the
+     *    pending bytes (delimiter "ab" in input "aab"), so we fall back to
+     *    the longest shorter match instead of restarting from zero.
+     * 2. The pending bytes were read in an earlier pass and have since
+     *    been overwritten in buffer. They are known to equal the start of
+     *    the delimiter, so they are re-created from recordDelimiterBytes
+     *    when they turn out to be text.
+     * Returns the number of bytes consumed, delimiter included; at most
+     * maxLineLength bytes are stored in str.
+     * An empty delimiter array is not supported: indexing it fails as soon
+     * as the first byte is scanned.
+     */
+    final byte[] delim = recordDelimiterBytes;
+    int[] fallback = null; // built lazily, only after a failed partial match
+    str.clear();
+    int txtLength = 0; // tracks str.getLength(), as an optimization
+    long bytesConsumed = 0;
+    int delPosn = 0;
+    do {
+      int startPosn = bufferPosn; // resume where the previous call stopped
+      if (bufferPosn >= bufferLength) {
+        startPosn = bufferPosn = 0;
+        bufferLength = in.read(buffer);
+        if (bufferLength <= 0) {
+          break; // EOF
+        }
+      }
+      // Pending bytes that precede startPosn and so are no longer in buffer.
+      int carried = delPosn;
+      for (; bufferPosn < bufferLength; ++bufferPosn) {
+        final byte b = buffer[bufferPosn];
+        while (delPosn > 0 && b != delim[delPosn]) {
+          if (fallback == null) {
+            fallback = delimiterFallbackTable(delim);
+          }
+          int shorter = fallback[delPosn - 1];
+          // The first (delPosn - shorter) pending bytes are now known to be
+          // text. Those still held in buffer are covered by the append after
+          // this loop; carried ones must be emitted here, before any text
+          // from the current pass.
+          int released = Math.min(delPosn - shorter, carried);
+          if (released > 0) {
+            carried -= released;
+            int n = Math.min(released, maxLineLength - txtLength);
+            if (n > 0) {
+              str.append(delim, 0, n);
+              txtLength += n;
+            }
+          }
+          delPosn = shorter;
+        }
+        if (b == delim[delPosn]) {
+          delPosn++;
+          if (delPosn >= delim.length) {
+            bufferPosn++; // consume the last delimiter byte
+            break;
+          }
+        }
+      }
+      int readLength = bufferPosn - startPosn;
+      bytesConsumed += readLength;
+      // Leave out the pending bytes that sit at the end of this pass.
+      int appendLength = readLength - (delPosn - carried);
+      if (appendLength > maxLineLength - txtLength) {
+        appendLength = maxLineLength - txtLength;
+      }
+      if (appendLength > 0) {
+        str.append(buffer, startPosn, appendLength);
+        txtLength += appendLength;
+      }
+    } while (delPosn < delim.length
+        && bytesConsumed < maxBytesToConsume);
+    if (delPosn > 0 && delPosn < delim.length) {
+      // Stopped (EOF or byte limit) inside a possible delimiter. The pending
+      // bytes were counted as consumed but never stored, so they belong to
+      // this line.
+      int n = Math.min(delPosn, maxLineLength - txtLength);
+      if (n > 0) {
+        str.append(delim, 0, n);
+      }
+    }
+    if (bytesConsumed > (long) Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
+    }
+    return (int) bytesConsumed;
+  }
+
+  /**
+   * Build the Knuth-Morris-Pratt fallback table for a delimiter: entry i is
+   * the length of the longest proper prefix of delim[0..i] that is also a
+   * suffix of it.
+   */
+  private static int[] delimiterFallbackTable(byte[] delim) {
+    int[] table = new int[delim.length];
+    int matched = 0;
+    for (int i = 1; i < delim.length; i++) {
+      while (matched > 0 && delim[i] != delim[matched]) {
+        matched = table[matched - 1];
+      }
+      if (delim[i] == delim[matched]) {
+        matched++;
+      }
+      table[i] = matched;
+    }
+    return table;
+  }
+
+ /**
* Read from the InputStream into the given Text.
* @param str the object to store the given line
* @param maxLineLength the maximum number of bytes to store into str.