Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/08 23:19:43 UTC

svn commit: r441653 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/io/compress/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/io/ src/test/org/apache/hadoop/io/compress/ src/test/org/apache...

Author: cutting
Date: Fri Sep  8 14:19:41 2006
New Revision: 441653

URL: http://svn.apache.org/viewvc?view=rev&rev=441653
Log:
HADOOP-474.  Add CompressionCodecFactory and use it in TextInputFormat and TextOutputFormat.  Also add gzip codec and fix some problems with UTF8 text inputs.  Contributed by Owen.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Sep  8 14:19:41 2006
@@ -161,6 +161,13 @@
 40. HADOOP-517.  Fix a contrib/streaming bug in end-of-line detection.
     (Hairong Kuang via cutting)
 
+41. HADOOP-474.  Add CompressionCodecFactory, and use it in
+    TextInputFormat and TextOutputFormat.  Compressed input files are
+    automatically decompressed when they have the correct extension.
+    Output files will, when output compression is specified, be
+    generated with an appropriate extension.  Also add a gzip codec and
+    fix problems with UTF8 text inputs.  (omalley via cutting)
+
 
 Release 0.5.0 - 2006-08-04
 

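As a usage sketch of the behavior described above: a driver can request
compressed reduce output through the new OutputFormatBase helpers, while
compressed inputs need no extra configuration because TextInputFormat now
matches codecs by filename extension.  The driver class and paths below are
illustrative only, not part of this patch.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.*;

    public class CompressedJobSetup {                  // hypothetical driver
      public static JobConf configure() {
        JobConf job = new JobConf();
        job.setInputFormat(TextInputFormat.class);     // *.gz inputs decompress automatically
        job.setOutputFormat(TextOutputFormat.class);
        job.setInputPath(new Path("in"));              // illustrative paths
        job.setOutputPath(new Path("out"));
        // compress reduce output; part files get the codec's extension (".gz")
        OutputFormatBase.setCompressOutput(job, true);
        OutputFormatBase.setOutputCompressorClass(job, GzipCodec.class);
        return job;
      }
    }
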
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Fri Sep  8 14:19:41 2006
@@ -87,6 +87,13 @@
   facilitate opening large map files using less memory.</description>
 </property>
 
+<property>
+  <name>io.compression.codecs</name>
+  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec</value>
+  <description>A list of the compression codec classes that can be used 
+               for compression/decompression.</description>
+</property>
+
 <!-- file system properties -->
 
 <property>
@@ -441,6 +448,21 @@
 -->
 
 <property>
+  <name>mapred.output.compress</name>
+  <value>false</value>
+  <description>Should the outputs of the reduces be compressed?
+  </description>
+</property>
+
+<property>
+  <name>mapred.output.compression.codec</name>
+  <value>org.apache.hadoop.io.compress.DefaultCodec</value>
+  <description>If the reduce outputs are compressed, how should they be 
+               compressed?
+  </description>
+</property>
+
+<property>
   <name>mapred.compress.map.output</name>
   <value>false</value>
   <description>Should the outputs of the maps be compressed before being
@@ -449,7 +471,7 @@
 </property>
 
 <property>
-  <name>mapred.seqfile.compress.blocksize</name>
+  <name>io.seqfile.compress.blocksize</name>
   <value>1000000</value>
   <description>The minimum block size for compression in block compressed 
   				SequenceFiles.
@@ -457,7 +479,7 @@
 </property>
 
 <property>
-  <name>mapred.seqfile.lazydecompress</name>
+  <name>io.seqfile.lazydecompress</name>
   <value>true</value>
   <description>Should values of block-compressed SequenceFiles be decompressed
   				only when necessary.
@@ -465,7 +487,7 @@
 </property>
 
 <property>
-  <name>mapred.seqfile.sorter.recordlimit</name>
+  <name>io.seqfile.sorter.recordlimit</name>
   <value>1000000</value>
   <description>The limit on number of records to be kept in memory in a spill 
   				in SequenceFiles.Sorter
@@ -473,8 +495,8 @@
 </property>
 
 <property>
-  <name>mapred.seqfile.compression.type</name>
-  <value>NONE</value>
+  <name>io.seqfile.compression.type</name>
+  <value>RECORD</value>
   <description>The default compression type for SequenceFile.Writer.
   </description>
 </property>

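The same knobs can be set programmatically on a JobConf; a small sketch using
the property names defined above (the values are only an example):

    import org.apache.hadoop.mapred.JobConf;

    public class CompressionConfExample {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.setBoolean("mapred.output.compress", true);
        conf.set("mapred.output.compression.codec",
                 "org.apache.hadoop.io.compress.GzipCodec");
        // note the renamed io.* keys; the old mapred.seqfile.* names are gone
        conf.set("io.seqfile.compression.type", "BLOCK");   // NONE, RECORD or BLOCK
      }
    }
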
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Fri Sep  8 14:19:41 2006
@@ -28,6 +28,7 @@
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -63,6 +64,27 @@
     /** Compress sequences of records together in blocks. */
     BLOCK
   }
+
+  /**
+   * Get the compression type for the reduce outputs
+   * @param job the job config to look in
+   * @return the kind of compression to use
+   */
+  static public CompressionType getCompressionType(Configuration job) {
+    String name = job.get("io.seqfile.compression.type");
+    return name == null ? CompressionType.RECORD : 
+                          CompressionType.valueOf(name);
+  }
+  
+  /**
+   * Set the compression type for sequence files.
+   * @param job the configuration to modify
+   * @param val the new compression type (none, block, record)
+   */
+  static public void setCompressionType(Configuration job, 
+                                        CompressionType val) {
+    job.set("io.seqfile.compression.type", val.toString());
+  }
   
   /**
    * Construct the preferred type of SequenceFile Writer.
@@ -685,7 +707,7 @@
         Class keyClass, Class valClass, CompressionCodec codec) 
     throws IOException {
       super.init(name, fs.create(name), keyClass, valClass, true, codec);
-      init(conf.getInt("mapred.seqfile.compress.blocksize", 1000000));
+      init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
       
       initializeFileHeader();
       writeFileHeader();
@@ -699,7 +721,7 @@
     throws IOException {
       super.init(name, fs.create(name, progress), keyClass, valClass, 
           true, codec);
-      init(conf.getInt("mapred.seqfile.compress.blocksize", 1000000));
+      init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
       
       initializeFileHeader();
       writeFileHeader();
@@ -998,7 +1020,7 @@
       }
       
 
-      lazyDecompress = conf.getBoolean("mapred.seqfile.lazydecompress", true);
+      lazyDecompress = conf.getBoolean("io.seqfile.lazydecompress", true);
     }
     
     /** Close the file. */

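A short sketch of the new compression-type helpers, combined with the
createWriter overload this patch uses elsewhere (the output path is
illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;

    public class SeqFileCompressionExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // stored under io.seqfile.compression.type
        SequenceFile.setCompressionType(conf, SequenceFile.CompressionType.BLOCK);

        FileSystem fs = FileSystem.get(conf);
        SequenceFile.Writer writer =
          SequenceFile.createWriter(fs, conf, new Path("example.seq"),
                                    IntWritable.class, IntWritable.class,
                                    SequenceFile.getCompressionType(conf));
        writer.append(new IntWritable(1), new IntWritable(42));
        writer.close();
      }
    }
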
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java Fri Sep  8 14:19:41 2006
@@ -173,18 +173,29 @@
    * @exception CharacterCodingException if the array contains invalid UTF8 code  
    */
   public void set(byte[] utf8) throws CharacterCodingException {
-    validateUTF8(utf8);
-    set(utf8, utf8.length);
+    set(utf8, 0, utf8.length);
   }
   
   /** copy a text. */
   public void set(Text other) {
-    set(other.bytes, other.length);
+    try {
+      set(other.bytes, 0, other.length);
+    } catch (CharacterCodingException e) {
+      throw new RuntimeException("bad Text UTF8 encoding", e);
+    }
   }
 
-  private void set(byte[] utf8, int len ) {
+  /**
+   * Set the Text to a range of bytes
+   * @param utf8 the data to copy from
+   * @param start the first position of the new string
+   * @param len the number of bytes of the new string
+   */
+  public void set(byte[] utf8, int start, int len 
+                  ) throws CharacterCodingException{
+    validateUTF8(utf8, start, len);
     setCapacity(len);
-    System.arraycopy(utf8, 0, bytes, 0, len);
+    System.arraycopy(utf8, start, bytes, 0, len);
     this.length = len;
   }
 
@@ -416,10 +427,17 @@
    * @exception MalformedInputException if the byte array contains invalid utf-8
    */
   public static void validateUTF8(byte[] utf8) throws MalformedInputException {
-     validateUTF(utf8, 0, utf8.length);     
+     validateUTF8(utf8, 0, utf8.length);     
   }
   
-  public static void validateUTF(byte[] utf8, int start, int len)
+  /**
+   * Check to see if a byte array is valid utf-8
+   * @param utf8 the array of bytes
+   * @param start the offset of the first byte in the array
+   * @param len the length of the byte sequence
+   * @throws MalformedInputException if the byte array contains invalid bytes
+   */
+  public static void validateUTF8(byte[] utf8, int start, int len)
     throws MalformedInputException {
     int count = start;
     int leadByte = 0;

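A small sketch of the new Text range APIs (the sample bytes are illustrative):

    import java.nio.charset.CharacterCodingException;
    import org.apache.hadoop.io.Text;

    public class TextRangeExample {
      public static void main(String[] args) throws CharacterCodingException {
        byte[] line = "key\tvalue\n".getBytes();    // ASCII, hence valid UTF-8
        // validate an arbitrary sub-range without copying it first
        Text.validateUTF8(line, 4, 5);
        // copy just that range; throws CharacterCodingException on bad UTF-8
        Text t = new Text();
        t.set(line, 4, 5);
        System.out.println(t);                      // prints "value"
      }
    }
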
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java?view=auto&rev=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java Fri Sep  8 14:19:41 2006
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A factory that will find the correct codec for a given filename.
+ * @author Owen O'Malley
+ */
+public class CompressionCodecFactory {
+
+  public static final Log LOG =
+    LogFactory.getLog(CompressionCodecFactory.class.getName());
+
+  /**
+   * A map from the reversed filename suffixes to the codecs.
+   * This is probably overkill, because the maps should be small, but it 
+   * automatically supports finding the longest matching suffix. 
+   */
+  private SortedMap<String, CompressionCodec> codecs = null;
+  
+  private void addCodec(CompressionCodec codec) {
+    String suffix = codec.getDefaultExtension();
+    codecs.put(new StringBuffer(suffix).reverse().toString(), codec);
+  }
+  
+  /**
+   * Print the extension map out as a string.
+   */
+  public String toString() {
+    StringBuffer buf = new StringBuffer();
+    Iterator<Map.Entry<String, CompressionCodec>> itr = 
+      codecs.entrySet().iterator();
+    buf.append("{ ");
+    if (itr.hasNext()) {
+      Map.Entry<String, CompressionCodec> entry = itr.next();
+      buf.append(entry.getKey());
+      buf.append(": ");
+      buf.append(entry.getValue().getClass().getName());
+      while (itr.hasNext()) {
+        entry = itr.next();
+        buf.append(", ");
+        buf.append(entry.getKey());
+        buf.append(": ");
+        buf.append(entry.getValue().getClass().getName());
+      }
+    }
+    buf.append(" }");
+    return buf.toString();
+  }
+
+  /**
+   * Get the list of codecs listed in the configuration
+   * @param conf the configuration to look in
+   * @return a list of the Configuration classes or null if the attribute
+   *         was not set
+   */
+  public static List<Class> getCodecClasses(Configuration conf) {
+    String codecsString = conf.get("io.compression.codecs");
+    if (codecsString != null) {
+      List<Class> result = new ArrayList<Class>();
+      StringTokenizer codecSplit = new StringTokenizer(codecsString, ",");
+      while (codecSplit.hasMoreElements()) {
+        String codecSubstring = codecSplit.nextToken();
+        if (codecSubstring.length() != 0) {
+          try {
+            Class cls = conf.getClassByName(codecSubstring);
+            if (!CompressionCodec.class.isAssignableFrom(cls)) {
+              throw new IllegalArgumentException("Class " + codecSubstring +
+                                                 " is not a CompressionCodec");
+            }
+            result.add(cls);
+          } catch (ClassNotFoundException ex) {
+            throw new IllegalArgumentException("Compression codec " + 
+                                               codecSubstring + " not found.",
+                                               ex);
+          }
+        }
+      }
+      return result;
+    } else {
+      return null;
+    }
+  }
+  
+  /**
+   * Sets a list of codec classes in the configuration.
+   * @param conf the configuration to modify
+   * @param classes the list of classes to set
+   */
+  public static void setCodecClasses(Configuration conf,
+                                     List<Class> classes) {
+    StringBuffer buf = new StringBuffer();
+    Iterator<Class> itr = classes.iterator();
+    if (itr.hasNext()) {
+      Class cls = itr.next();
+      buf.append(cls.getName());
+      while(itr.hasNext()) {
+        buf.append(',');
+        buf.append(itr.next().getName());
+      }
+    }
+    conf.set("io.compression.codecs",buf.toString());   
+  }
+  
+  /**
+   * Find the codecs specified in the config value io.compression.codecs 
+   * and register them. Defaults to gzip and zip.
+   */
+  public CompressionCodecFactory(Configuration conf) {
+    codecs = new TreeMap<String, CompressionCodec>();
+    List<Class> codecClasses = getCodecClasses(conf);
+    if (codecClasses == null) {
+      addCodec(new GzipCodec());
+      addCodec(new DefaultCodec());      
+    } else {
+      Iterator<Class> itr = codecClasses.iterator();
+      while (itr.hasNext()) {
+        CompressionCodec codec = 
+          (CompressionCodec) ReflectionUtils.newInstance(itr.next(), conf);
+        addCodec(codec);     
+      }
+    }
+  }
+  
+  /**
+   * Find the relevant compression codec for the given file based on its
+   * filename suffix.
+   * @param file the filename to check
+   * @return the codec object
+   */
+  public CompressionCodec getCodec(Path file) {
+    CompressionCodec result = null;
+    if (codecs != null) {
+      String filename = file.getName();
+      String reversedFilename = new StringBuffer(filename).reverse().toString();
+      SortedMap<String, CompressionCodec> subMap = 
+        codecs.headMap(reversedFilename);
+      if (!subMap.isEmpty()) {
+        String potentialSuffix = subMap.lastKey();
+        if (reversedFilename.startsWith(potentialSuffix)) {
+          result = codecs.get(potentialSuffix);
+        }
+      }
+    }
+    return result;
+  }
+  
+  /**
+   * Removes a suffix from a filename, if it has it.
+   * @param filename the filename to strip
+   * @param suffix the suffix to remove
+   * @return the shortened filename
+   */
+  public static String removeSuffix(String filename, String suffix) {
+    if (filename.endsWith(suffix)) {
+      return filename.substring(0, filename.length() - suffix.length());
+    }
+    return filename;
+  }
+  
+  /**
+   * A little test program.
+   * @param args
+   */
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new org.apache.hadoop.mapred.JobConf();
+    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+    boolean encode = false;
+    for(int i=0; i < args.length; ++i) {
+      if ("-in".equals(args[i])) {
+        encode = true;
+      } else if ("-out".equals(args[i])) {
+        encode = false;
+      } else {
+        CompressionCodec codec = factory.getCodec(new Path(args[i]));
+        if (codec == null) {
+          System.out.println("Codec for " + args[i] + " not found.");
+        } else { 
+          if (encode) {
+            CompressionOutputStream out = 
+              codec.createOutputStream(new java.io.FileOutputStream(args[i]));
+            byte[] buffer = new byte[100];
+            String inFilename = removeSuffix(args[i], 
+                                             codec.getDefaultExtension());
+            java.io.InputStream in = new java.io.FileInputStream(inFilename);
+            int len = in.read(buffer);
+            while (len > 0) {
+              out.write(buffer, 0, len);
+              len = in.read(buffer);
+            }
+            in.close();
+            out.close();
+          } else {
+            CompressionInputStream in = 
+              codec.createInputStream(new java.io.FileInputStream(args[i]));
+            byte[] buffer = new byte[100];
+            int len = in.read(buffer);
+            while (len > 0) {
+              System.out.write(buffer, 0, len);
+              len = in.read(buffer);
+            }
+            in.close();
+          }
+        }
+      }
+    }
+  }
+}

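A short sketch of looking a codec up by filename with the new factory (the
path is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    public class CodecLookupExample {
      public static void main(String[] args) {
        // reads io.compression.codecs, or falls back to gzip plus the default codec
        CompressionCodecFactory factory =
          new CompressionCodecFactory(new Configuration());
        Path file = new Path("/tmp/part-00000.gz");
        CompressionCodec codec = factory.getCodec(file);    // longest matching suffix wins
        if (codec != null) {
          String plain = CompressionCodecFactory.removeSuffix(
              file.getName(), codec.getDefaultExtension());  // "part-00000"
          System.out.println(plain + " -> " + codec.getClass().getName());
        }
      }
    }
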
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java?view=auto&rev=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java Fri Sep  8 14:19:41 2006
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.*;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.hadoop.io.compress.DefaultCodec;
+
+/**
+ * This class creates gzip compressors/decompressors. 
+ * @author Owen O'Malley
+ */
+public class GzipCodec extends DefaultCodec {
+  
+  /**
+   * A bridge that wraps around a DeflaterOutputStream to make it 
+   * a CompressionOutputStream.
+   * @author Owen O'Malley
+   */
+  protected static class GzipOutputStream extends DefaultCompressionOutputStream {
+    private static class ResetableGZIPOutputStream extends GZIPOutputStream {
+      public ResetableGZIPOutputStream(OutputStream out) throws IOException {
+        super(out);
+      }
+      
+      public void resetState() throws IOException {
+        def.reset();
+      }
+    }
+
+    public GzipOutputStream(OutputStream out) throws IOException {
+      super(new ResetableGZIPOutputStream(out));
+    }
+    
+    /**
+     * Allow children types to put a different type in here.
+     * @param out the Deflater stream to use
+     */
+    protected GzipOutputStream(DefaultCompressionOutputStream out) {
+      super(out);
+    }
+    
+
+    public void resetState() throws IOException {
+      ((ResetableGZIPOutputStream) out).resetState();
+    }
+
+  }
+  
+  protected static class GzipInputStream extends DefaultCompressionInputStream {
+    
+    private static class ResetableGZIPInputStream extends GZIPInputStream {
+      public ResetableGZIPInputStream(InputStream in) throws IOException {
+        super(in);
+      }
+      
+      public void resetState() throws IOException {
+        inf.reset();
+      }
+    }
+    
+    public GzipInputStream(InputStream in) throws IOException {
+      super(new ResetableGZIPInputStream(in));
+    }
+    
+    /**
+     * Allow subclasses to directly set the inflater stream.
+     */
+    protected GzipInputStream(DefaultCompressionInputStream in) {
+      super(in);
+    }
+  }
+  
+  /**
+   * Create a stream compressor that will write to the given output stream.
+   * @param out the location for the final output stream
+   * @return a stream the user can write uncompressed data to
+   */
+  public CompressionOutputStream createOutputStream(OutputStream out) 
+  throws IOException {
+    return new GzipOutputStream(out);
+  }
+  
+  /**
+   * Create a stream decompressor that will read from the given input stream.
+   * @param in the stream to read compressed bytes from
+   * @return a stream to read uncompressed bytes from
+   */
+  public CompressionInputStream createInputStream(InputStream in) 
+  throws IOException {
+    return new GzipInputStream(in);
+  }
+  
+  /**
+   * Get the default filename extension for this kind of compression.
+   * @return the extension including the '.'
+   */
+  public String getDefaultExtension() {
+    return ".gz";
+  }
+
+}

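A minimal sketch of using the codec directly on in-memory streams, mirroring
what the factory's main() does with files (the message bytes are illustrative):

    import java.io.*;
    import org.apache.hadoop.io.compress.CompressionInputStream;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.GzipCodec;

    public class GzipRoundTrip {
      public static void main(String[] args) throws IOException {
        GzipCodec codec = new GzipCodec();

        // compress a byte[] through the codec's output stream
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        CompressionOutputStream out = codec.createOutputStream(compressed);
        byte[] msg = "hello gzip\n".getBytes("UTF-8");
        out.write(msg, 0, msg.length);
        out.close();

        // and read it back through the matching input stream
        CompressionInputStream in = codec.createInputStream(
            new ByteArrayInputStream(compressed.toByteArray()));
        byte[] buf = new byte[64];
        int len = in.read(buf);
        while (len > 0) {
          System.out.write(buf, 0, len);
          len = in.read(buf);
        }
        in.close();
        System.out.flush();
      }
    }
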
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java Fri Sep  8 14:19:41 2006
@@ -40,6 +40,17 @@
     this.minSplitSize = minSplitSize;
   }
 
+  /**
+   * Is the given filename splitable? Usually, true, but if the file is
+   * stream compressed, it will not be.
+   * @param fs the file system that the file is on
+   * @param filename the file name to check
+   * @return is this file splitable?
+   */
+  protected boolean isSplitable(FileSystem fs, Path filename) {
+    return true;
+  }
+  
   public abstract RecordReader getRecordReader(FileSystem fs,
                                                FileSplit split,
                                                JobConf job,
@@ -117,15 +128,12 @@
 
     Path[] files = listPaths(fs, job);
 
+    long totalSize = 0;                           // compute total size
     for (int i = 0; i < files.length; i++) {      // check we have valid files
       Path file = files[i];
       if (fs.isDirectory(file) || !fs.exists(file)) {
         throw new IOException("Not a file: "+files[i]);
       }
-    }
-
-    long totalSize = 0;                           // compute total size
-    for (int i = 0; i < files.length; i++) {
       totalSize += fs.getLength(files[i]);
     }
 
@@ -138,19 +146,24 @@
     for (int i = 0; i < files.length; i++) {
       Path file = files[i];
       long length = fs.getLength(file);
-      long blockSize = fs.getBlockSize(file);
-      long splitSize = computeSplitSize(goalSize, minSize, blockSize);
-
-      long bytesRemaining = length;
-      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
-        splits.add(new FileSplit(file, length-bytesRemaining, splitSize));
-        bytesRemaining -= splitSize;
-      }
-      
-      if (bytesRemaining != 0) {
-        splits.add(new FileSplit(file, length-bytesRemaining, bytesRemaining));
+      if (isSplitable(fs, file)) {
+        long blockSize = fs.getBlockSize(file);
+        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
+
+        long bytesRemaining = length;
+        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
+          splits.add(new FileSplit(file, length-bytesRemaining, splitSize));
+          bytesRemaining -= splitSize;
+        }
+        
+        if (bytesRemaining != 0) {
+          splits.add(new FileSplit(file, length-bytesRemaining, bytesRemaining));
+        }
+      } else {
+        if (length != 0) {
+          splits.add(new FileSplit(file, 0, length));
+        }
       }
-      //LOG.info( "Generating splits for " + i + "th file: " + file.getName() );
     }
     //LOG.info( "Total # of splits: " + splits.size() );
     return (FileSplit[])splits.toArray(new FileSplit[splits.size()]);

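A sketch of the new isSplitable() hook from a format author's point of view;
the subclass below is hypothetical and only overrides the hook:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.InputFormatBase;

    /** Hypothetical format whose files must never be split across maps. */
    public abstract class WholeFileInputFormat extends InputFormatBase {
      protected boolean isSplitable(FileSystem fs, Path file) {
        return false;   // getSplits() then emits one split per non-empty file
      }
    }
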
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Fri Sep  8 14:19:41 2006
@@ -37,6 +37,7 @@
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.compress.CompressionCodec;
 
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
@@ -332,6 +333,38 @@
    */
   public boolean getCompressMapOutput() {
     return getBoolean("mapred.compress.map.output", false);
+  }
+
+  /**
+   * Set the given class as the  compression codec for the map outputs.
+   * @param codecClass the CompressionCodec class that will compress the 
+   *                   map outputs
+   */
+  public void setMapOutputCompressorClass(Class codecClass) {
+    setCompressMapOutput(true);
+    setClass("mapred.output.compression.codec", codecClass, 
+             CompressionCodec.class);
+  }
+  
+  /**
+   * Get the codec for compressing the map outputs
+   * @param defaultValue the value to return if it is not set
+   * @return the CompressionCodec class that should be used to compress the 
+   *   map outputs
+   * @throws IllegalArgumentException if the class was specified, but not found
+   */
+  public Class getMapOutputCompressorClass(Class defaultValue) {
+    String name = get("mapred.output.compression.codec");
+    if (name == null) {
+      return defaultValue;
+    } else {
+      try {
+        return getClassByName(name);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalArgumentException("Compression codec " + name + 
+                                           " was not found.", e);
+      }
+    }
   }
   
   /**

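A small sketch of the new JobConf accessors for the map-output codec.  Note
that, as committed, setMapOutputCompressorClass() stores the class under
mapred.output.compression.codec, the same key OutputFormatBase uses for the
reduce output codec:

    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.JobConf;

    public class MapOutputCompressionExample {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // also flips mapred.compress.map.output to true
        job.setMapOutputCompressorClass(GzipCodec.class);
        // the framework resolves the class later, falling back to DefaultCodec
        Class codecClass = job.getMapOutputCompressorClass(DefaultCodec.class);
        System.out.println("map outputs compressed with " + codecClass.getName());
      }
    }
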
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Sep  8 14:19:41 2006
@@ -20,8 +20,11 @@
 
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics.MetricsRecord;
 
 import org.apache.commons.logging.*;
@@ -123,21 +126,27 @@
     final int partitions = job.getNumReduceTasks();
     final SequenceFile.Writer[] outs = new SequenceFile.Writer[partitions];
     try {
+      Reporter reporter = getReporter(umbilical, getProgress());
       FileSystem localFs = FileSystem.getNamed("local", job);
-      /** TODO: Figure out a way to deprecate 'mapred.compress.map.output' */
-      boolean compressTemps = job.getBoolean("mapred.compress.map.output", 
-                                             false);
+      CompressionCodec codec = null;
+      CompressionType compressionType = CompressionType.NONE;
+      if (job.getCompressMapOutput()) {
+        // find the kind of compression to do, defaulting to record
+        compressionType = SequenceFile.getCompressionType(job);
+
+        // find the right codec
+        Class codecClass = 
+          job.getMapOutputCompressorClass(DefaultCodec.class);
+        codec = (CompressionCodec) 
+                   ReflectionUtils.newInstance(codecClass, job);
+      }
       for (int i = 0; i < partitions; i++) {
+        Path filename = mapOutputFile.getOutputFile(getTaskId(), i);
         outs[i] =
-          SequenceFile.createWriter(localFs, job,
-                                  this.mapOutputFile.getOutputFile(getTaskId(), i),
-                                  job.getMapOutputKeyClass(),
-                                  job.getMapOutputValueClass(),
-                                  compressTemps ? CompressionType.RECORD : 
-                                    CompressionType.valueOf(
-                                        job.get("mapred.seqfile.compression.type", 
-                                            "NONE"))
-                                  );
+          SequenceFile.createWriter(localFs, job, filename,
+                                    job.getMapOutputKeyClass(),
+                                    job.getMapOutputValueClass(),
+                                    compressionType, codec, reporter);
         LOG.info("opened "+this.mapOutputFile.getOutputFile(getTaskId(), i).getName());
       }
 
@@ -157,7 +166,6 @@
         };
 
       OutputCollector collector = partCollector;
-      Reporter reporter = getReporter(umbilical, getProgress());
 
       boolean combining = job.getCombinerClass() != null;
       if (combining) {                            // add combining collector

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java Fri Sep  8 14:19:41 2006
@@ -20,10 +20,63 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.util.Progressable;
 
 /** A base class for {@link OutputFormat}. */
 public abstract class OutputFormatBase implements OutputFormat {
+
+  /**
+   * Set whether the output of the reduce is compressed
+   * @param val the new setting
+   */
+  public static void setCompressOutput(JobConf conf, boolean val) {
+    conf.setBoolean("mapred.output.compress", val);
+  }
+  
+  /**
+   * Is the reduce output compressed?
+   * @return true, if the output should be compressed
+   */
+  public static boolean getCompressOutput(JobConf conf) {
+    return conf.getBoolean("mapred.output.compress", false);
+  }
+  
+  /**
+   * Set the given class as the output compression codec.
+   * @param conf the JobConf to modify
+   * @param codecClass the CompressionCodec class that will compress the 
+   *                   reduce outputs
+   */
+  public static void setOutputCompressorClass(JobConf conf, Class codecClass) {
+    setCompressOutput(conf, true);
+    conf.setClass("mapred.output.compression.codec", codecClass, 
+                  CompressionCodec.class);
+  }
+  
+  /**
+   * Get the codec for compressing the reduce outputs
+   * @param conf the Configuration to look in
+   * @param defaultValue the value to return if it is not set
+   * @return the CompressionCodec class that should be used to compress the 
+   *   reduce outputs
+   * @throws IllegalArgumentException if the class was specified, but not found
+   */
+  public static Class getOutputCompressorClass(JobConf conf, 
+                                               Class defaultValue) {
+    String name = conf.get("mapred.output.compression.codec");
+    if (name == null) {
+      return defaultValue;
+    } else {
+      try {
+        return conf.getClassByName(name);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalArgumentException("Compression codec " + name + 
+                                           " was not found.", e);
+      }
+    }
+  }
+  
   public abstract RecordWriter getRecordWriter(FileSystem fs,
                                                JobConf job, String name,
                                                Progressable progress)

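A sketch of how an OutputFormat can resolve the configured codec with these
helpers, the same way SequenceFileOutputFormat and TextOutputFormat do below
(the wrapper class is hypothetical):

    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.OutputFormatBase;
    import org.apache.hadoop.util.ReflectionUtils;

    public class CodecResolutionExample {
      public static CompressionCodec resolveCodec(JobConf job) {
        if (!OutputFormatBase.getCompressOutput(job)) {
          return null;                                 // compression not requested
        }
        Class codecClass =
          OutputFormatBase.getOutputCompressorClass(job, DefaultCodec.class);
        return (CompressionCodec) ReflectionUtils.newInstance(codecClass, job);
      }
    }
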
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Fri Sep  8 14:19:41 2006
@@ -27,8 +27,10 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.*;
 
 /** An {@link OutputFormat} that writes {@link SequenceFile}s. */
 public class SequenceFileOutputFormat extends OutputFormatBase {
@@ -38,18 +40,23 @@
                                       throws IOException {
 
     Path file = new Path(job.getOutputPath(), name);
+    CompressionCodec codec = null;
+    CompressionType compressionType = CompressionType.NONE;
+    if (getCompressOutput(job)) {
+      // find the kind of compression to do
+      compressionType = SequenceFile.getCompressionType(job);
 
-    /** TODO: Figure out a way to deprecate 'mapred.output.compress' */
+      // find the right codec
+      Class codecClass = getOutputCompressorClass(job, DefaultCodec.class);
+      codec = (CompressionCodec) 
+                 ReflectionUtils.newInstance(codecClass, job);
+    }
     final SequenceFile.Writer out = 
       SequenceFile.createWriter(fs, job, file,
                               job.getOutputKeyClass(),
                               job.getOutputValueClass(),
-                              job.getBoolean("mapred.output.compress", false) ? 
-                                  CompressionType.RECORD : 
-                                  CompressionType.valueOf(
-                                    job.get("mapred.seqfile.compression.type", 
-                                        "NONE")
-                                  ),
+                              compressionType,
+                              codec,
                               progress);
 
     return new RecordWriter() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java Fri Sep  8 14:19:41 2006
@@ -16,106 +16,148 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataInputStream;
+import java.io.*;
 
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.compress.*;
 
 /** An {@link InputFormat} for plain text files.  Files are broken into lines.
  * Either linefeed or carriage-return are used to signal end of line.  Keys are
  * the position in the file, and values are the line of text.. */
-public class TextInputFormat extends InputFormatBase {
+public class TextInputFormat extends InputFormatBase implements JobConfigurable {
+
+  private CompressionCodecFactory compressionCodecs = null;
+  
+  public void configure(JobConf conf) {
+    compressionCodecs = new CompressionCodecFactory(conf);
+  }
+  
+  protected boolean isSplitable(FileSystem fs, Path file) {
+    return compressionCodecs.getCodec(file) == null;
+  }
+  
+  protected static class LineRecordReader implements RecordReader {
+    private long pos;
+    private long end;
+    private BufferedInputStream in;
+    private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
+    /**
+     * Provide a bridge to get the bytes from the ByteArrayOutputStream
+     * without creating a new byte array.
+     */
+    private static class TextStuffer extends OutputStream {
+      public Text target;
+      public void write(int b) {
+        throw new UnsupportedOperationException("write(byte) not supported");
+      }
+      public void write(byte[] data, int offset, int len) throws IOException {
+        target.set(data, offset, len);
+      }      
+    }
+    private TextStuffer bridge = new TextStuffer();
+
+    public LineRecordReader(InputStream in, long offset, long endOffset) {
+      this.in = new BufferedInputStream(in);
+      this.pos = offset;
+      this.end = endOffset;
+    }
+    
+    public WritableComparable createKey() {
+      return new LongWritable();
+    }
+    
+    public Writable createValue() {
+      return new Text();
+    }
+    
+    /** Read a line. */
+    public synchronized boolean next(Writable key, Writable value)
+      throws IOException {
+      if (pos >= end)
+        return false;
+
+      ((LongWritable)key).set(pos);           // key is position
+      buffer.reset();
+      long bytesRead = readLine(in, buffer);
+      if (bytesRead == 0) {
+        return false;
+      }
+      pos += bytesRead;
+      bridge.target = (Text) value;
+      buffer.writeTo(bridge);
+      return true;
+    }
+    
+    public  synchronized long getPos() throws IOException {
+      return pos;
+    }
+
+    public synchronized void close() throws IOException { 
+      in.close(); 
+    }  
 
+  }
+  
   public RecordReader getRecordReader(FileSystem fs, FileSplit split,
                                       JobConf job, Reporter reporter)
     throws IOException {
 
     reporter.setStatus(split.toString());
 
-    final long start = split.getStart();
-    final long end = start + split.getLength();
+    long start = split.getStart();
+    long end = start + split.getLength();
+    final Path file = split.getPath();
+    final CompressionCodec codec = compressionCodecs.getCodec(file);
 
     // open the file and seek to the start of the split
-    final FSDataInputStream in = fs.open(split.getPath());
+    FSDataInputStream fileIn = fs.open(split.getPath());
+    InputStream in = fileIn;
     
-    if (start != 0) {
-      in.seek(start-1);
-      while (in.getPos() < end) {    // scan to the next newline in the file
-        char c = (char)in.read();
-        if (c == '\n')
-          break;
-          
-        if (c == '\r') {       
-          long curPos = in.getPos();
-          char nextC = (char)in.read();
-          if (nextC != '\n') {
-            in.seek(curPos);
-          }
-
-          break;
-        }
-      }
+    if (codec != null) {
+      in = codec.createInputStream(fileIn);
+      end = Long.MAX_VALUE;
+    } else if (start != 0) {
+      fileIn.seek(start-1);
+      readLine(fileIn, null);
+      start = fileIn.getPos();
     }
-
-    return new RecordReader() {
-      
-        public WritableComparable createKey() {
-          return new LongWritable();
-        }
-        
-        public Writable createValue() {
-          return new Text();
-        }
-        
-        /** Read a line. */
-        public synchronized boolean next(Writable key, Writable value)
-          throws IOException {
-          long pos = in.getPos();
-          if (pos >= end)
-            return false;
-
-          ((LongWritable)key).set(pos);           // key is position
-          ((Text)value).set(readLine(in));        // value is line
-          return true;
-        }
-        
-        public  synchronized long getPos() throws IOException {
-          return in.getPos();
-        }
-
-        public synchronized void close() throws IOException { in.close(); }
-
-      };
+    
+    return new LineRecordReader(in, start, end);
   }
 
-  private static String readLine(FSDataInputStream in) throws IOException {
-    StringBuffer buffer = new StringBuffer();
+  public static long readLine(InputStream in, 
+                              OutputStream out) throws IOException {
+    long bytes = 0;
     while (true) {
 
       int b = in.read();
-      if (b == -1)
+      if (b == -1) {
         break;
-
-      char c = (char)b;              // bug: this assumes eight-bit characters.
-      if (c == '\n')
+      }
+      bytes += 1;
+      
+      byte c = (byte)b;
+      if (c == '\n') {
         break;
-        
-      if (c == '\r') {       
-        long curPos = in.getPos();
-        char nextC = (char)in.read();
+      }
+      
+      if (c == '\r') {
+        in.mark(1);
+        byte nextC = (byte)in.read();
         if (nextC != '\n') {
-          in.seek(curPos);
+          in.reset();
+        } else {
+          bytes += 1;
         }
-
         break;
       }
 
-      buffer.append(c);
+      if (out != null) {
+        out.write(c);
+      }
     }
-    
-    return buffer.toString();
+    return bytes;
   }
 
 }

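A quick sketch of the new static readLine(): it copies one line into the given
OutputStream and returns the number of bytes consumed, terminator included,
using mark()/reset() to cope with a bare '\r'.  The sample input is
illustrative:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class ReadLineExample {
      public static void main(String[] args) throws IOException {
        ByteArrayInputStream in =
          new ByteArrayInputStream("first\r\nsecond\n".getBytes("UTF-8"));
        ByteArrayOutputStream line = new ByteArrayOutputStream();

        long consumed = TextInputFormat.readLine(in, line);
        System.out.println(line.toString("UTF-8") + " / " + consumed);   // first / 7
        line.reset();
        consumed = TextInputFormat.readLine(in, line);
        System.out.println(line.toString("UTF-8") + " / " + consumed);   // second / 7
      }
    }
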
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java Fri Sep  8 14:19:41 2006
@@ -16,6 +16,7 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -24,30 +25,51 @@
 
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.util.*;
 
 /** An {@link OutputFormat} that writes plain text files. */
 public class TextOutputFormat extends OutputFormatBase {
 
+  protected static class LineRecordWriter implements RecordWriter {
+    private DataOutputStream out;
+    
+    public LineRecordWriter(DataOutputStream out) {
+      this.out = out;
+    }
+    
+    public synchronized void write(WritableComparable key, Writable value)
+    throws IOException {
+      out.write(key.toString().getBytes("UTF-8"));
+      out.writeByte('\t');
+      out.write(value.toString().getBytes("UTF-8"));
+      out.writeByte('\n');
+    }
+    public synchronized void close(Reporter reporter) throws IOException {
+      out.close();
+    }   
+  }
+  
   public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
                                       String name, Progressable progress) throws IOException {
 
-    Path file = new Path(job.getOutputPath(), name);
-
-    final FSDataOutputStream out = fs.create(file, progress);
-
-    return new RecordWriter() {
-        public synchronized void write(WritableComparable key, Writable value)
-          throws IOException {
-          out.write(key.toString().getBytes("UTF-8"));
-          out.writeByte('\t');
-          out.write(value.toString().getBytes("UTF-8"));
-          out.writeByte('\n');
-        }
-        public synchronized void close(Reporter reporter) throws IOException {
-          out.close();
-        }
-      };
+    Path dir = job.getOutputPath();
+    boolean isCompressed = getCompressOutput(job);
+    if (!isCompressed) {
+      FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
+      return new LineRecordWriter(fileOut);
+    } else {
+      Class codecClass = getOutputCompressorClass(job, GzipCodec.class);
+      // create the named codec
+      CompressionCodec codec = (CompressionCodec)
+                               ReflectionUtils.newInstance(codecClass, job);
+      // build the filename including the extension
+      Path filename = new Path(dir, name + codec.getDefaultExtension());
+      FSDataOutputStream fileOut = fs.create(filename, progress);
+      return new LineRecordWriter(new DataOutputStream
+                                  (codec.createOutputStream(fileOut)));
+    }
   }      
 }
 

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java Fri Sep  8 14:19:41 2006
@@ -198,7 +198,7 @@
       Text text = new Text("abcd\u20acbdcd\u20ac");
       byte [] utf8 = text.getBytes();
       int length = text.getLength();
-      Text.validateUTF(utf8, 0, length);
+      Text.validateUTF8(utf8, 0, length);
   }
 
   public void testTextText() throws CharacterCodingException {

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java?view=auto&rev=441653
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java Fri Sep  8 14:19:41 2006
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.*;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+
+public class TestCodecFactory extends TestCase {
+
+  private static class BaseCodec implements CompressionCodec {
+    public CompressionOutputStream createOutputStream(OutputStream out) {
+      return null;
+    }
+    
+    public CompressionInputStream createInputStream(InputStream in) {
+      return null;
+    }
+    
+    public String getDefaultExtension() {
+      return ".base";
+    }
+  }
+  
+  private static class BarCodec extends BaseCodec {
+    public String getDefaultExtension() {
+      return "bar";
+    }
+  }
+  
+  private static class FooBarCodec extends BaseCodec {
+    public String getDefaultExtension() {
+      return ".foo.bar";
+    }
+  }
+  
+  private static class FooCodec extends BaseCodec {
+    public String getDefaultExtension() {
+      return ".foo";
+    }
+  }
+  
+  /**
+   * Returns a factory for a given set of codecs
+   * @param classes the codec classes to include
+   * @return a new factory
+   */
+  private static CompressionCodecFactory setClasses(Class[] classes) {
+    Configuration conf = new Configuration();
+    CompressionCodecFactory.setCodecClasses(conf, Arrays.asList(classes));
+    return new CompressionCodecFactory(conf);
+  }
+  
+  private static void checkCodec(String msg, 
+                                 Class expected, CompressionCodec actual) {
+    assertEquals(msg + " unexpected codec found",
+                 expected.getName(),
+                 actual.getClass().getName());
+  }
+  
+  public static void testFinding() {
+    CompressionCodecFactory factory = 
+      new CompressionCodecFactory(new Configuration());
+    CompressionCodec codec = factory.getCodec(new Path("/tmp/foo.bar"));
+    assertEquals("default factory foo codec", null, codec);
+    codec = factory.getCodec(new Path("/tmp/foo.gz"));
+    checkCodec("default factory for .gz", GzipCodec.class, codec);
+    factory = setClasses(new Class[0]);
+    codec = factory.getCodec(new Path("/tmp/foo.bar"));
+    assertEquals("empty codec bar codec", null, codec);
+    codec = factory.getCodec(new Path("/tmp/foo.gz"));
+    assertEquals("empty codec gz codec", null, codec);
+    factory = setClasses(new Class[]{BarCodec.class, FooCodec.class, 
+                                     FooBarCodec.class});
+    codec = factory.getCodec(new Path("/tmp/.foo.bar.gz"));
+    assertEquals("full factory gz codec", null, codec);
+    codec = factory.getCodec(new Path("/tmp/foo.bar"));
+    checkCodec("full factory bar codec", BarCodec.class, codec);
+    codec = factory.getCodec(new Path("/tmp/foo/baz.foo.bar"));
+    checkCodec("full factory foo bar codec", FooBarCodec.class, codec);
+    codec = factory.getCodec(new Path("/tmp/foo.foo"));
+    checkCodec("full factory foo codec", FooCodec.class, codec);
+  }
+}

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Fri Sep  8 14:19:41 2006
@@ -17,7 +17,6 @@
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapred.lib.*;
 import junit.framework.TestCase;
 import java.io.*;
@@ -83,7 +82,6 @@
      * as many times as we were instructed.
      */
     static class RandomGenMapper implements Mapper {
-        Random r = new Random();
         public void configure(JobConf job) {
         }
 
@@ -105,7 +103,6 @@
         }
 
         public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
-            int keyint = ((IntWritable) key).get();
             while (it.hasNext()) {
                 int val = ((IntWritable) it.next()).get();
                 out.collect(new Text("" + val), new Text(""));
@@ -136,7 +133,6 @@
         }
 
         public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
-            long pos = ((LongWritable) key).get();
             Text str = (Text) val;
 
             out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
@@ -203,7 +199,6 @@
     private static int range = 10;
     private static int counts = 100;
     private static Random r = new Random();
-    private static Configuration conf = new Configuration();
 
     /**
        public TestMapRed(int range, int counts, Configuration conf) throws IOException {
@@ -252,19 +247,14 @@
     private static class MyReduce extends IdentityReducer {
       private JobConf conf;
       private boolean compressInput;
-      private boolean compressOutput;
       private String taskId;
-      private int partition;
       private boolean first = true;
       
       public void configure(JobConf conf) {
         this.conf = conf;
         compressInput = conf.getBoolean("mapred.compress.map.output", 
                                         false);
-        compressOutput = conf.getBoolean("mapred.compress.output",
-                                         false);
         taskId = conf.get("mapred.task.id");
-        partition = conf.getInt("mapred.task.partition", -1);
       }
       
       public void reduce(WritableComparable key, Iterator values,
@@ -295,6 +285,7 @@
       Path inDir = new Path(testdir, "in");
       Path outDir = new Path(testdir, "out");
       FileSystem fs = FileSystem.get(conf);
+      fs.delete(testdir);
       conf.setInputPath(inDir);
       conf.setOutputPath(outDir);
       conf.setMapperClass(MyMap.class);
@@ -306,10 +297,10 @@
         conf.setCombinerClass(IdentityReducer.class);
       }
       if (compressMapOutput) {
-        conf.setBoolean("mapred.compress.map.output", true);
+        conf.setCompressMapOutput(true);
       }
       if (compressReduceOutput) {
-        conf.setBoolean("mapred.output.compress", true);
+        SequenceFileOutputFormat.setCompressOutput(conf, true);
       }
       try {
         fs.mkdirs(testdir);
@@ -354,6 +345,7 @@
         //
         // Generate distribution of ints.  This is the answer key.
         //
+        JobConf conf = new JobConf();
         int countsToGo = counts;
         int dist[] = new int[range];
         for (int i = 0; i < range; i++) {
@@ -376,7 +368,10 @@
         fs.mkdirs(randomIns);
 
         Path answerkey = new Path(randomIns, "answer.key");
-        SequenceFile.Writer out = new SequenceFile.Writer(fs, answerkey, IntWritable.class, IntWritable.class);
+        SequenceFile.Writer out = 
+          SequenceFile.createWriter(fs, conf, answerkey, IntWritable.class,
+                                    IntWritable.class, 
+                                    SequenceFile.CompressionType.NONE);
         try {
             for (int i = 0; i < range; i++) {
                 out.append(new IntWritable(i), new IntWritable(dist[i]));
@@ -409,8 +404,6 @@
 
         JobConf genJob = new JobConf(conf);
         genJob.setInputPath(randomIns);
-        genJob.setInputKeyClass(IntWritable.class);
-        genJob.setInputValueClass(IntWritable.class);
         genJob.setInputFormat(SequenceFileInputFormat.class);
         genJob.setMapperClass(RandomGenMapper.class);
 
@@ -479,8 +472,6 @@
         fs.delete(finalOuts);
         JobConf mergeJob = new JobConf(conf);
         mergeJob.setInputPath(intermediateOuts);
-        mergeJob.setInputKeyClass(IntWritable.class);
-        mergeJob.setInputValueClass(IntWritable.class);
         mergeJob.setInputFormat(SequenceFileInputFormat.class);
         mergeJob.setMapperClass(MergeMapper.class);
         
@@ -564,8 +555,8 @@
         }
 
         int i = 0;
-        int range = Integer.parseInt(argv[i++]);
-        int counts = Integer.parseInt(argv[i++]);
-	launch();
+        range = Integer.parseInt(argv[i++]);
+        counts = Integer.parseInt(argv[i++]);
+	      launch();
     }
 }

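For context, the TestMapRed hunks above move the test off the deprecated SequenceFile.Writer constructor onto SequenceFile.createWriter(...), and off the raw conf.setBoolean("mapred.output.compress", true) onto SequenceFileOutputFormat.setCompressOutput(conf, true). Below is a minimal stand-alone sketch of that usage pattern, not part of the commit; it relies only on the Hadoop calls that appear in the diff, and the class name and output path are illustrative.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;

public class SequenceFileWriteSketch {
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("answer.key");   // illustrative path

    // Factory method replaces the deprecated SequenceFile.Writer constructor;
    // CompressionType.NONE writes an uncompressed file, as in the test above.
    SequenceFile.Writer out =
      SequenceFile.createWriter(fs, conf, file, IntWritable.class,
                                IntWritable.class,
                                SequenceFile.CompressionType.NONE);
    try {
      out.append(new IntWritable(0), new IntWritable(42));
    } finally {
      out.close();
    }

    // Typed setter replaces conf.setBoolean("mapred.output.compress", true).
    SequenceFileOutputFormat.setCompressOutput(conf, true);
  }
}

Going through the typed setters also avoids the kind of property-name mismatch the old test had ("mapred.compress.output" in MyReduce versus "mapred.output.compress" in launch()).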
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Fri Sep  8 14:19:41 2006
@@ -20,20 +20,33 @@
 import java.util.*;
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.compress.*;
 
 public class TestTextInputFormat extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestTextInputFormat.class.getName());
 
   private static int MAX_LENGTH = 10000;
-  private static Configuration conf = new Configuration();
+  
+  private static JobConf defaultConf = new JobConf();
+  private static FileSystem localFs = null; 
+  static {
+    try {
+      localFs = FileSystem.getNamed("local", defaultConf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+  private static Path workDir = 
+    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+             "TestTextInputFormat");
   
   public void testFormat() throws Exception {
-    JobConf job = new JobConf(conf);
-    FileSystem fs = FileSystem.getNamed("local", conf);
-    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
-    Path file = new Path(dir, "test.txt");
+    JobConf job = new JobConf();
+    Path file = new Path(workDir, "test.txt");
 
     Reporter reporter = new Reporter() {
         public void setStatus(String status) throws IOException {}
@@ -41,20 +54,20 @@
       };
     
     int seed = new Random().nextInt();
-    //LOG.info("seed = "+seed);
+    LOG.info("seed = "+seed);
     Random random = new Random(seed);
 
-    fs.delete(dir);
-    job.setInputPath(dir);
+    localFs.delete(workDir);
+    job.setInputPath(workDir);
 
     // for a variety of lengths
     for (int length = 0; length < MAX_LENGTH;
          length+= random.nextInt(MAX_LENGTH/10)+1) {
 
-      //LOG.info("creating; entries = " + length);
+      LOG.debug("creating; entries = " + length);
 
       // create a file with length entries
-      Writer writer = new OutputStreamWriter(fs.create(file));
+      Writer writer = new OutputStreamWriter(localFs.create(file));
       try {
         for (int i = 0; i < length; i++) {
           writer.write(Integer.toString(i));
@@ -65,33 +78,38 @@
       }
 
       // try splitting the file in a variety of sizes
-      InputFormat format = new TextInputFormat();
+      TextInputFormat format = new TextInputFormat();
+      format.configure(job);
       LongWritable key = new LongWritable();
       Text value = new Text();
       for (int i = 0; i < 3; i++) {
         int numSplits = random.nextInt(MAX_LENGTH/20)+1;
-        //LOG.info("splitting: requesting = " + numSplits);
-        FileSplit[] splits = format.getSplits(fs, job, numSplits);
-        //LOG.info("splitting: got =        " + splits.length);
+        LOG.debug("splitting: requesting = " + numSplits);
+        FileSplit[] splits = format.getSplits(localFs, job, numSplits);
+        LOG.debug("splitting: got =        " + splits.length);
 
         // check each split
         BitSet bits = new BitSet(length);
         for (int j = 0; j < splits.length; j++) {
+          LOG.debug("split["+j+"]= " + splits[j].getStart() + "+" +
+                   splits[j].getLength());
           RecordReader reader =
-            format.getRecordReader(fs, splits[j], job, reporter);
+            format.getRecordReader(localFs, splits[j], job, reporter);
           try {
             int count = 0;
             while (reader.next(key, value)) {
               int v = Integer.parseInt(value.toString());
-              //             if (bits.get(v)) {
-              //               LOG.info("splits["+j+"]="+splits[j]+" : " + v);
-              //               LOG.info("@"+reader.getPos());
-              //             }
+              LOG.debug("read " + v);
+              if (bits.get(v)) {
+                LOG.warn("conflict with " + v + 
+                         " in split " + j +
+                         " at position "+reader.getPos());
+              }
               assertFalse("Key in multiple partitions.", bits.get(v));
               bits.set(v);
               count++;
             }
-            //LOG.info("splits["+j+"]="+splits[j]+" count=" + count);
+            LOG.debug("splits["+j+"]="+splits[j]+" count=" + count);
           } finally {
             reader.close();
           }
@@ -102,6 +120,110 @@
     }
   }
 
+  private InputStream makeStream(String str) throws IOException {
+    Text text = new Text(str);
+    return new ByteArrayInputStream(text.getBytes(), 0, text.getLength());
+  }
+  
+  public void testUTF8() throws Exception {
+    InputStream in = makeStream("abcd\u20acbdcd\u20ac");
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    TextInputFormat.readLine(in, out);
+    Text line = new Text();
+    line.set(out.toByteArray());
+    assertEquals("readLine changed utf8 characters", 
+                 "abcd\u20acbdcd\u20ac", line.toString());
+    in = makeStream("abc\u200axyz");
+    out.reset();
+    TextInputFormat.readLine(in, out);
+    line.set(out.toByteArray());
+    assertEquals("split on fake newline", "abc\u200axyz", line.toString());
+  }
+
+  public void testNewLines() throws Exception {
+    InputStream in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    TextInputFormat.readLine(in, out);
+    assertEquals("line1 length", 1, out.size());
+    out.reset();
+    TextInputFormat.readLine(in, out);
+    assertEquals("line2 length", 2, out.size());
+    out.reset();
+    TextInputFormat.readLine(in, out);
+    assertEquals("line3 length", 0, out.size());
+    out.reset();
+    TextInputFormat.readLine(in, out);
+    assertEquals("line4 length", 3, out.size());
+    out.reset();
+    TextInputFormat.readLine(in, out);
+    assertEquals("line5 length", 4, out.size());
+    out.reset();
+    TextInputFormat.readLine(in, out);
+    assertEquals("line5 length", 5, out.size());
+    assertEquals("end of file", 0, TextInputFormat.readLine(in, out));
+  }
+  
+  private static void writeFile(FileSystem fs, Path name, 
+                                CompressionCodec codec,
+                                String contents) throws IOException {
+    OutputStream stm;
+    if (codec == null) {
+      stm = fs.create(name);
+    } else {
+      stm = codec.createOutputStream(fs.create(name));
+    }
+    stm.write(contents.getBytes());
+    stm.close();
+  }
+  
+  private static class VoidReporter implements Reporter {
+    public void progress() {}
+    public void setStatus(String msg) {}
+  }
+  private static final Reporter voidReporter = new VoidReporter();
+  
+  private static List<Text> readSplit(InputFormat format, 
+                                      FileSplit split, 
+                                      JobConf job) throws IOException {
+    List<Text> result = new ArrayList<Text>();
+    RecordReader reader = format.getRecordReader(localFs, split, job,
+                                                 voidReporter);
+    LongWritable key = (LongWritable) reader.createKey();
+    Text value = (Text) reader.createValue();
+    while (reader.next(key, value)) {
+      result.add(value);
+      value = (Text) reader.createValue();
+    }
+    return result;
+  }
+  
+  /**
+   * Test using the gzip codec for reading
+   */
+  public static void testGzip() throws IOException {
+    CompressionCodec gzip = new GzipCodec();
+    localFs.delete(workDir);
+    writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, 
+              "the quick\nbrown\nfox jumped\nover\n the lazy\n dog\n");
+    writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
+              "this is a test\nof gzip\n");
+    JobConf job = new JobConf();
+    job.setInputPath(workDir);
+    TextInputFormat format = new TextInputFormat();
+    format.configure(job);
+    FileSplit[] splits = format.getSplits(localFs, job, 100);
+    assertEquals("compressed splits == 2", 2, splits.length);
+    List<Text> results = readSplit(format, splits[0], job);
+    assertEquals("splits[0] length", 6, results.size());
+    assertEquals("splits[0][5]", " dog", results.get(5).toString());
+    results = readSplit(format, splits[1], job);
+    assertEquals("splits[1] length", 2, results.size());
+    assertEquals("splits[1][0]", "this is a test", 
+                 results.get(0).toString());    
+    assertEquals("splits[1][1]", "of gzip", 
+                 results.get(1).toString());    
+  }
+  
   public static void main(String[] args) throws Exception {
     new TestTextInputFormat().testFormat();
   }