You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/09/18 03:34:53 UTC
svn commit: r696532 - in /hadoop/core/trunk: ./
src/core/org/apache/hadoop/fs/ src/core/org/apache/hadoop/util/
src/mapred/org/apache/hadoop/mapred/
src/mapred/org/apache/hadoop/mapreduce/lib/input/
Author: omalley
Date: Wed Sep 17 18:34:53 2008
New Revision: 696532
URL: http://svn.apache.org/viewvc?rev=696532&view=rev
Log:
HADOOP-4186. Factor LineReader out of LineRecordReader.
From: Tom White<to...@apache.org>
Added:
hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/fs/HarFileSystem.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=696532&r1=696531&r2=696532&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Sep 17 18:34:53 2008
@@ -329,6 +329,9 @@
HADOOP-4181. Include a .gitignore and saveVersion.sh change to support
developing under git. (omalley)
+ HADOOP-4186. Factor LineReader out of LineRecordReader. (tomwhite via
+ omalley)
+
OPTIMIZATIONS
HADOOP-3556. Removed lock contention in MD5Hash by changing the
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/HarFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/HarFileSystem.java?rev=696532&r1=696531&r2=696532&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/HarFileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/HarFileSystem.java Wed Sep 17 18:34:53 2008
@@ -23,11 +23,12 @@
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.conf.Configuration;
/**
* This is an implementation of the Hadoop Archive
@@ -130,8 +131,7 @@
// of archives
public int getHarVersion() throws IOException {
FSDataInputStream masterIn = fs.open(masterIndex);
- LineRecordReader.LineReader lmaster = new LineRecordReader.LineReader(
- masterIn, getConf());
+ LineReader lmaster = new LineReader(masterIn, getConf());
Text line = new Text();
lmaster.readLine(line);
try {
@@ -400,8 +400,7 @@
// in the index file
FSDataInputStream in = fs.open(masterIndex);
FileStatus masterStat = fs.getFileStatus(masterIndex);
- LineRecordReader.LineReader lin = new LineRecordReader.LineReader(in,
- getConf());
+ LineReader lin = new LineReader(in, getConf());
Text line = new Text();
long read = lin.readLine(line);
//ignore the first line. this is the header of the index files
@@ -426,8 +425,7 @@
// do nothing just a read.
}
FSDataInputStream aIn = fs.open(archiveIndex);
- LineRecordReader.LineReader aLin = new LineRecordReader.LineReader(aIn,
- getConf());
+ LineReader aLin = new LineReader(aIn, getConf());
String retStr = null;
// now start reading the real index file
read = 0;
Added: hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java?rev=696532&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java Wed Sep 17 18:34:53 2008
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Reads lines terminated by \n, \r or \r\n from an input stream into a Text.
+ */
+public class LineReader {
+ private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+ private int bufferSize = DEFAULT_BUFFER_SIZE;
+ private InputStream in;
+ private byte[] buffer;
+ // the number of bytes of real data in the buffer (set from the last read)
+ private int bufferLength = 0;
+ // the current position in the buffer, i.e. the next byte to examine
+ private int bufferPosn = 0;
+
+ /**
+ * Create a line reader that reads from the given stream using the
+ * given buffer-size.
+ * @param in input stream to read from
+ * @param bufferSize size in bytes of the internal read buffer
+ */
+ LineReader(InputStream in, int bufferSize) {
+ this.in = in;
+ this.bufferSize = bufferSize;
+ this.buffer = new byte[this.bufferSize];
+ }
+
+ /**
+ * Create a line reader that reads from the given stream using the
+ * <code>io.file.buffer.size</code> specified in the given
+ * <code>Configuration</code>, with 64 KB passed as the default value.
+ * @param in input stream
+ * @param conf configuration
+ * @throws IOException
+ */
+ public LineReader(InputStream in, Configuration conf) throws IOException {
+ this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
+ }
+
+ /**
+ * Fill the buffer with more data, resetting the buffer position to 0.
+ * @return true if the read returned at least one byte, false otherwise
+ * @throws IOException if the underlying stream throws
+ */
+ boolean backfill() throws IOException {
+ bufferPosn = 0;
+ bufferLength = in.read(buffer);
+ return bufferLength > 0;
+ }
+
+ /**
+ * Close the underlying stream.
+ * @throws IOException if the underlying stream throws
+ */
+ public void close() throws IOException {
+ in.close();
+ }
+
+ /**
+ * Read one line from the InputStream into the given Text; str is cleared first.
+ * @param str the object to store the line in, without its line terminator
+ * @param maxLineLength the maximum number of bytes to store into str.
+ * @param maxBytesToConsume the number of consumed bytes at which to stop; checked only when the buffer runs out.
+ * @return the number of bytes consumed including the line terminator; 0 if no more data could be read
+ * @throws IOException if the underlying stream throws
+ */
+ public int readLine(Text str, int maxLineLength,
+ int maxBytesToConsume) throws IOException {
+ str.clear();
+ boolean hadFinalNewline = false;
+ boolean hadFinalReturn = false;
+ boolean hitEndOfFile = false;
+ int startPosn = bufferPosn;
+ long bytesConsumed = 0;
+ outerLoop: while (true) {
+ if (bufferPosn >= bufferLength) {
+ if (!backfill()) {
+ hitEndOfFile = true;
+ break;
+ }
+ }
+ startPosn = bufferPosn; // first byte of this line within the current buffer contents
+ for(; bufferPosn < bufferLength; ++bufferPosn) {
+ switch (buffer[bufferPosn]) {
+ case '\n':
+ hadFinalNewline = true;
+ bufferPosn += 1; // step past the \n so it is counted as consumed
+ break outerLoop;
+ case '\r':
+ if (hadFinalReturn) {
+ // leave this \r in the stream, so we'll get it next time
+ break outerLoop;
+ }
+ hadFinalReturn = true;
+ break; // leaves the switch only: the next byte decides between \r\n and a bare \r
+ default:
+ if (hadFinalReturn) {
+ break outerLoop; // a bare \r ended the line; this byte is left for the next call
+ }
+ }
+ }
+ bytesConsumed += bufferPosn - startPosn; // reached only when the buffer ran out before the line ended
+ int length = bufferPosn - startPosn - (hadFinalReturn ? 1 : 0); // do not store a trailing \r
+ length = (int)Math.min(length, maxLineLength - str.getLength());
+ if (length >= 0) {
+ str.append(buffer, startPosn, length);
+ }
+ if (bytesConsumed >= maxBytesToConsume) {
+ return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
+ }
+ }
+ int newlineLength = (hadFinalNewline ? 1 : 0) + (hadFinalReturn ? 1 : 0);
+ if (!hitEndOfFile) { // at end of file every byte was already accounted for inside the loop
+ bytesConsumed += bufferPosn - startPosn;
+ int length = bufferPosn - startPosn - newlineLength;
+ length = (int)Math.min(length, maxLineLength - str.getLength());
+ if (length > 0) {
+ str.append(buffer, startPosn, length);
+ }
+ }
+ return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
+ }
+
+ /**
+ * Read one line into the given Text, passing Integer.MAX_VALUE as maxBytesToConsume.
+ * @param str the object to store the given line
+ * @param maxLineLength the maximum number of bytes to store into str.
+ * @return the number of bytes read including the newline
+ * @throws IOException if the underlying stream throws
+ */
+ public int readLine(Text str, int maxLineLength) throws IOException {
+ return readLine(str, maxLineLength, Integer.MAX_VALUE);
+}
+
+ /**
+ * Read one line into the given Text, passing Integer.MAX_VALUE for both limits.
+ * @param str the object to store the given line
+ * @return the number of bytes read including the newline
+ * @throws IOException if the underlying stream throws
+ */
+ public int readLine(Text str) throws IOException {
+ return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
+ }
+
+}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java?rev=696532&r1=696531&r2=696532&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java Wed Sep 17 18:34:53 2008
@@ -49,7 +49,9 @@
/**
* A class that provides a line reader from an input stream.
+ * @deprecated Use {@link org.apache.hadoop.util.LineReader} instead.
*/
+ @Deprecated
public static class LineReader {
private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
private int bufferSize = DEFAULT_BUFFER_SIZE;
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=696532&r1=696531&r2=696532&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Wed Sep 17 18:34:53 2008
@@ -19,7 +19,6 @@
package org.apache.hadoop.mapreduce.lib.input;
import java.io.IOException;
-import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -32,6 +31,7 @@
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.LineReader;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
@@ -48,150 +48,6 @@
private LineReader in;
int maxLineLength;
- /**
- * A class that provides a line reader from an input stream.
- */
- public static class LineReader {
- private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
- private int bufferSize = DEFAULT_BUFFER_SIZE;
- private InputStream in;
- private byte[] buffer;
- // the number of bytes of real data in the buffer
- private int bufferLength = 0;
- // the current position in the buffer
- private int bufferPosn = 0;
-
- /**
- * Create a line reader that reads from the given stream using the
- * given buffer-size.
- * @param in
- * @throws IOException
- */
- LineReader(InputStream in, int bufferSize) {
- this.in = in;
- this.bufferSize = bufferSize;
- this.buffer = new byte[this.bufferSize];
- }
-
- /**
- * Create a line reader that reads from the given stream using the
- * <code>io.file.buffer.size</code> specified in the given
- * <code>Configuration</code>.
- * @param in input stream
- * @param conf configuration
- * @throws IOException
- */
- public LineReader(InputStream in, Configuration conf) throws IOException {
- this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
- }
-
- /**
- * Fill the buffer with more data.
- * @return was there more data?
- * @throws IOException
- */
- boolean backfill() throws IOException {
- bufferPosn = 0;
- bufferLength = in.read(buffer);
- return bufferLength > 0;
- }
-
- /**
- * Close the underlying stream.
- * @throws IOException
- */
- public void close() throws IOException {
- in.close();
- }
-
- /**
- * Read from the InputStream into the given Text.
- * @param str the object to store the given line
- * @param maxLineLength the maximum number of bytes to store into str.
- * @param maxBytesToConsume the maximum number of bytes to consume in this call.
- * @return the number of bytes read including the newline
- * @throws IOException if the underlying stream throws
- */
- public int readLine(Text str, int maxLineLength,
- int maxBytesToConsume) throws IOException {
- str.clear();
- boolean hadFinalNewline = false;
- boolean hadFinalReturn = false;
- boolean hitEndOfFile = false;
- int startPosn = bufferPosn;
- long bytesConsumed = 0;
- outerLoop: while (true) {
- if (bufferPosn >= bufferLength) {
- if (!backfill()) {
- hitEndOfFile = true;
- break;
- }
- }
- startPosn = bufferPosn;
- for(; bufferPosn < bufferLength; ++bufferPosn) {
- switch (buffer[bufferPosn]) {
- case '\n':
- hadFinalNewline = true;
- bufferPosn += 1;
- break outerLoop;
- case '\r':
- if (hadFinalReturn) {
- // leave this \r in the stream, so we'll get it next time
- break outerLoop;
- }
- hadFinalReturn = true;
- break;
- default:
- if (hadFinalReturn) {
- break outerLoop;
- }
- }
- }
- bytesConsumed += bufferPosn - startPosn;
- int length = bufferPosn - startPosn - (hadFinalReturn ? 1 : 0);
- length = (int)Math.min(length, maxLineLength - str.getLength());
- if (length >= 0) {
- str.append(buffer, startPosn, length);
- }
- if (bytesConsumed >= maxBytesToConsume) {
- return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
- }
- }
- int newlineLength = (hadFinalNewline ? 1 : 0) + (hadFinalReturn ? 1 : 0);
- if (!hitEndOfFile) {
- bytesConsumed += bufferPosn - startPosn;
- int length = bufferPosn - startPosn - newlineLength;
- length = (int)Math.min(length, maxLineLength - str.getLength());
- if (length > 0) {
- str.append(buffer, startPosn, length);
- }
- }
- return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
- }
-
- /**
- * Read from the InputStream into the given Text.
- * @param str the object to store the given line
- * @param maxLineLength the maximum number of bytes to store into str.
- * @return the number of bytes read including the newline
- * @throws IOException if the underlying stream throws
- */
- public int readLine(Text str, int maxLineLength) throws IOException {
- return readLine(str, maxLineLength, Integer.MAX_VALUE);
- }
-
- /**
- * Read from the InputStream into the given Text.
- * @param str the object to store the given line
- * @return the number of bytes read including the newline
- * @throws IOException if the underlying stream throws
- */
- public int readLine(Text str) throws IOException {
- return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
- }
-
- }
-
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;