You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/09/18 03:34:53 UTC

svn commit: r696532 - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/fs/ src/core/org/apache/hadoop/util/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/lib/input/

Author: omalley
Date: Wed Sep 17 18:34:53 2008
New Revision: 696532

URL: http://svn.apache.org/viewvc?rev=696532&view=rev
Log:
HADOOP-4186. Factor LineReader out of LineRecordReader.
From: Tom White <to...@apache.org>

Added:
    hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/HarFileSystem.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=696532&r1=696531&r2=696532&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Sep 17 18:34:53 2008
@@ -329,6 +329,9 @@
     HADOOP-4181. Include a .gitignore and saveVersion.sh change to support
     developing under git. (omalley)
 
+    HADOOP-4186. Factor LineReader out of LineRecordReader. (tomwhite via
+    omalley)
+
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/HarFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/HarFileSystem.java?rev=696532&r1=696531&r2=696532&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/HarFileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/HarFileSystem.java Wed Sep 17 18:34:53 2008
@@ -23,11 +23,12 @@
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * This is an implementation of the Hadoop Archive 
@@ -130,8 +131,7 @@
   // of archives
   public int getHarVersion() throws IOException { 
     FSDataInputStream masterIn = fs.open(masterIndex);
-    LineRecordReader.LineReader lmaster = new LineRecordReader.LineReader(
-                                              masterIn, getConf());
+    LineReader lmaster = new LineReader(masterIn, getConf());
     Text line = new Text();
     lmaster.readLine(line);
     try {
@@ -400,8 +400,7 @@
     // in the index file
     FSDataInputStream in = fs.open(masterIndex);
     FileStatus masterStat = fs.getFileStatus(masterIndex);
-    LineRecordReader.LineReader lin = new LineRecordReader.LineReader(in,
-                                          getConf());
+    LineReader lin = new LineReader(in, getConf());
     Text line = new Text();
     long read = lin.readLine(line);
    //ignore the first line. this is the header of the index files
@@ -426,8 +425,7 @@
       // do nothing just a read.
     }
     FSDataInputStream aIn = fs.open(archiveIndex);
-    LineRecordReader.LineReader aLin = new LineRecordReader.LineReader(aIn, 
-                                           getConf());
+    LineReader aLin = new LineReader(aIn, getConf());
     String retStr = null;
     // now start reading the real index file
      read = 0;

Added: hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java?rev=696532&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/LineReader.java Wed Sep 17 18:34:53 2008
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+
+/**
+ * A class that provides a line reader from an input stream.
+ */
+public class LineReader {
+  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+  private int bufferSize = DEFAULT_BUFFER_SIZE;
+  private InputStream in;
+  private byte[] buffer;
+  // the number of bytes of real data in the buffer
+  private int bufferLength = 0;
+  // the current position in the buffer
+  private int bufferPosn = 0;
+
+  /**
+   * Create a line reader that reads from the given stream using the 
+   * given buffer-size.
+   * @param in
+   * @throws IOException
+   */
+  LineReader(InputStream in, int bufferSize) {
+    this.in = in;
+    this.bufferSize = bufferSize;
+    this.buffer = new byte[this.bufferSize];
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * <code>io.file.buffer.size</code> specified in the given
+   * <code>Configuration</code>.
+   * @param in input stream
+   * @param conf configuration
+   * @throws IOException
+   */
+  public LineReader(InputStream in, Configuration conf) throws IOException {
+    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
+  }
+
+  /**
+   * Fill the buffer with more data.
+   * @return was there more data?
+   * @throws IOException
+   */
+  boolean backfill() throws IOException {
+    bufferPosn = 0;
+    bufferLength = in.read(buffer);
+    return bufferLength > 0;
+  }
+  
+  /**
+   * Close the underlying stream.
+   * @throws IOException
+   */
+  public void close() throws IOException {
+    in.close();
+  }
+  
+  /**
+   * Read from the InputStream into the given Text.
+   * @param str the object to store the given line
+   * @param maxLineLength the maximum number of bytes to store into str.
+   * @param maxBytesToConsume the maximum number of bytes to consume in this call.
+   * @return the number of bytes read including the newline
+   * @throws IOException if the underlying stream throws
+   */
+  public int readLine(Text str, int maxLineLength,
+                      int maxBytesToConsume) throws IOException {
+    str.clear();
+    boolean hadFinalNewline = false;
+    boolean hadFinalReturn = false;
+    boolean hitEndOfFile = false;
+    int startPosn = bufferPosn;
+    long bytesConsumed = 0;
+    outerLoop: while (true) {
+      if (bufferPosn >= bufferLength) {
+        if (!backfill()) {
+          hitEndOfFile = true;
+          break;
+        }
+      }
+      startPosn = bufferPosn;
+      for(; bufferPosn < bufferLength; ++bufferPosn) {
+        switch (buffer[bufferPosn]) {
+        case '\n':
+          hadFinalNewline = true;
+          bufferPosn += 1;
+          break outerLoop;
+        case '\r':
+          if (hadFinalReturn) {
+            // leave this \r in the stream, so we'll get it next time
+            break outerLoop;
+          }
+          hadFinalReturn = true;
+          break;
+        default:
+          if (hadFinalReturn) {
+            break outerLoop;
+          }
+        }        
+      }
+      bytesConsumed += bufferPosn - startPosn;
+      int length = bufferPosn - startPosn - (hadFinalReturn ? 1 : 0);
+      length = (int)Math.min(length, maxLineLength - str.getLength());
+      if (length >= 0) {
+        str.append(buffer, startPosn, length);
+      }
+      if (bytesConsumed >= maxBytesToConsume) {
+        return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
+      }
+    }
+    int newlineLength = (hadFinalNewline ? 1 : 0) + (hadFinalReturn ? 1 : 0);
+    if (!hitEndOfFile) {
+      bytesConsumed += bufferPosn - startPosn;
+      int length = bufferPosn - startPosn - newlineLength;
+      length = (int)Math.min(length, maxLineLength - str.getLength());
+      if (length > 0) {
+        str.append(buffer, startPosn, length);
+      }
+    }
+    return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
+  }
+
+  /**
+   * Read from the InputStream into the given Text.
+   * @param str the object to store the given line
+   * @param maxLineLength the maximum number of bytes to store into str.
+   * @return the number of bytes read including the newline
+   * @throws IOException if the underlying stream throws
+   */
+  public int readLine(Text str, int maxLineLength) throws IOException {
+    return readLine(str, maxLineLength, Integer.MAX_VALUE);
+}
+
+  /**
+   * Read from the InputStream into the given Text.
+   * @param str the object to store the given line
+   * @return the number of bytes read including the newline
+   * @throws IOException if the underlying stream throws
+   */
+  public int readLine(Text str) throws IOException {
+    return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
+  }
+
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java?rev=696532&r1=696531&r2=696532&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java Wed Sep 17 18:34:53 2008
@@ -49,7 +49,9 @@
 
   /**
    * A class that provides a line reader from an input stream.
+   * @deprecated Use {@link org.apache.hadoop.util.LineReader} instead.
    */
+  @Deprecated
   public static class LineReader {
     private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
     private int bufferSize = DEFAULT_BUFFER_SIZE;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=696532&r1=696531&r2=696532&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Wed Sep 17 18:34:53 2008
@@ -19,7 +19,6 @@
 package org.apache.hadoop.mapreduce.lib.input;
 
 import java.io.IOException;
-import java.io.InputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -32,6 +31,7 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.LineReader;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 
@@ -48,150 +48,6 @@
   private LineReader in;
   int maxLineLength;
 
-  /**
-   * A class that provides a line reader from an input stream.
-   */
-  public static class LineReader {
-    private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
-    private int bufferSize = DEFAULT_BUFFER_SIZE;
-    private InputStream in;
-    private byte[] buffer;
-    // the number of bytes of real data in the buffer
-    private int bufferLength = 0;
-    // the current position in the buffer
-    private int bufferPosn = 0;
-
-    /**
-     * Create a line reader that reads from the given stream using the 
-     * given buffer-size.
-     * @param in
-     * @throws IOException
-     */
-    LineReader(InputStream in, int bufferSize) {
-      this.in = in;
-      this.bufferSize = bufferSize;
-      this.buffer = new byte[this.bufferSize];
-    }
-
-    /**
-     * Create a line reader that reads from the given stream using the
-     * <code>io.file.buffer.size</code> specified in the given
-     * <code>Configuration</code>.
-     * @param in input stream
-     * @param conf configuration
-     * @throws IOException
-     */
-    public LineReader(InputStream in, Configuration conf) throws IOException {
-      this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
-    }
-
-    /**
-     * Fill the buffer with more data.
-     * @return was there more data?
-     * @throws IOException
-     */
-    boolean backfill() throws IOException {
-      bufferPosn = 0;
-      bufferLength = in.read(buffer);
-      return bufferLength > 0;
-    }
-    
-    /**
-     * Close the underlying stream.
-     * @throws IOException
-     */
-    public void close() throws IOException {
-      in.close();
-    }
-    
-    /**
-     * Read from the InputStream into the given Text.
-     * @param str the object to store the given line
-     * @param maxLineLength the maximum number of bytes to store into str.
-     * @param maxBytesToConsume the maximum number of bytes to consume in this call.
-     * @return the number of bytes read including the newline
-     * @throws IOException if the underlying stream throws
-     */
-    public int readLine(Text str, int maxLineLength,
-                        int maxBytesToConsume) throws IOException {
-      str.clear();
-      boolean hadFinalNewline = false;
-      boolean hadFinalReturn = false;
-      boolean hitEndOfFile = false;
-      int startPosn = bufferPosn;
-      long bytesConsumed = 0;
-      outerLoop: while (true) {
-        if (bufferPosn >= bufferLength) {
-          if (!backfill()) {
-            hitEndOfFile = true;
-            break;
-          }
-        }
-        startPosn = bufferPosn;
-        for(; bufferPosn < bufferLength; ++bufferPosn) {
-          switch (buffer[bufferPosn]) {
-          case '\n':
-            hadFinalNewline = true;
-            bufferPosn += 1;
-            break outerLoop;
-          case '\r':
-            if (hadFinalReturn) {
-              // leave this \r in the stream, so we'll get it next time
-              break outerLoop;
-            }
-            hadFinalReturn = true;
-            break;
-          default:
-            if (hadFinalReturn) {
-              break outerLoop;
-            }
-          }        
-        }
-        bytesConsumed += bufferPosn - startPosn;
-        int length = bufferPosn - startPosn - (hadFinalReturn ? 1 : 0);
-        length = (int)Math.min(length, maxLineLength - str.getLength());
-        if (length >= 0) {
-          str.append(buffer, startPosn, length);
-        }
-        if (bytesConsumed >= maxBytesToConsume) {
-          return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
-        }
-      }
-      int newlineLength = (hadFinalNewline ? 1 : 0) + (hadFinalReturn ? 1 : 0);
-      if (!hitEndOfFile) {
-        bytesConsumed += bufferPosn - startPosn;
-        int length = bufferPosn - startPosn - newlineLength;
-        length = (int)Math.min(length, maxLineLength - str.getLength());
-        if (length > 0) {
-          str.append(buffer, startPosn, length);
-        }
-      }
-      return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
-    }
-
-    /**
-     * Read from the InputStream into the given Text.
-     * @param str the object to store the given line
-     * @param maxLineLength the maximum number of bytes to store into str.
-     * @return the number of bytes read including the newline
-     * @throws IOException if the underlying stream throws
-     */
-    public int readLine(Text str, int maxLineLength) throws IOException {
-      return readLine(str, maxLineLength, Integer.MAX_VALUE);
-  }
-
-    /**
-     * Read from the InputStream into the given Text.
-     * @param str the object to store the given line
-     * @return the number of bytes read including the newline
-     * @throws IOException if the underlying stream throws
-     */
-    public int readLine(Text str) throws IOException {
-      return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
-    }
-
-  }
-
   public void initialize(InputSplit genericSplit,
                          TaskAttemptContext context) throws IOException {
     FileSplit split = (FileSplit) genericSplit;